提示进度信息类似如下:
5% (08s)
10% (14s)
15% (20s)
...
1、通过peek显示进度信息
import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; public class Main { public static void main(String[] args) { int elementsCount = 1000; // 假设总元素数量 Stream<MyData> myStream = readData(); // 模拟数据流 final AtomicInteger loader = new AtomicInteger(); int fivePercent = elementsCount / 20; MyResult result = myStream .map(Main::process) .peek(stat -> { if (loader.incrementAndGet() % fivePercent == 0) { System.out.println(loader.get() + " elements out of " + elementsCount + " processed"); System.out.println((5 * (loader.get() / fivePercent)) + "% completed"); } }) .reduce(MyResult::aggregate) .orElse(null); // 防止 reduce 返回 Optional 时未处理空值 System.out.println("Processing complete."); } private static Stream<MyData> readData() { // 模拟生成 1000 个 MyData 实例 return Stream.generate(MyData::new).limit(1000); } private static MyResult process(MyData row) { // 模拟处理逻辑 return new MyResult(); } } class MyData { // 示例数据类 } class MyResult { // 示例结果类 public static MyResult aggregate(MyResult r1, MyResult r2) { return new MyResult(); // 模拟聚合逻辑 } }
2、使用map显示进度信息
import java.util.Iterator; import java.util.Locale; import java.util.Spliterator; import java.util.Spliterators; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.LongConsumer; import java.util.stream.Stream; import java.util.stream.StreamSupport; public class Main { public static void main(String[] args) { int size = 250; Stream<Integer> stream = readData(size); LongConsumer progressConsumer = progress -> { // "Filter" the output here: Report only every 10th element if (progress % 10 == 0) { double relative = (double) progress / (size - 1); double percent = relative * 100; System.out.printf(Locale.ENGLISH, "Progress %8d, relative %2.5f, percent %3.2f\n", progress, relative, percent); } }; Integer result = stream .map(element -> process(element)) .map(progressMapper(progressConsumer)) .reduce(0, (a, b) -> a + b); System.out.println("result " + result); } private static <T> Function<T, T> progressMapper( LongConsumer progressConsumer) { AtomicLong counter = new AtomicLong(0); return t -> { long n = counter.getAndIncrement(); progressConsumer.accept(n); return t; }; } private static Integer process(Integer element) { return element * 2; } private static Stream<Integer> readData(int size) { Iterator<Integer> iterator = new Iterator<Integer>() { int n = 0; @Override public Integer next() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } return n++; } @Override public boolean hasNext() { return n < size; } }; return StreamSupport.stream( Spliterators.spliteratorUnknownSize( iterator, Spliterator.ORDERED), false); } }
3、带进度的序列流处理
import java.util.stream.IntStream; public class StreamProgressExample { public static void main(String[] args) { int totalElements = 100000; int[] progress = {0}; // 用于跟踪进度 IntStream.range(0, totalElements).forEach(i -> { // 模拟处理 process(i); // 更新进度 if (i % (totalElements / 10) == 0) { // 每 10% 更新一次 System.out.println("Progress: " + (i * 100 / totalElements) + "%"); } }); System.out.println("Processing complete."); } private static void process(int i) { // 模拟处理时间 try { Thread.sleep(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
4、并行流的线程安全进度跟踪
import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; public class Main { public static void main(String[] args) { int totalElements = 100000; AtomicInteger counter = new AtomicInteger(); IntStream.range(0, totalElements).parallel().forEach(i -> { // 模拟处理 process(i); // 更新进度 int current = counter.incrementAndGet(); if (current % (totalElements / 10) == 0) { // 每 10% 更新一次 System.out.println("Progress: " + (current * 100 / totalElements) + "%"); } }); System.out.println("Processing complete."); } private static void process(int i) { // 模拟处理时间 try { Thread.sleep(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
5、使用分批处理
import java.util.stream.IntStream; public class BatchProcessingWithProgress { public static void main(String[] args) { int totalElements = 100000; int batchSize = 1000; IntStream.range(0, totalElements) .boxed() .collect(java.util.stream.Collectors.groupingBy(i -> i / batchSize)) .forEach((batch, elements) -> { // 处理批次 elements.forEach(BatchProcessingWithProgress::process); System.out.println("Processed batch: " + (batch + 1)); }); System.out.println("Processing complete."); } private static void process(int i) { // 模拟处理 try { Thread.sleep(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }