前言
Java 8中,流有一个非常大的局限性,使用时,对它操作一次仅能得到一个处理结果。当流进行终端操作后,如果你在试图遍历它,就会出现异常。
1
| java.lang.IllegalStateException: stream has already been operated upon or closed
|
虽然流就是如此设计的,但是我们有时候就希望可以通过流获取多个结果。或者说,你希望一次性向流中传入多个Lambda表达式。 为了达到这一目标,我们应该需要一个fork类型的方法,对每个复制的流应用不同的函数。理想情况下,这些操作也应该支持并行去拿到运算结果。
这一特性在Java 8中是没有的,不过我们可以利用一个通用API,即Spliterator,尤其是它的延迟绑定能力,结合BlockingQueues和Futures来实现这一特性。
正文
复制流
要达到此效果,我们首先应该创建一个StreamForker,它会对原始的流进行封装,在此基础上在执行各种操作。我们来看下代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class StreamForker<T> { private final Stream<T> stream; private final Map<Object, Function<Stream<T>,?>> forks=new HashMap<>(); public StreamForker(Stream<T> stream) { this.stream = stream; } public StreamForker<T> fork(Object key,Function<Stream<T>,?> f){ forks.put(key,f); return this; } public Results getResults(){ } }
|
fork方法接受两个参数。
**Function:**对流进行处理,转变成这些操作结果的类型。
key: 通过它拿到结果,这些结果被放到内部的一个Map中。
fork方法需要返回自身,这样可以复制多个操作构成流水线。
如图:
上图不难理解。
而由fork方法添加的操作如何执行呢,就是通过getResults方法的调用触发,该方法返回一个Results接口的实现。接口定义如下:
1 2 3
| public interface Results { public <R> R get(Object key); }
|
实现Results接口
我们使用ForkingStreamConsumer实现Results接口。
1 2 3 4 5 6 7 8 9
| public Results getResults(){ ForkingStreamConsumer<T> consumer=build(); try{ stream.sequential().forEach(consumer); }finally { consumer.finish(); } return consumer; }
|
ForkingStreamConsumer同时实现了Results和Consumer接口。其主要任务就是来处理流元素,将他们分发到多个BlockingQuenes中处理,BlockingQuenes的数量和通过fork方法提交的操作数是一致的。这里的getResults的实现,流应该是顺序处理的,否则,forEach后元素的顺序就会变化。finish方法用来表明队列中没有更多要处理的元素了。build方法主要用于创建ForkingStreamConsumer。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| private ForkingStreamConsumer<T> build(){ List<BlockingQueue<T>> queues=new ArrayList<>(); Map<Object,Future<?>> actions= forks.entrySet().stream().reduce( new HashMap<Object,Future<?>>(), (map,e)->{ map.put(e.getKey(),getOperationResult(queues,e.getValue())); return map; }, (m1,m2)->{ m1.putAll(m2); return m1; } ); return new ForkingStreamConsumer<>(queues,actions); }
|
可以看到,我们先创建了BlockingQuenes列表。接着创建了一个Map,Map的键就是用来标识不同操作的键,值包含着Future里。最终BlockingQuenes和Map会被传递给ForkingStreamConsumer的构造函数。每个Future通过关键方法getOperationResult创建。
来看看getOperationResult的实现。
1 2 3 4 5 6 7 8 9 10 11
| private Future<?> getOperationResult(List<BlockingQueue<T>> queues,Function<Stream<T>,?> f){ BlockingQueue<T> queue=new LinkedBlockingDeque<>(); queues.add(queue); Spliterator<T> spliterator=new BlockingQueueSpliterator<>(queue); Stream<T> source= StreamSupport.stream(spliterator,false); return CompletableFuture.supplyAsync(()->f.apply(source)); }
|
该方法创建一个新的BlockingQuene,并将其添加到队列列表。队列会被传递给一个新的BlockingQueueSpliterator对象,后者是一个延迟绑定的Spliterator。然后我们创建一个顺序流对Spliterator进行遍历,最终创建一个Future收集结果。
开发ForkingStreamConsumer
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class ForkingStreamConsumer<T> implements Consumer<T>,Results { public static final Object END_OF_STREAM=new Object(); private final List<BlockingQueue<T>> queues; private final Map<Object, Future<?>> actions; public ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions) { this.queues = queues; this.actions = actions; } @Override public <R> R get(Object key) { try { return ((Future<R>)actions.get(key)).get(); }catch(Exception e){ throw new RuntimeException(e); } } @Override public void accept(T t) { queues.forEach(q->q.add(t)); } void finish(){ accept((T)END_OF_STREAM); } }
|
这个类同时实现了Consumer接口和Results接口。
Consumer接口要求实现accept方法,每当ForkingStreamConsumer接受流中的一个元素,它就会将元素添加到所有BlockingQuenes中当所有元素都添加到所有队列后,finish方法将最后一个元素添加到所有队列。处理时碰上这个元素表明后面没有元素要处理了。
Results接口需要实现get方法。一旦处理结束,get方法会获取Map中由键索引的Future,解析到结果后返回。
每有一个操作,就会对应一个BlockingQueueSpliterator。我们来看下BlockingQueueSpliterator的实现。
开发BlockingQueueSpliterator
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class BlockingQueueSpliterator<T> implements Spliterator<T> { private final BlockingQueue<T> q; public BlockingQueueSpliterator(BlockingQueue<T> q) { this.q = q; } @Override public boolean tryAdvance(Consumer<? super T> action) { T t; while (true){ try { t=q.take(); break; }catch(InterruptedException e){ } } if(t!=ForkingStreamConsumer.END_OF_STREAM){ action.accept(t); return true; } return false; } @Override public Spliterator<T> trySplit() { return null; } @Override public long estimateSize() { return 0; } @Override public int characteristics() { return 0; } }
|
可以看到该Spliterator未定义任何切割流的策略,仅仅利用了流的延迟绑定能力。也没有实现trySplit方法。由于我们的操作数是不确定的,故estimateSize不能提供任何有意义的数字,返回0.也没有体现Spliterator的特性,故characteristics返回0.
仅仅实现了tryAdvance方法,它从BlockingQueue中取得原始流元素,进一步传给Consumer对象。当返回true时表明还有元素要处理,直到发现最后一个元素时终止。
以上基本上是在一个流上执行多种操作的代码。
我们下面来检测一下正确性。
测试
编写测试类。如下数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String[] args) { List<Integer> list1=IntStream.rangeClosed(1,1000).filter(n->n%2==0).boxed().collect(Collectors.toList()); List<Integer> list2=IntStream.rangeClosed(1,1000).filter(n->n%5==0).boxed().collect(Collectors.toList()); Results results=new StreamForker<Integer>(list1.stream()) .fork("sum",s->s.mapToInt(Integer::intValue).sum()) .fork("count",s->s.count()) .fork("list3",s->s.flatMap(i->list2.stream().filter(j->i.equals(j))).collect(Collectors.toList())) .fork("max",s->s.flatMap(i->list2.stream().filter(j->i.equals(j))).max(Comparator.naturalOrder())) .fork("min",s->s.flatMap(i->list2.stream().filter(j->i.equals(j))).min(Comparator.naturalOrder())) .getResults(); System.out.println("sum="+results.get("sum")); System.out.println("count="+results.get("count")); System.out.println("max="+((Optional) results.get("max")).get()); System.out.println("min="+((Optional)results.get("min")).get()); ((List<Integer>)results.get("list3")).stream().forEach(System.out::println); }
|
输出结果:
可以看到,使用了一个流,通过我们实现的方法进行了多次终端操作返回正确结果。
性能问题
这是我们用一个流实现多种终端操作的方式,当然这并不意味着会比普通的写法效率高,如果对于上述问题,我们可以分个构建若干个流进行一一实现。
这种一个流进行多个终端操作的情况使用,一定是生成流比较耗费资源性能时才会用到,比如操作一个较大文件时生成的字符流,我们想统计字数,检查某些单词出现的次数,统计行数等等操作,重复生成流显然是耗费资源的。这种情况可以考虑使用这种一个流进行多个终端操作的实现。
当然,具体到具体问题优化,建议认真分析两者的资源消耗。这是比较稳妥的做法。