Future是Java 5添加的类,用来描述一个异步计算的结果。可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,也可以使用cancel方法停止任务的执行。

本文介绍CompletableFuture的使用,从Java 8引入,它提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

关于异步处理,在之前的文章Java 多线程中也有涉猎。

使用自带的supplyAsync异步

public CompletableFuture<StockPushContent> getStockContentByAsync(String code){
    return CompletableFuture.supplyAsync(() -> getStockContent(code));
}

supplyAsync的实现:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                    Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}
private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

其中asyncPool默认使用ForkJoinPool,线程数默认为Runtime.getRuntime().availableProcessors() - 1.

使用自定义创建的线程池异步

@Component
public class ThreadUtil {
    private ExecutorService threadPoolService;

    @PostConstruct
    public void init() {
        threadPoolService = new ThreadPoolExecutor(4, 32, 10L, TimeUnit.SECONDS,
            new SynchronousQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * 异步执行任务
     * @param task
     */
    public void postTask(Runnable task) {
        threadPoolService.execute(task);
    }
}

public CompletableFuture<StockPushContent> getStockContentByThreadPool(String code) {
    CompletableFuture<StockPushContent> future =  new CompletableFuture<>();
    threadUtil.postTask(() -> future.complete(getStockContent(code)));
    return future;
}

例子:使用线程池并发执行列表,并汇总结果

public List<StockPushContent> getStockContentList(List<String> codeList) {
    // 列表:并发任务Future
    List<CompletableFuture<StockPushContent>> completableFutureList = new ArrayList<>();
    for (String code : codeList) {
        completableFutureList.add(getStockContentByThreadPool(code));
    }

    try {
        // 使用CompletableFuture.allOf并发请求
        CompletableFuture<Void> allFuturesResult =
                CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[] {}));

        return allFuturesResult.thenApply(v ->
                completableFutureList.stream().
                        map(future -> future.join()).
                        collect(Collectors.toList())
        ).get(10, TimeUnit.SECONDS);
    } catch (Exception e) {
        logger.error("getStockContentList error: {}", e.getMessage(), e);
    }

    return null;
}

参考