1、parallelStream的作用
使用Stream具有同时处理多个任务的能力,处理的过程同时分开执行,也就是将一个大任务切分成多个小任务,表示每个任务都是一个操作,类似如下代码:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEach(out::println);
展示顺序不一定是1、2、3、4、5、6、7、8、9,而可能是任意的顺序,对于forEach()
这个操作,如果同地并行处理时,要最后顺序是原来Stream的数据顺序,那可以调用forEachOrdered()
。例如:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEachOrdered(out::println);
注意 : 如果forEachOrdered()
中间,有其他如filter()
的操作,会尝试并行处理,然后forEachOrdered()
会以原数据顺序处理,因此,使用forEachOrdered()
这类的有序处理,可能会(或完全失去)失去并行同时执行的一些优势,实际上中间操作亦有可能如此,例如,sorted()
方法。
2、ForkJoinPool的简介
ForkJoin
框架是从jdk7中新特性,它同ThreadPoolExecutor
一样,也实现了Executor
和ExecutorService
接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。
1) 工作窃取算法
forkjoin最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的cpu,那么如何利用好这个空闲的cpu就成了提高性能的关键,而这里我们要提到的工作窃取(work-stealing)算法就是整个forkjion框架的核心理念,工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
2) ThreadPoolExecutor和ForkJoinPool性能差异
使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。
3、parallelStream的使用示例
使用parallelStream并行执行,需要要根据实际情况使用,并不是所有的情况都需要使用,使用它可能会有额外的开销,反尔会比不用之前慢也是有可能的。还有可能增加程序的复杂程序,运行结果也可能达不到预期。在有些磁盘IO、网络IO或数据库等操作时,可以考虑使用,操作的数据量比较大时,也可以考虑。
AtomicInteger recordNumber = new AtomicInteger();
final int batchSize = 10;
try(BufferedReader reader = Files.newBufferedReader(inputFile.toPath(),
StandardCharsets.UTF_8);) {
Supplier<List<Record>> batchSupplier = () -> {
List<Record> batch = new ArrayList<>();
for (int i = 0; i < batchSize; i++) {
String nextLine;
try {
nextLine = reader.readLine();
} catch (IOException e) {
//hanlde exception
throw new RuntimeException(e);
}
if(null == nextLine)
return batch;
batch.add(new Record(recordNumber.getAndIncrement(), nextLine));
}
System.out.println("next batch");
return batch;
};
Stream.generate(batchSupplier)
.takeWhile(list -> list.size() >= batchSize)
.map(list -> list.parallelStream()
.filter(record -> doSomeOperation())
.collect(Collectors.toList()))
.flatMap(List::stream)
.forEach(System.out::println);
}
相关文档:
Java stream filter()过滤List数据的方法及示例代码
Java 使用stream I/O(Inputstream/Outputstream)读写文件的方法及示例代码
Java stream处理大量数据时显示处理进度信息方法及示例代码
Java stream使用多个过滤器(filter)或复杂条件方法用法及简单写法代码
Java 使用stream()将Map String, List String 数据求和(sum)方法代码