期望输出的进度提示信息类似如下（百分比及已用时间）：
5% (08s)
10% (14s)
15% (20s)
...
1、通过peek显示进度信息
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
/**
 * Example 1: reports progress from inside a stream pipeline by counting
 * elements in a {@code peek} stage.
 *
 * <p>A report is printed every time another 5% of the expected element count
 * has passed through the pipeline.
 */
public class Main {
    /** Number of elements the simulated source produces; also the progress denominator. */
    private static final int ELEMENTS_COUNT = 1000;

    /**
     * Runs the simulated pipeline and prints a progress report at every 5% step.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int elementsCount = ELEMENTS_COUNT; // expected total number of elements
        Stream<MyData> myStream = readData(); // simulated data stream
        final AtomicInteger loader = new AtomicInteger();
        // Clamp to 1 so that fewer than 20 elements cannot cause a modulo by zero.
        int fivePercent = Math.max(1, elementsCount / 20);
        MyResult result = myStream
                .map(Main::process)
                .peek(stat -> {
                    // Read the counter once so the reported value is the one that
                    // triggered the report.
                    int processed = loader.incrementAndGet();
                    if (processed % fivePercent == 0) {
                        System.out.println(processed
                                + " elements out of "
                                + elementsCount + " processed");
                        // Widen to long before multiplying to avoid int overflow.
                        System.out.println(((long) processed * 100 / elementsCount)
                                + "% completed");
                    }
                })
                .reduce(MyResult::aggregate)
                .orElse(null); // reduce yields an empty Optional for an empty stream
        System.out.println("Processing complete.");
    }

    /**
     * Simulates the data source.
     *
     * @return a stream of {@link #ELEMENTS_COUNT} freshly created {@code MyData} instances
     */
    private static Stream<MyData> readData() {
        return Stream.generate(MyData::new).limit(ELEMENTS_COUNT);
    }

    /**
     * Simulates processing of a single row.
     *
     * @param row the input element (ignored by this stand-in)
     * @return a new {@code MyResult}
     */
    private static MyResult process(MyData row) {
        return new MyResult();
    }
}
/**
 * Placeholder input type for the example pipeline; it carries no state.
 */
class MyData {
    /** Creates an empty placeholder instance. */
    MyData() {
    }
}
/**
 * Placeholder result type for the example pipeline; it carries no state.
 */
class MyResult {
    /**
     * Stand-in for combining two partial results.
     *
     * <p>Both arguments are ignored; a new, empty instance is returned on
     * every call.
     *
     * @param r1 first partial result (ignored)
     * @param r2 second partial result (ignored)
     * @return a newly created {@code MyResult}
     */
    public static MyResult aggregate(MyResult r1, MyResult r2) {
        MyResult combined = new MyResult();
        return combined;
    }
}
2、使用map显示进度信息
import java.util.Iterator;
import java.util.Locale;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
 * Example 2: reports progress through a pass-through {@code map} stage that
 * counts the elements flowing by and forwards the count to a
 * {@link LongConsumer}.
 */
public class Main
{
    /**
     * Doubles and sums the numbers {@code 0..249} read from a slow simulated
     * source, printing a progress line for every 10th element.
     *
     * @param args unused
     */
    public static void main(String[] args)
    {
        int size = 250;
        Stream<Integer> stream = readData(size);
        LongConsumer progressConsumer = progress ->
        {
            // "Filter" the output here: report only every 10th element
            if (progress % 10 == 0)
            {
                // progress is a zero-based index, so the last element is size - 1
                double relative = (double) progress / (size - 1);
                double percent = relative * 100;
                System.out.printf(Locale.ENGLISH,
                    "Progress %8d, relative %2.5f, percent %3.2f\n",
                    progress, relative, percent);
            }
        };
        Integer result = stream
            .map(Main::process)
            .map(progressMapper(progressConsumer))
            .reduce(0, Integer::sum);
        System.out.println("result " + result);
    }

    /**
     * Creates an identity function that counts its invocations.
     *
     * <p>Each call passes the zero-based number of elements seen so far to
     * {@code progressConsumer} and then returns its argument unchanged.
     *
     * @param progressConsumer receives the zero-based index of every element
     * @param <T> the element type
     * @return a pass-through function suitable for {@code Stream.map}
     */
    private static <T> Function<T, T> progressMapper(
        LongConsumer progressConsumer)
    {
        AtomicLong counter = new AtomicLong(0);
        return t ->
        {
            long n = counter.getAndIncrement();
            progressConsumer.accept(n);
            return t;
        };
    }

    /**
     * Simulates processing of one element.
     *
     * @param element the input value
     * @return {@code element * 2}
     */
    private static Integer process(Integer element)
    {
        return element * 2;
    }

