Java CompletableFuture 异步超时实现深入研究

作者:京东云开发者 时间:2022-12-28 04:18:14 

前言

作者:京东科技 张天赐

JDK 8 是一次重大的版本升级,新增了非常多的特性,其中之一便是 CompletableFuture。自此从 JDK 层面真正意义上的支持了基于事件的异步编程范式,弥补了 Future 的缺陷。

在我们的日常优化中,最常用手段便是多线程并行执行。这时候就会涉及到 CompletableFuture 的使用。

常见使用方式

下面举例一个常见场景。

假如我们有两个 RPC 远程调用服务,我们需要获取两个 RPC 的结果后,再进行后续逻辑处理。

public static void main(String[] args) {
   // 任务 A,耗时 2 秒
   int resultA = compute(1);
   // 任务 B,耗时 2 秒
   int resultB = compute(2);
   // 后续业务逻辑处理
   System.out.println(resultA + resultB);
}

可以预估到,串行执行最少耗时 4 秒,并且 B 任务并不依赖 A 任务结果。

对于这种场景,我们通常会选择并行的方式优化,Demo 代码如下:

public static void main(String[] args) {
   // 仅简单举例,在生产代码中可别这么写!
   // 统计耗时的函数
   time(() -> {
       CompletableFuture<Integer> result = Stream.of(1, 2)
                                                 // 创建异步任务
                                                 .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
                                                 // 聚合
                                                 .reduce(CompletableFuture.completedFuture(0), (x, y) -> x.thenCombineAsync(y, Integer::sum, executor));
       // 等待结果
       try {
           System.out.println("结果:" + result.get());
       } catch (ExecutionException | InterruptedException e) {
           System.err.println("任务执行异常");
       }
   });
}
输出:
[async-1]: 任务执行开始:1
[async-2]: 任务执行开始:2
[async-1]: 任务执行完成:1
[async-2]: 任务执行完成:2
结果:3
耗时:2 秒

可以看到耗时变成了 2 秒。

存在的问题

分析

看上去 CompletableFuture 现有功能可以满足我们诉求。但当我们引入一些现实常见情况时,一些潜在的不足便暴露出来了。

compute(x) 如果是一个根据入参查询用户某类型优惠券列表的任务,我们需要查询两种优惠券并组合在一起返回给上游。假如上游要求我们 2 秒内处理完毕并返回结果,但 compute(x) 耗时却在 0.5 秒 ~ 无穷 * 动。这时候我们就需要把耗时过长的 compute(x) 任务结果放弃,仅处理在指定时间内完成的任务,尽可能保证服务可用。

那么以上代码的耗时由耗时最长的服务决定,无法满足现有诉求。通常我们会使用 get(long timeout, TimeUnit unit) 来指定获取结果的超时时间,并且我们会给 compute(x) 设置一个超时时间,达到后自动抛异常来中断任务。

public static void main(String[] args) {
   // 仅简单举例,在生产代码中可别这么写!
   // 统计耗时的函数
   time(() -> {
       List<CompletableFuture<Integer>> result = Stream.of(1, 2)
                                                       // 创建异步任务,compute(x) 超时抛出异常
                                                       .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
                                                       .toList();
       // 等待结果
       int res = 0;
       for (CompletableFuture<Integer> future : result) {
           try {
               res += future.get(2, SECONDS);
           } catch (ExecutionException | InterruptedException | TimeoutException e) {
               System.err.println("任务执行异常或超时");
           }
       }
       System.out.println("结果:" + res);
   });
}
输出:
[async-2]: 任务执行开始:2
[async-1]: 任务执行开始:1
[async-1]: 任务执行完成:1
任务执行异常或超时
结果:1
耗时:2 秒

可以看到,只要我们能够给 compute(x) 设置一个超时时间将任务中断,结合 getgetNow 等获取结果的方式,就可以很好地管理整体耗时。

那么问题也就转变成了,如何给任务设置异步超时时间呢

现有做法

当异步任务是一个 RPC 请求时,我们可以设置一个 JSF 超时,以达到异步超时效果。

当请求是一个 R2M 请求时,我们也可以控制 R2M 连接的最大超时时间来达到效果。

这么看好像我们都是在依赖三方中间件的能力来管理任务超时时间?那么就存在一个问题,中间件超时控制能力有限,如果异步任务是中间件 IO 操作 + 本地计算操作怎么办?

用 JSF 超时举一个具体的例子,反编译 JSF 的获取结果代码如下:

