Future是Java 5添加的类,用来描述一个异步计算的结果。可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,也可以使用cancel方法停止任务的执行。
本文介绍CompletableFuture的使用,从Java 8引入,它提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
关于异步处理,在之前的文章Java 多线程中也有涉猎。
使用自带的supplyAsync异步
public CompletableFuture<StockPushContent> getStockContentByAsync(String code){
return CompletableFuture.supplyAsync(() -> getStockContent(code));
}
supplyAsync的实现:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
其中asyncPool默认使用ForkJoinPool,线程数默认为Runtime.getRuntime().availableProcessors() - 1.
使用自定义创建的线程池异步
@Component
public class ThreadUtil {
private ExecutorService threadPoolService;
@PostConstruct
public void init() {
threadPoolService = new ThreadPoolExecutor(4, 32, 10L, TimeUnit.SECONDS,
new SynchronousQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* 异步执行任务
* @param task
*/
public void postTask(Runnable task) {
threadPoolService.execute(task);
}
}
public CompletableFuture<StockPushContent> getStockContentByThreadPool(String code) {
CompletableFuture<StockPushContent> future = new CompletableFuture<>();
threadUtil.postTask(() -> future.complete(getStockContent(code)));
return future;
}
例子:使用线程池并发执行列表,并汇总结果
public List<StockPushContent> getStockContentList(List<String> codeList) {
// 列表:并发任务Future
List<CompletableFuture<StockPushContent>> completableFutureList = new ArrayList<>();
for (String code : codeList) {
completableFutureList.add(getStockContentByThreadPool(code));
}
try {
// 使用CompletableFuture.allOf并发请求
CompletableFuture<Void> allFuturesResult =
CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[] {}));
return allFuturesResult.thenApply(v ->
completableFutureList.stream().
map(future -> future.join()).
collect(Collectors.toList())
).get(10, TimeUnit.SECONDS);
} catch (Exception e) {
logger.error("getStockContentList error: {}", e.getMessage(), e);
}
return null;
}