    /**
     * Simulates a slow data source.
     *
     * @param size the number of elements to produce
     * @return a sequential, ordered stream of {@code 0..size-1} in which every
     *         element is preceded by a 10 ms delay
     */
    private static Stream<Integer> readData(int size)
    {
        Iterator<Integer> iterator = new Iterator<Integer>()
        {
            int n = 0;

            @Override
            public Integer next()
            {
                // Honour the Iterator contract instead of counting past the end.
                if (!hasNext())
                {
                    throw new java.util.NoSuchElementException();
                }
                try
                {
                    Thread.sleep(10);
                }
                catch (InterruptedException e)
                {
                    // Restore the interrupt status so callers can still observe it.
                    Thread.currentThread().interrupt();
                }
                return n++;
            }

            @Override
            public boolean hasNext()
            {
                return n < size;
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(
                iterator, Spliterator.ORDERED), false);
    }
}
3、带进度的顺序流处理
import java.util.stream.IntStream;
/**
 * Example 3: progress reporting for a sequential stream.
 *
 * <p>Progress is derived from the number of elements that have finished
 * processing, so the reports run from 10% up to 100%.
 */
public class StreamProgressExample {
    /**
     * Processes {@code 0..99999} sequentially and prints a line after every
     * 10% of the elements has been completed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int totalElements = 100000;
        // Clamp to 1 so that fewer than 10 elements cannot cause a modulo by zero.
        int step = Math.max(1, totalElements / 10);
        IntStream.range(0, totalElements).forEach(i -> {
            // Simulated work
            process(i);
            // i is zero-based; i + 1 elements are done once element i has finished.
            int completed = i + 1;
            if (completed % step == 0) { // report every 10%
                // Widen to long before multiplying to avoid int overflow.
                System.out.println("Progress: " + ((long) completed * 100 / totalElements) + "%");
            }
        });
        System.out.println("Processing complete.");
    }

    /**
     * Simulates processing one element by sleeping for 1 ms.
     *
     * @param i the element (unused by this stand-in)
     */
    private static void process(int i) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
        }
    }
}
4、并行流的线程安全进度跟踪
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
/**
 * Example 4: thread-safe progress tracking for a parallel stream, using an
 * {@link AtomicInteger} shared by all worker threads as the completion counter.
 */
public class Main {
    /**
     * Processes {@code 0..99999} in parallel and prints a line whenever the
     * shared completion counter reaches another multiple of 10% of the total.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final int totalElements = 100000;
        final int tenth = totalElements / 10;
        final AtomicInteger finished = new AtomicInteger();
        IntStream.range(0, totalElements)
                .parallel()
                .forEach(index -> {
                    // Simulated work, then account for the finished element.
                    process(index);
                    announce(finished.incrementAndGet(), tenth, totalElements);
                });
        System.out.println("Processing complete.");
    }

    /**
     * Prints the completion percentage when {@code done} is a multiple of {@code tenth}.
     *
     * @param done number of elements finished so far
     * @param tenth number of elements that make up 10% of the total
     * @param total total number of elements
     */
    private static void announce(int done, int tenth, int total) {
        if (done % tenth != 0) {
            return;
        }
        System.out.println("Progress: " + (done * 100 / total) + "%");
    }

    /**
     * Simulates processing one element by sleeping for 1 ms.
     *
     * @param i the element (unused by this stand-in)
     */
    private static void process(int i) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt status for the calling thread.
            Thread.currentThread().interrupt();
        }
    }
}
5、使用分批处理
import java.util.stream.IntStream;
/**
 * Example 5: progress reporting by processing the input in fixed-size batches
 * and printing one line per finished batch.
 */
public class BatchProcessingWithProgress {
    /**
     * Processes {@code 0..99999} in batches of 1000 and reports each finished batch.
     *
     * <p>Batches are visited by index, so the reports come out in ascending
     * order without relying on any map's iteration order, and each batch is
     * streamed lazily instead of boxing and collecting every element first.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int totalElements = 100000;
        int batchSize = 1000;
        // Round up so a trailing partial batch is still processed.
        int batchCount = (totalElements + batchSize - 1) / batchSize;
        IntStream.range(0, batchCount).forEach(batch -> {
            int from = batch * batchSize;
            int to = Math.min(from + batchSize, totalElements);
            // Process the batch
            IntStream.range(from, to).forEach(BatchProcessingWithProgress::process);
            System.out.println("Processed batch: " + (batch + 1));
        });
        System.out.println("Processing complete.");
    }

    /**
     * Simulates processing one element by sleeping for 1 ms.
     *
     * @param i the element (unused by this stand-in)
     */
    private static void process(int i) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
        }
    }
}