Java Stream处理大量数据时显示处理进度信息方法及示例代码

假设用 Java Stream 处理数百万个元素,每个元素的 Map-Reduce 处理需要几毫秒,整个任务大约需要 20 分钟才能完成。在处理大量数据时,显示处理进度可以帮助用户了解程序的运行状态,避免用户因长时间等待而感到困惑,或误以为程序已经卡死。Java Stream API 本身并不直接提供进度显示功能,但可以借助一些技巧来实现。本文介绍用 Java Stream 处理大量数据时显示处理进度的几种方法及示例代码。

提示进度信息类似如下:

  5% (08s)
10% (14s)
15% (20s)
...

1、通过peek显示进度信息

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

/**
 * Example 1: report progress from inside a stream pipeline with {@code peek}.
 *
 * <p>Each time roughly another 5% of the elements has passed through {@code map}, two lines are
 * printed: the absolute element count and the completed percentage.
 *
 * <p>Note: {@code peek} is documented mainly as a debugging aid, and since Java 9 a terminal
 * operation such as {@code count()} may skip it entirely when the result can be computed without
 * visiting the elements. Here the terminal {@code reduce} must visit every element, so the
 * callback always runs.
 */
public class Main {
    /**
     * Runs the demo.
     *
     * @param args optional; {@code args[0]} overrides the number of elements (default 1000)
     */
    public static void main(String[] args) {
        // Total number of elements; needed to turn the running count into a percentage.
        int elementsCount = args.length > 0 ? Integer.parseInt(args[0]) : 1000;
        Stream<MyData> myStream = readData(elementsCount); // simulated data source
        final AtomicInteger loader = new AtomicInteger();
        // Report about every 5% of the work. Clamp to 1 so that fewer than 20 elements
        // cannot make the modulo below divide by zero.
        int fivePercent = Math.max(1, elementsCount / 20);

        // Aggregated result (not used further in this demo).
        MyResult result = myStream
            .map(Main::process)
            .peek(stat -> {
                // Use the value returned by incrementAndGet instead of re-reading the counter:
                // in a parallel stream another thread may have incremented it in between.
                int processed = loader.incrementAndGet();
                if (processed % fivePercent == 0) {
                    System.out.println(processed +
                    " elements out of " +
                    elementsCount + " processed");
                    // long arithmetic avoids int overflow for very large element counts
                    System.out.println((processed * 100L / elementsCount)
                    + "% completed");
                }
            })
            .reduce(MyResult::aggregate)
            .orElse(null); // reduce returns an Optional; an empty stream yields null

        System.out.println("Processing complete.");
    }

    /**
     * Simulates reading {@code count} input rows.
     *
     * @param count number of {@link MyData} instances to generate
     * @return a sequential stream of {@code count} fresh instances
     */
    private static Stream<MyData> readData(int count) {
        return Stream.generate(MyData::new).limit(count);
    }

    /** Simulated per-row processing step. */
    private static MyResult process(MyData row) {
        return new MyResult();
    }
}

/** Placeholder input record for the demo. */
class MyData {
}

/** Placeholder result type for the demo. */
class MyResult {
    /** Combines two partial results; the demo simply returns a new empty instance. */
    public static MyResult aggregate(MyResult r1, MyResult r2) {
        return new MyResult(); // simulated aggregation
    }
}

2、使用map显示进度信息

import java.util.Iterator;
import java.util.Locale;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
 * Example 2: report progress with a pass-through {@code map} stage.
 *
 * <p>{@code progressMapper} wraps a {@link LongConsumer} in an identity function that hands the
 * 0-based index of each element to the consumer and then returns the element unchanged.
 */
public class Main
{
    /**
     * Runs the demo.
     *
     * @param args optional; {@code args[0]} overrides the number of elements (default 250)
     */
    public static void main(String[] args)
    {
        int size = args.length > 0 ? Integer.parseInt(args[0]) : 250;
        Stream<Integer> stream = readData(size);
        // Indices run from 0 to size - 1, so the last element maps to 100%. Clamp the
        // denominator so a single-element stream does not print NaN.
        double lastIndex = Math.max(1, size - 1);
        LongConsumer progressConsumer = progress ->
        {
            // "Filter" the output: report every 10th element, plus the last one so 100% appears.
            if (progress % 10 == 0 || progress == size - 1)
            {
                double relative = progress / lastIndex;
                double percent = relative * 100;
                System.out.printf(Locale.ENGLISH,
                    "Progress %8d, relative %2.5f, percent %3.2f%n",
                    progress, relative, percent);
            }
        };
        Integer result = stream
            .map(Main::process)
            .map(progressMapper(progressConsumer))
            .reduce(0, Integer::sum);
        System.out.println("result " + result);
    }

    /**
     * Returns an identity function that reports each element's 0-based position.
     *
     * @param progressConsumer receives 0, 1, 2, ... in encounter order (sequential streams)
     * @param <T> element type
     * @return a function that forwards its argument unchanged after reporting
     */
    private static <T> Function<T, T> progressMapper(
        LongConsumer progressConsumer)
    {
        AtomicLong counter = new AtomicLong(0);
        return t ->
        {
            long n = counter.getAndIncrement();
            progressConsumer.accept(n);
            return t;
        };
    }

    /** Simulated processing step: doubles the element. */
    private static Integer process(Integer element)
    {
        return element * 2;
    }

