最近在看Java集合源码的时候研究了一下Spliterator, 发现了Spliterator的一些问题, 在这一篇中总结一下:
Spliterator源码解读
ArrayList中Spliterator的实现
LinkedList中Spliterator的实现
HashMap(Set)中Spliterator的实现
一. Spliterator源码解读
Spliterator接口是1.8新加的接口,字面意思可分割的迭代器,不同以往的iterator需要顺序迭代,Spliterator可以分割为若干个小的迭代器进行并行操作,既可以实现多线程操作提高效率,又可以避免普通迭代器的fail-fast机制所带来的异常
Spliterator可以配合1.8新加的Stream进行并行流的实现,大大提高处理效率
属性
通过characteristics()方法返回的值,用来标识实现类所具有的的特征
public static final int ORDERED = 0x00000010;//表示元素是有序的(每一次遍历结果相同)
public static final int DISTINCT = 0x00000001;//表示元素不重复
public static final int SORTED = 0x00000004;//表示元素是按一定规律进行排列(有指定比较器)
public static final int SIZED = 0x00000040;//是否确定大小
public static final int NONNULL = 0x00000100;//表示迭代器中没有null元素
public static final int IMMUTABLE = 0x00000400;//表示元素不可变
public static final int CONCURRENT = 0x00001000;//表示迭代器可以多线程操作
public static final int SUBSIZED = 0x00004000;//表示子Spliterators都具有SIZED特性
方法
boolean tryAdvance(Consumer<? super T> action);
如果有剩余的元素存在,执行参数给定的操作,并返回true,否则就返回false
如果Spliterator对象具有ORDERED属性,那么tryAdvance也会按照相应的顺序去执行
default void forEachRemaining(Consumer<? super T> action)
对Spliterator的每一个对象执行tryAdvance操作
default void forEachRemaining(Consumer<? super T> action) { do { } while (tryAdvance(action)); }
Spliterator<T> trySplit()
如果这个Spliterator是可以被分割的,那么这个方法会返回一个Spliterator,与原来的Spliterator平分其中的元素,如果原Spliterator的元素个数单数,两个Spliterator的元素个数相差1,基本是相同的
如果Spliterator不能再分割,那么会返回null
long estimateSize()
返回一个预估的值,等于执行forEachRemaining方法时调用tryAdvance的次数。
如果这个值过大,或者需要太复杂的计算过程,那么直接回返回long型的最大值
default long getExactSizeIfKnown()
返回Spliterator对象确切的大小,如果存在SIZED属性,则返回estimateSize()方法的返回值,否则返回-1default long getExactSizeIfKnown() { return (characteristics() & SIZED) == 0 ? -1L : estimateSize(); }
int characteristics();
返回Spliterator对象的特征值,这个上面有介绍。一般实现类中的属性就是几个属性进行或操作之后的结果
default boolean hasCharacteristics(int characteristics)
根据characteristics()与参数相与的结果看Spliterator对象是否包含参数指定的属性default boolean hasCharacteristics(int characteristics) { return (characteristics() & characteristics) == characteristics; }
default Comparator<? super T> getComparator()
如果Spliterator的具体实现具有SORTED属性,那么此方法会返回一个相应的比较器,否则会返回null
default Comparator<? super T> getComparator() { throw new IllegalStateException(); }
衍生接口
OfPrimitive
衍生接口的内容与Spliterator几乎无差别,只是将trySplit,tryAdvance,forEachRemaining三个方法中的参数类型做了一点小小的变化,更加适合基本参数类型
```java
public interface OfPrimitive<T, T_CONS, T_SPLITR extends
Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>> extends Spliterator<T> {
@Override
T_SPLITR trySplit();
@SuppressWarnings("overloads")
boolean tryAdvance(T_CONS action);
@SuppressWarnings("overloads")
default void forEachRemaining(T_CONS action) {
do { } while (tryAdvance(action));
}
}
```
**下面有相应的OfInt、OfLong、OfDouble三个借口专门提供给int、long、double类型的数据使用**
二. ArrayList中Spliterator的实现
ArrayList中的实现类是ArrayListSpliterator, 其内部包括了三个变量:
- int index: 当前位置(包含),advance/spilt操作时会被修改(current index, modified on advance/split)
- int fence: 结束位置(不包含),-1表示到最后一个元素(-1 until used; then one past last index)
- int expectedModCount: 用于存放list的modCount(initialized when fence set)
其用法也是最简单的: 创建一个长度为100的list,如果下标能被10整除,则该位置数值跟下标相同,否则值为aaaa。然后多线程遍历list,取出list中的数值(字符串aaaa不要)进行累加求和
public class Test {
static List<String> list;
static AtomicInteger count;
public static void main(String[] args) throws InterruptedException {
// 初始化List, 并获得spliterator
list = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
if (i % 10 == 0) {
list.add(Integer.toString(i));
} else {
list.add("aaaa");
}
}
Spliterator<String> spliterator = list.spliterator();
// 求和结果
count = new AtomicInteger(0);
Spliterator<String> s1 = spliterator.trySplit();
Spliterator<String> s2 = spliterator.trySplit();
Thread main = new Thread(new Task(spliterator));
Thread t1 = new Thread(new Task(s1));
Thread t2 = new Thread(new Task(s2));
main.start();
t1.start();
t2.start();
t1.join();
t2.join();
main.join();
System.out.println(count);
}
// 判断字符串是数字
public static boolean isInteger(String str) {
Pattern pattern = Pattern.compile("^[-+]?[\\d]*$");
return pattern.matcher(str).matches();
}
static class Task implements Runnable {
private Spliterator<String> spliterator;
public Task(Spliterator<String> spliterator) {
this.spliterator = spliterator;
}
@Override
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println("线程" + threadName + "开始运行-----");
spliterator.forEachRemaining(x -> {
if (isInteger(x)) {
count.addAndGet(Integer.parseInt(x));
System.out.println("数值:" + x + "------" + threadName);
}
});
System.out.println("线程" + threadName + "运行结束-----");
}
}
}
------- Output -------
线程Thread-0开始运行-----
线程Thread-1开始运行-----
线程Thread-2开始运行-----
数值:60------Thread-2
数值:70------Thread-2
线程Thread-2运行结束-----
数值:80------Thread-0
数值:90------Thread-0
数值:10------Thread-1
数值:100------Thread-0
线程Thread-0运行结束-----
数值:20------Thread-1
数值:30------Thread-1
数值:40------Thread-1
数值:50------Thread-1
线程Thread-1运行结束-----
550
注意:
需要注意的是, 之前我在博客中写的代码如下:
package top.jasonkayzk; import java.util.ArrayList; import java.util.List; import java.util.Spliterator; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; /** * @author zk */ public class Test { public static void main(String[] args) throws InterruptedException { // 初始化List, 并获得spliterator final List<String> map = new ArrayList<>(); for (int i = 1; i <= 100; i++) { if (i % 10 == 0) { map.add(i + ""); } else { map.add("aaaa"); } } Spliterator<String> spliterator = map.spliterator(); // 求和结果 final AtomicInteger count = new AtomicInteger(0); // 开启线程数 final int threadNum = 4; // 计数器锁, 多个线程都处理完毕后才输出结果, 也可以使用join()方法 final CountDownLatch latch = new CountDownLatch(threadNum); // 定义处理线程任务 Runnable task = () -> { String threadName = Thread.currentThread().getName(); System.out.println("线程" + threadName + "开始运行-----"); spliterator.trySplit().forEachRemaining((ele) -> { if (isInteger(ele)) { int num = Integer.parseInt(ele); count.addAndGet(num); System.out.println("数值:" + num + "------" + threadName); } }); System.out.println("线程" + threadName + "运行结束-----"); latch.countDown(); }; for (int i = 0; i < threadNum; i++) { new Thread(task).start(); } // 等待全部运行完毕 latch.await(); System.out.println(count); } // 判断字符串是数字 public static boolean isInteger(String str) { Pattern pattern = Pattern.compile("^[-+]?[\\d]*$"); return pattern.matcher(str).matches(); } } ------- Output ------- 线程Thread-0开始运行----- 线程Thread-2开始运行----- 线程Thread-1开始运行----- 线程Thread-3开始运行----- 数值:90------Thread-3 数值:80------Thread-1 数值:60------Thread-2 线程Thread-1运行结束----- 数值:70------Thread-2 线程Thread-3运行结束----- 线程Thread-2运行结束----- 数值:10------Thread-0 数值:20------Thread-0 数值:30------Thread-0 数值:40------Thread-0 数值:50------Thread-0 线程Thread-0运行结束----- 450
其结果并不为550, 因为最后一个元素被忽略了!
经过多次尝试也没有找到原因, 所以在使用时最好:
- 先做整体切割
- 然后创建内部类, 类中定义属性, 并在构造方法中注入spliterator对象
这样可以保证并发操作时可以正确分配而不受并发的影响, 并且将元素全部遍历
三. LinkedList中Spliterator的实现
与ArrayList类似, 在LinkedList中也存在一个实现了Spliterator接口的内部类: LLSpliterator 用于并发遍历整个表以提高性能, 使用方法与ArrayList类似
相同的例子: 创建一个长度为100的list,如果下标能被10整除,则该位置数值跟下标相同,否则值为aaaa。然后多线程遍历list,取出list中的数值(字符串aaaa不要)进行累加求和
package top.jasonkayzk;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
public class Test {
static List<String> list;
static AtomicInteger count;
public static void main(String[] args) throws InterruptedException {
// 初始化List, 并获得spliterator
list = new LinkedList<>();
for (int i = 1; i <= 100; i++) {
if (i % 10 == 0) {
list.add(Integer.toString(i));
} else {
list.add("aaaa");
}
}
Spliterator<String> spliterator = list.spliterator();
// 求和结果
count = new AtomicInteger(0);
Spliterator<String> s1 = spliterator.trySplit();
Spliterator<String> s2 = spliterator.trySplit();
Thread main = new Thread(new Task(spliterator));
Thread t1 = new Thread(new Task(s1));
Thread t2 = new Thread(new Task(s2));
main.start();
t1.start();
t2.start();
t1.join();
t2.join();
main.join();
System.out.println(count);
}
// 判断字符串是数字
public static boolean isInteger(String str) {
Pattern pattern = Pattern.compile("^[-+]?[\\d]*$");
return pattern.matcher(str).matches();
}
static class Task implements Runnable {
private Spliterator<String> spliterator;
public Task(Spliterator<String> spliterator) {
this.spliterator = spliterator;
}
@Override
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println("线程" + threadName + "开始运行-----");
spliterator.forEachRemaining(x -> {
if (isInteger(x)) {
count.addAndGet(Integer.parseInt(x));
System.out.println("数值:" + x + "------" + threadName);
}
});
System.out.println("线程" + threadName + "运行结束-----");
}
}
}
以上代码与之前ArrayList中的代码几乎相同(只是在创建List时使用的是new LinkedList<>()而已)
执行结果如下:
线程Thread-1开始运行-----
线程Thread-2开始运行-----
线程Thread-0开始运行-----
线程Thread-0运行结束-----
Exception in thread "Thread-2" java.lang.NullPointerException
at top.jasonkayzk.Test$Task.run(Test.java:65)
at java.base/java.lang.Thread.run(Thread.java:834)
数值:10------Thread-1
数值:20------Thread-1
数值:30------Thread-1
数值:40------Thread-1
数值:50------Thread-1
数值:60------Thread-1
数值:70------Thread-1
数值:80------Thread-1
数值:90------Thread-1
数值:100------Thread-1
线程Thread-1运行结束-----
550
结果是正确的, 但是爆出了NPE的错误!
但是将100增大, 比如到10000, 则不会报错了!
原因在于:
LLSpliterator和在ArrayList中实现的ArrayListSpliterator的实现方式还是有所区别的
在LLSpliterator中定义了BATCH_UNIT和batch
变量:
static final int BATCH_UNIT = 1 << 10; // 批处理数组大小增量, 1024
static final int MAX_BATCH = 1 << 25; // 最大批处理数组大小, 33554432
int est; // size估计(尾节点索引),初始值为-1(size estimate; -1 until first needed)
int expectedModCount; // 期望的改变计数。用来实现fail-fast机制
int batch; // 拆分的批量大小
而在源码的trySplit()方法中可以看到在LLSpliterator中是以batch(或者说是j)来进行拆分的:
public Spliterator<E> trySplit() {
Node<E> p;
int s = getEst();
if (s > 1 && (p = current) != null) {
int n = batch + BATCH_UNIT;
if (n > s)
n = s;
if (n > MAX_BATCH)
n = MAX_BATCH;
Object[] a = new Object[n];
int j = 0;
do { a[j++] = p.item; } while ((p = p.next) != null && j < n);
current = p;
batch = j;
est = s - j;
return Spliterators.spliterator(a, 0, j, Spliterator.ORDERED);
}
return null;
}
并且这个batch固定等于: batch + BATCH_UNIT, 而batch只能被初始化为0(没找到setter方法), 所以batch只能固定为1024!
说明: 在LLSpliterator中
① batch限定了每次差分的大小, 而batch并未通过构造函数或者Setter暴露, 所以只能为: BATCH_UNIT(1024), 即每一批次固定为1024个元素
② MAX_BATCH规定了可处理的总批数, 所以LLSpliterator可处理的最多元素是: MAX_BATCH x BATCH_UNIT
综上, 也就不难得出, 为什么当list长度为100时会报错了: 因为开了四个线程, 但是因为每一批固定为1024个, 所以其实另外三个线程都被分为了null, 所以报出NPE
总结: ArrayListSpliterator和LLSpliterator的底层实现还是不同的
我个人觉得主要是因为LinkedList的随机访问能力远不如ArrayList, 所以差分多个其实性能远远不如ArrayList好, 所以Java官方才这样设计的, 毕竟你拆分LinkedList时, 基本上已经遍历了整个链表了
所以使用Spliterator多线程并发操作List的时候, 一定要选择ArrayList!
四. HashMap(Set)中Spliterator的实现
map中有KeySpliterator, ValueSpliterator和EntrySpliterator都继承自内部抽象类HashMapSpliterator并实现了Spliterator接口, 因此可以通过这三个内部类获取相应的Spliterator来完成并发遍历HashMap
同时对于Set(例如HashSet), 其内部源码如下:
public Spliterator<E> spliterator() {
return new HashMap.KeySpliterator<>(map, 0, -1, 0, 0);
}
可见, HashSet中实际上返回的还是HashMap中Spliterator的实现(同理, TreeSet返回TreeMap的实现类)
同一个例子: 创建一个长度为100的list,如果下标能被10整除,则该位置数值跟下标相同,否则值为aaaa。然后多线程遍历list,取出list中的数值(字符串aaaa不要)进行累加求和
public class Test {
static HashMap<Integer, String> map;
static AtomicInteger count;
public static void main(String[] args) throws InterruptedException {
// 初始化List, 并获得spliterator
map = new HashMap<>();
for (int i = 1; i <= 109; i++) {
if (i % 10 == 0) {
map.put(i, Integer.toString(i));
} else {
map.put(i, "aaaa");
}
}
Spliterator<String> spliterator = map.values().spliterator();
// 求和结果
count = new AtomicInteger(0);
Spliterator<String> s1 = spliterator.trySplit();
Spliterator<String> s2 = s1.trySplit();
Thread main = new Thread(new Task(spliterator));
Thread t1 = new Thread(new Task(s1));
Thread t2 = new Thread(new Task(s2));
main.start();
t1.start();
t2.start();
t1.join();
t2.join();
main.join();
System.out.println(count);
}
// 判断字符串是数字
public static boolean isInteger(String str) {
Pattern pattern = Pattern.compile("^[-+]?[\\d]*$");
return pattern.matcher(str).matches();
}
static class Task implements Runnable {
private Spliterator<String> spliterator;
public Task(Spliterator<String> spliterator) {
this.spliterator = spliterator;
}
@Override
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println("线程" + threadName + "开始运行-----");
spliterator.forEachRemaining(x -> {
if (isInteger(x)) {
count.addAndGet(Integer.parseInt(x));
System.out.println("数值:" + x + "------" + threadName);
}
});
System.out.println("线程" + threadName + "运行结束-----");
}
}
}
------- Output -------
线程Thread-1开始运行-----
线程Thread-2开始运行-----
线程Thread-0开始运行-----
线程Thread-0运行结束-----
数值:10------Thread-2
数值:20------Thread-2
数值:30------Thread-2
数值:40------Thread-2
数值:50------Thread-2
数值:60------Thread-2
线程Thread-2运行结束-----
数值:70------Thread-1
数值:80------Thread-1
数值:90------Thread-1
数值:100------Thread-1
线程Thread-1运行结束-----
550
注: HashMap的遍历和ArrayList有很大的不同!
由于HashMap中的遍历涉及到桶(存储分区数目)以及树(或者链表)结构, 且和具体数据的hash值也有很大的关系
所以可能会出现像上例中Thread-0类似: 做了切分但实际上没有分配任何数据!
HashMap由于结构十分复杂, 以后有机会再研究内部的并发实现
总结
以上讲述了Spliterator接口的源码, 以及Spliterator在不同集合类中的具体实现类的区别
要知道, 对于不同的场景下, 具体的Spliterator实现类所具有的特点是不完全一样的(尽管他们都实现了同一个接口)