public V get(long timeout, TimeUnit unit) throws InterruptedException {
   // 配置的超时时间
   timeout = unit.toMillis(timeout);
   // 剩余等待时间
   long remaintime = timeout - (this.sentTime - this.genTime);
   if (remaintime <= 0L) {
       if (this.isDone()) {
           // 反序列化获取结果
           return this.getNow();
       }
   } else if (this.await(remaintime, TimeUnit.MILLISECONDS)) {
       // 等待时间内任务完成,反序列化获取结果
       return this.getNow();
   }
   this.setDoneTime();
   // 超时抛出异常
   throw this.clientTimeoutException(false);
}

当这个任务刚好卡在超时边缘完成时,这个任务的耗时时间就变成了超时时间 + 获取结果时间。而获取结果(反序列化)作为纯本地计算操作,耗时长短受 CPU 影响较大。

某些 CPU 使用率高的情况下,就会出现异步任务没能触发抛出异常中断,导致我们无法准确控制超时时间。对上游来说,本次请求全部失败。

解决方式

JDK 9

这类问题非常常见,如大促场景,服务器 CPU 瞬间升高就会出现以上问题。

那么如何解决呢?其实 JDK 的开发大佬们早有研究。在 JDK 9,CompletableFuture 正式提供了 orTimeoutcompleteTimeout 方法,来准确实现异步超时控制。

public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
   if (unit == null)
       throw new NullPointerException();
   if (result == null)
       whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit)));
   return this;
}

JDK 9 orTimeout 其实现原理是通过一个定时任务,在给定时间之后抛出异常。如果任务在指定时间内完成,则取消抛异常的操作。

以上代码我们按执行顺序来看下:

首先执行 new Timeout(this)

static final class Timeout implements Runnable {
   final CompletableFuture<?> f;
   Timeout(CompletableFuture<?> f) { this.f = f; }
   public void run() {
       if (f != null && !f.isDone())
           // 抛出超时异常
           f.completeExceptionally(new TimeoutException());
   }
}

通过源码可以看到,Timeout 是一个实现 Runnable 的类,run() 方法负责给传入的异步任务通过 completeExceptionally CAS 赋值异常,将任务标记为异常完成。

那么谁来触发这个 run() 方法呢?我们看下 Delayer 的实现。

static final class Delayer {
   static ScheduledFuture<?> delay(Runnable command, long delay,
                                   TimeUnit unit) {
       // 到时间触发 command 任务
       return delayer.schedule(command, delay, unit);
   }
   static final class DaemonThreadFactory implements ThreadFactory {
       public Thread newThread(Runnable r) {
           Thread t = new Thread(r);
           t.setDaemon(true);
           t.setName("CompletableFutureDelayScheduler");
           return t;
       }
   }
   static final ScheduledThreadPoolExecutor delayer;
   static {
       (delayer = new ScheduledThreadPoolExecutor(
           1, new DaemonThreadFactory())).
           setRemoveOnCancelPolicy(true);
   }
}

Delayer 其实就是一个单例定时调度器,Delayer.delay(new Timeout(this), timeout, unit) 通过 ScheduledThreadPoolExecutor 实现指定时间后触发 Timeout 的 run() 方法。

到这里就已经实现了超时抛出异常的操作。但当任务完成时,就没必要触发 Timeout 了。因此我们还需要实现一个取消逻辑。

static final class Canceller implements BiConsumer<Object, Throwable> {
   final Future<?> f;
   Canceller(Future<?> f) { this.f = f; }
   public void accept(Object ignore, Throwable ex) {
       if (ex == null && f != null && !f.isDone())
       // 3 未触发抛异常任务则取消
           f.cancel(false);
   }
}

当任务执行完成,或者任务执行异常时,我们也就没必要抛出超时异常了。因此我们可以把 delayer.schedule(command, delay, unit) 返回的定时超时任务取消,不再触发 Timeout。 当我们的异步任务完成,并且定时超时任务未完成的时候,就是我们取消的时机。因此我们可以通过 whenComplete(BiConsumer<? super T, ? super Throwable> action) 来完成。

Canceller 就是一个 BiConsumer 的实现。其持有了 delayer.schedule(command, delay, unit) 返回的定时超时任务,accept(Object ignore, Throwable ex) 实现了定时超时任务未完成后,执行 cancel(boolean mayInterruptIfRunning) 取消任务的操作。

JDK 8

如果我们使用的是 JDK 9 或以上,我们可以直接用 JDK 的实现来完成异步超时操作。那么 JDK 8 怎么办呢?

其实我们也可以根据上述逻辑简单实现一个工具类来辅助。

以下是我们营销自己的工具类以及用法,贴出来给大家作为参考,大家也可以自己写的更优雅一些~