    /**
     * Simulates a slow data source yielding 0 .. size-1, sleeping 10 ms per element.
     *
     * @param size number of elements to produce
     * @return an ordered, sequential stream of the generated integers
     */
    private static Stream<Integer> readData(int size)
    {
        Iterator<Integer> iterator = new Iterator<Integer>()
        {
            int n = 0;
            @Override
            public Integer next()
            {
                // Honour the Iterator contract instead of producing values past the end.
                if (!hasNext())
                {
                    throw new java.util.NoSuchElementException();
                }
                try
                {
                    Thread.sleep(10); // simulate slow I/O per element
                }
                catch (InterruptedException e)
                {
                    // Restore the interrupt flag so callers can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
                return n++;
            }
            @Override
            public boolean hasNext()
            {
                return n < size;
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(
                iterator, Spliterator.ORDERED), false);
    }
}

3、带进度的顺序流处理

import java.util.stream.IntStream;

/**
 * Example 3: progress reporting for a sequential stream.
 *
 * <p>After each element is processed, a line is printed whenever the number of completed
 * elements reaches another 10% of the total; the final element always reports 100%.
 */
public class StreamProgressExample {
    /**
     * Runs the demo.
     *
     * @param args optional; {@code args[0]} overrides the number of elements (default 100000)
     */
    public static void main(String[] args) {
        int totalElements = args.length > 0 ? Integer.parseInt(args[0]) : 100000;
        // Report every 10%; clamp to 1 so fewer than 10 elements cannot divide by zero.
        int step = Math.max(1, totalElements / 10);

        IntStream.range(0, totalElements).forEach(i -> {
            // simulated processing
            process(i);

            // Count completed elements (i + 1) so the first report is not a misleading 0%
            // and the last element always yields 100%.
            int done = i + 1;
            if (done % step == 0 || done == totalElements) {
                // long arithmetic avoids int overflow for large totals
                System.out.println("Progress: " + (done * 100L / totalElements) + "%");
            }
        });

        System.out.println("Processing complete.");
    }

    /** Simulates 1 ms of work per element. */
    private static void process(int i) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // restore the interrupt flag rather than swallowing it
            Thread.currentThread().interrupt();
        }
    }
}

4、并行流的线程安全进度跟踪

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

/**
 * Example 4: thread-safe progress tracking for a parallel stream.
 *
 * <p>An {@link AtomicInteger} counts completed elements across worker threads; a line is printed
 * whenever the count reaches another 10% of the total, and when the last element completes.
 */
public class Main {
    /**
     * Runs the demo.
     *
     * @param args optional; {@code args[0]} overrides the number of elements (default 100000)
     */
    public static void main(String[] args) {
        int totalElements = args.length > 0 ? Integer.parseInt(args[0]) : 100000;
        // Report every 10%; clamp to 1 so fewer than 10 elements cannot divide by zero.
        int step = Math.max(1, totalElements / 10);
        AtomicInteger counter = new AtomicInteger();

        IntStream.range(0, totalElements).parallel().forEach(i -> {
            // simulated processing
            process(i);

            // incrementAndGet gives each thread a distinct count, so each threshold is reported
            // exactly once. Lines from different threads may still be printed out of order.
            int current = counter.incrementAndGet();
            if (current % step == 0 || current == totalElements) {
                // long arithmetic avoids int overflow for large totals
                System.out.println("Progress: " + (current * 100L / totalElements) + "%");
            }
        });

        System.out.println("Processing complete.");
    }

    /** Simulates 1 ms of work per element. */
    private static void process(int i) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // restore the interrupt flag rather than swallowing it
            Thread.currentThread().interrupt();
        }
    }
}

5、使用分批处理

import java.util.stream.IntStream;

/**
 * Example 5: batch processing with a progress line per finished batch.
 *
 * <p>Elements {@code 0 .. totalElements-1} are split into consecutive batches of
 * {@code batchSize} (the last batch may be shorter). Batches run in ascending order, and
 * "Processed batch: k" is printed after batch k (1-based) completes.
 */
public class BatchProcessingWithProgress {
    /**
     * Runs the demo.
     *
     * @param args optional; {@code args[0]} overrides the element count (default 100000) and
     *     {@code args[1]} the batch size (default 1000)
     * @throws IllegalArgumentException if the batch size is not positive
     */
    public static void main(String[] args) {
        int totalElements = args.length > 0 ? Integer.parseInt(args[0]) : 100000;
        int batchSize = args.length > 1 ? Integer.parseInt(args[1]) : 1000;
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        // ceil(totalElements / batchSize) without the overflow risk of (a + b - 1) / b
        int batchCount = totalElements <= 0 ? 0 : (totalElements - 1) / batchSize + 1;

        // Batches are produced lazily and in ascending order. The previous groupingBy approach
        // boxed every element up front and collected into a HashMap whose order is unspecified.
        IntStream.range(0, batchCount).forEach(batch -> {
            int start = batch * batchSize; // <= totalElements - 1, so no overflow
            int end = start + Math.min(batchSize, totalElements - start);
            IntStream.range(start, end).forEach(BatchProcessingWithProgress::process);
            System.out.println("Processed batch: " + (batch + 1));
        });

        System.out.println("Processing complete.");
    }

    /** Simulates 1 ms of work per element. */
    private static void process(int i) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // restore the interrupt flag rather than swallowing it
            Thread.currentThread().interrupt();
        }
    }
}

推荐阅读
cjavapy编程之路首页