Java8 Spliterator接口

前言

Spliterator 是Java8中加入的一个新接口,是“可分迭代器”(splitable iterator)的意思。它也是用来遍历数据源中的元素的,但它是为并行执行而设计的。

其接口主要代码如下:

1
2
3
4
5
6
Public interface Spliterator<T>{
Boolean tryAdvance(Consumer <? super T>) action;
Spliterator<T> trySplit();
Long estimateSize();
Int characteristics();
}

**T **是Spliterator要遍历的元素类型。

tryAdvance 方法的行为类似于普通的Iterator,因为它会按顺序一个一个使用Spliterator中的元素,并且如果还有其他元素要遍历就返回true。

trySplit 是专为Spliterator接口而设计的,因为它可以把一些元素划出去分给第二个Spliterator,让他们两个并行处理。

estimateSize方法估计还剩多少元素需要遍历,因为即使不那么精确,快速算出来的值也有助于让拆分均匀点。

注:将Stream流拆分成多个部分的算法是个递归过程,第一步第一个Spliterator调用trySplit,生成两个Spliterator,第二步这两个Spliterator调用trySplit,生成4个Spliterator,直到调用Spliterator的trySplit 方法后返回null,表示这部分Spliterator不能在分割。

这个拆分过程也受Spliterator本身特性的影响,而特性是通过characteristics方法声明的。

我们来简单看一下它的特性的常用值。

特性含义
ORDERED按元素的既定顺序遍历和划分
DISTINCT对于任一遍历过的元素x,y,x.equals(y)返回false
SORTED遍历元素按照一个预定义顺序排序
SIZEDSpliterator由一个已知大小的数据源建立,estimateSize会返回准确值
NONNULL保证遍历元素不会为空
IMMUTABLESpliterator的数据源不能被修改,(不能 添加、删除、修改任何元素)
CONCURRENTSpliterator的数据源可以被其他线程同时修改而无需同步
SUBSIZED该Spliterator和从它拆分出来的Spliterator都是SIZED的

例子

为什么我们需要了解这个类,有的时候甚至要实现这个类呢?

我们来看一个例子。

对于下面一个String,我想统计下单词数量。

1
static final String WORD="Hello World Happy EveryDay Good good study day day up let us study Spliterator";

我们需要创建一个counter来累计流中字符,以及在counter中把它们结合起来的逻辑,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class WordCounter {
private final int counter;
private final boolean lastSpace;

public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
//遍历一个个的Character
public WordCounter accumulate(Character c){
if(Character.isWhitespace(c)){
return lastSpace ? this : new WordCounter(counter,true);
}else{
//上一个字符是空格,而当前遍历的字符不是空格时,将单词计数器加一
return lastSpace ? new WordCounter(counter+1,false):this;
}
}
//合并两个WordCounter,将其计数器加起来
public WordCounter combine(WordCounter wordCounter){
return new WordCounter(counter+wordCounter.counter,wordCounter.lastSpace);
}
public int getCounter(){
return counter;
}
}

这时候,我们在书写一个规约Character流统计单词个数就很简单了。

1
2
3
4
public static int countWords(Stream<Character> stream){
WordCounter wordCounter=stream.reduce(new WordCounter(0,true),WordCounter::accumulate,WordCounter::combine);
return wordCounter.getCounter();
}
1
2
Stream<Character> stream= IntStream.range(0,WORD.length()).mapToObj(WORD::charAt);
System.out.println(countWords(stream));

输出14。结果是正确的。

现在我们让他在并行流上进行工作:

1
Stream<Character> stream= IntStream.range(0,WORD.length()).mapToObj(WORD::charAt).parallel();

结果输出26。显然这是不正确的。一脸懵逼。

为什么会出现这种情况呢?

因为在并行流进行Spliterator分割时,把一个单词拆分成两部分了,导致结果变大。这显然不是我们想看到的。

实践

我们要处理这种情况,就要指定分割原则,不要让程序把整个单词切开。

因此我们需要编写自己的Spliterator才能让上述问题在并行流下工作。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar=0;
public WordCounterSpliterator(String string) {
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
//处理当前字符
action.accept(string.charAt(currentChar++));
//如果还有字符要处理,返回true
return currentChar<string.length();
}
@Override
public Spliterator<Character> trySplit() {
int currentSize=string.length()-currentChar;
//返回null表示要处理的String已经足够小
if(currentSize<10){
return null;
}
//将试探拆分位置设定到要解析的文字中间
for(int splitPos=currentSize/2+currentChar;splitPos<string.length();splitPos++){
//如果是空格就开始拆分,不是空格将拆分位置前进直到下一个空格
if(Character.isWhitespace(string.charAt(splitPos))){
//创建一个新的WordCounterSpliterator来解析String从开始到拆分位置的部分
Spliterator<Character> spliterator=
new WordCounterSpliterator(string.substring(currentChar,splitPos));
//将这个WordCounterSpliterator的起始位置设为拆分位置
currentChar=splitPos;
return spliterator;
}
}
return null;
}
@Override
public long estimateSize() {
return string.length()-currentChar;
}
@Override
public int characteristics() {
return ORDERED+SIZED+SUBSIZED+NONNULL+IMMUTABLE;
}
}

tryAdvance方法把String中当前位置的Character传给了Consumer,并让位置加一。作为参数传递的内部类Consumer,在遍历流时将要处理的Character传递给要执行的函数。如果新的指针位置小于String总长度,说明没有遍历完,返回true继续遍历。

trySplit方法,首先我们设置了一个拆分下限——10个Character,实际应用中我们应尽量提高这个长度避免生成太多的任务。如果长度小于这个数,就返回空无需继续拆分。否则就把试探拆分位置放到要解析的String块中间,但不能直接使用此位置,应该看看是不是空格,如果是就拆分,如果不是,就向前找,找到空格进行拆分,避免把一个单词拆成两份。

estimatedSize方法返回的是这个Spliterator解析的String的总长度和当前遍历位置的差值。

characteristic方法告诉这个框架是ORDERED(String的每个Character的默认顺序),SIZED(estimatedSize方法返回值是精确的),SUBSIZED(trySplit分出来的Spliterator大小也是固定的),NONNULL(String里面的Character不可能为null),IMMUTABLE(String本身就不可变化)。

下面我们测试一下我们的WordCounterSpliterator 。

1
2
3
Spliterator<Character> spliterator=new WordCounterSpliterator(WORD);
Stream<Character> stream= StreamSupport.stream(spliterator,true);
System.out.println(countWords(stream));

可以看到输出结果为14.

结论

可以看到,并行流不是所有情况都适用的,有些情况要定制自己的Spliterator才能使并行流正常工作。这个例子或许运行效率并行比不上串行,但是在大数据下,比如分析一个文本文件中的单词数量,就能明显看到并行带来的速度优势了。




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

您的支持就是我创作的动力!

欢迎关注我的其它发布渠道