Java Stream并行parallelStream使用方法示例代码

parallelStream其实就是一个并行执行的流.它通过默认的ForkJoinPool,可能提高你的多线程任务的速度。本文主要介绍Java中,执行Stream相关操作时,使用并行parallelStream()方法的使用,以及相关介绍和示例代码。

1、parallelStream的作用

使用Stream具有同时处理多个任务的能力,处理的过程同时分开执行,也就是将一个大任务切分成多个小任务,表示每个任务都是一个操作,类似如下代码:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEach(out::println);

展示顺序不一定是1、2、3、4、5、6、7、8、9,而可能是任意的顺序,对于forEach()这个操作,如果同地并行处理时,要最后顺序是原来Stream的数据顺序,那可以调用forEachOrdered()。例如:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEachOrdered(out::println);

注意 : 如果forEachOrdered()中间,有其他如filter()的操作,会尝试并行处理,然后forEachOrdered()会以原数据顺序处理,因此,使用forEachOrdered()这类的有序处理,可能会(或完全失去)失去并行同时执行的一些优势,实际上中间操作亦有可能如此,例如,sorted()方法。

2、ForkJoinPool的简介

ForkJoin框架是从jdk7中新特性,它同ThreadPoolExecutor一样,也实现了ExecutorExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。

1) 工作窃取算法

forkjoin最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的cpu,那么如何利用好这个空闲的cpu就成了提高性能的关键,而这里我们要提到的工作窃取(work-stealing)算法就是整个forkjion框架的核心理念,工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

2) ThreadPoolExecutor和ForkJoinPool性能差异

使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。

3、parallelStream的使用示例

使用parallelStream并行执行,需要要根据实际情况使用,并不是所有的情况都需要使用,使用它可能会有额外的开销,反尔会比不用之前慢也是有可能的。还有可能增加程序的复杂程序,运行结果也可能达不到预期。在有些磁盘IO、网络IO或数据库等操作时,可以考虑使用,操作的数据量比较大时,也可以考虑。

AtomicInteger recordNumber = new AtomicInteger();
final int batchSize = 10;
try(BufferedReader reader = Files.newBufferedReader(inputFile.toPath(), 
        StandardCharsets.UTF_8);) {
    Supplier<List<Record>> batchSupplier = () -> {
        List<Record> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            String nextLine;
            try {
                nextLine = reader.readLine();
            } catch (IOException e) {
                //hanlde exception
                throw new RuntimeException(e);
            }
            if(null == nextLine) 
                return batch;
            batch.add(new Record(recordNumber.getAndIncrement(), nextLine));
        }
        System.out.println("next batch");
        return batch;
    };
    Stream.generate(batchSupplier)
        .takeWhile(list -> list.size() >= batchSize)
        .map(list -> list.parallelStream()
                         .filter(record -> doSomeOperation())
                         .collect(Collectors.toList()))
        .flatMap(List::stream)
        .forEach(System.out::println);
}

相关文档:

Java stream filter()过滤List数据的方法及示例代码

Java 使用stream I/O(Inputstream/Outputstream)读写文件的方法及示例代码

Java stream处理大量数据时显示处理进度信息方法及示例代码

Java stream使用多个过滤器(filter)或复杂条件方法用法及简单写法代码

Java 使用stream()将Map String, List String 数据求和(sum)方法代码

Java 连接合并两个int[](数组)的方法(Intstream和stream)

Java 使用stream()过滤(filter)筛选List T 列表数据并记录过滤的值日志方法代码

推荐阅读
cjavapy编程之路首页