调用方式:

CompletableFutureExpandUtils.orTimeout(异步任务, 超时时间, 时间单位);

工具类源码:

package com.jd.jr.market.reduction.util;
import com.jdpay.market.common.exception.UncheckedException;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
/**
* CompletableFuture 扩展工具
*
* @author zhangtianci7
*/
public class CompletableFutureExpandUtils {
   /**
    * 如果在给定超时之前未完成,则异常完成此 CompletableFuture 并抛出 {@link TimeoutException} 。
    *
    * @param timeout 在出现 TimeoutException 异常完成之前等待多长时间,以 {@code unit} 为单位
    * @param unit    一个 {@link TimeUnit},结合 {@code timeout} 参数,表示给定粒度单位的持续时间
    * @return 入参的 CompletableFuture
    */
   public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
       if (null == unit) {
           throw new UncheckedException("时间的给定粒度不能为空");
       }
       if (null == future) {
           throw new UncheckedException("异步任务不能为空");
       }
       if (future.isDone()) {
           return future;
       }
       return future.whenComplete(new Canceller(Delayer.delay(new Timeout(future), timeout, unit)));
   }
   /**
    * 超时时异常完成的操作
    */
   static final class Timeout implements Runnable {
       final CompletableFuture<?> future;
       Timeout(CompletableFuture<?> future) {
           this.future = future;
       }
       public void run() {
           if (null != future && !future.isDone()) {
               future.completeExceptionally(new TimeoutException());
           }
       }
   }
   /**
    * 取消不需要的超时的操作
    */
   static final class Canceller implements BiConsumer<Object, Throwable> {
       final Future<?> future;
       Canceller(Future<?> future) {
           this.future = future;
       }
       public void accept(Object ignore, Throwable ex) {
           if (null == ex && null != future && !future.isDone()) {
               future.cancel(false);
           }
       }
   }
   /**
    * 单例延迟调度器,仅用于启动和取消任务,一个线程就足够
    */
   static final class Delayer {
       static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
           return delayer.schedule(command, delay, unit);
       }
       static final class DaemonThreadFactory implements ThreadFactory {
           public Thread newThread(Runnable r) {
               Thread t = new Thread(r);
               t.setDaemon(true);
               t.setName("CompletableFutureExpandUtilsDelayScheduler");
               return t;
           }
       }
       static final ScheduledThreadPoolExecutor delayer;
       static {
           delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
           delayer.setRemoveOnCancelPolicy(true);
       }
   }
}

参考资料 JEP 266: JDK 9 并发包更新提案

来源:https://juejin.cn/post/7197606993858281530

标签:Java,CompletableFuture,异步超时
0
投稿

猜你喜欢

  • 详解idea maven nexus 常见命令配置

    2021-06-07 18:29:03
  • java获取中文拼音首字母工具类定义与用法实例

    2023-07-14 08:23:55
  • 详解Java如何判断ResultSet结果集是否为空

    2023-08-22 10:51:14
  • SpringDataMongoDB多文档事务的实现

    2023-11-25 06:37:55
  • java中的HashMap多层嵌套

    2023-11-27 07:34:52
  • EventBus与Spring Event区别详解(EventBus 事件机制,Spring Event事件机制)

    2023-12-06 00:23:05
  • 教你安装配置Android Studio

    2023-07-09 03:26:17
  • 关于Future机制原理及解析

    2022-01-23 03:57:48
  • Java结构型设计模式之装饰模式详解

    2022-07-13 10:00:35
  • spring boot实现自动输出word文档功能的实例代码

    2021-11-10 13:37:51
  • idea 实现纵列选择和大小写转换操作

    2023-07-16 20:25:14
  • Kotlin原理详析之拓展函数

    2023-04-10 19:54:08
  • 使用maven运行Java Main的三种方法解析

    2021-09-24 10:09:28
  • 如何在IDEA中对 hashCode()和 equals() 利用快捷键快速进行方法重写

    2021-11-04 19:06:13
  • Java四种常用线程池的详细介绍

    2021-09-29 17:45:46
  • 使用反射方式获取JPA Entity的属性和值

    2023-07-24 17:43:22
  • Java设计模式之迭代模式(Iterator模式)介绍

    2022-07-24 16:03:29
  • Runtime.getRuntime().exec 路径包含空格的解决

    2023-05-15 23:54:32
  • Java开发之Lombok指南

    2022-11-19 21:49:28
  • Spring Boot提高开发效率必备工具lombok使用

    2022-05-21 08:22:38
  • asp之家 软件编程 m.aspxhome.com