On the timeout behavior of Future's get method in Java

Author: time-flies  Date: 2022-09-27 07:58:18

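1. Background

Many Java engineers preparing for interviews memorize a stock set of answers. For threads and thread pools, that usually covers thread states, the ways to create a thread, the factory methods in Executors and why they are discouraged, and the constructor parameters and execution flow of ThreadPoolExecutor.

At work, many people call a thread pool's submit method to obtain a Future, then use java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) to get a "wait at most this long" effect.

But for many, this understanding stays on the surface, and a slightly deeper question leaves them stumped.

For example: after java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) times out, what happens to the calling thread? And how does the pool thread running the task behave?

If you are not confident about the answer, your grasp of the topic is not yet solid.

The most common belief is: "after the timeout, the calling thread carries on, and the corresponding pool thread is interrupted." Is that really so?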

2. Simulation

2.1 The common usage

Here is a simple simulation:

package basic.thread;
import java.util.concurrent.*;
public class FutureDemo {
   public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
       ExecutorService executorService = Executors.newFixedThreadPool(2);
       // The lambda returns nothing, so it is submitted as a Runnable and demo()'s return value is discarded.
       Future<?> future = executorService.submit(() -> {
           try {
               demo();
           } catch (InterruptedException e) {
               throw new RuntimeException(e);
           }
       });
       String threadName = Thread.currentThread().getName();
       System.out.println(threadName + " get result -- start");
       // Wait at most 100 ms; demo() sleeps for 1 s, so this call times out.
       Object result = future.get(100, TimeUnit.MILLISECONDS);
       System.out.println(threadName + " get result -- end: " + result);
   }
   private static String demo() throws InterruptedException {
       String threadName = Thread.currentThread().getName();
       System.out.println(threadName + ", running demo -- start");
       TimeUnit.SECONDS.sleep(1);
       System.out.println(threadName + ", running demo -- end");
       return "test";
   }
}

Output:

main get result -- start
pool-1-thread-1, running demo -- start
Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at basic.thread.FutureDemo.main(FutureDemo.java:20)
pool-1-thread-1, running demo -- end

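We can see that the calling thread receives a TimeoutException from get. It is not interrupted; the exception is simply uncaught here, so main terminates with it. The pool thread running the task, however, carries on until the task completes.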
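The same observation can be made without relying on sleep timings. The sketch below is illustrative only: the class name TimeoutObservation and the method observe are made up for this example, and a CountDownLatch stands in for the slow work. It records whether get timed out, whether the caller's interrupt flag was set, and whether the same Future still delivers a result later.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutObservation {

    /** Reports what get(timeout) did to the caller and to the task (hypothetical helper). */
    static String observe() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1); // lets the task finish on demand
        try {
            Future<String> future = pool.submit(() -> {
                release.await();          // stands in for slow work
                return "test";
            });
            boolean timedOut = false;
            try {
                future.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                timedOut = true;          // an ordinary exception thrown to the caller
            }
            boolean callerInterrupted = Thread.currentThread().isInterrupted();
            boolean doneAfterTimeout = future.isDone();
            release.countDown();          // now let the task complete
            String result = future.get(); // the same Future still yields the result
            return "timedOut=" + timedOut
                    + ", callerInterrupted=" + callerInterrupted
                    + ", doneAfterTimeout=" + doneAfterTimeout
                    + ", result=" + result;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints: timedOut=true, callerInterrupted=false, doneAfterTimeout=false, result=test
        System.out.println(observe());
    }
}
```

A timeout therefore only ends the wait. The task is neither cancelled nor interrupted, and a later get on the same Future can still succeed.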

2.2 Trying to cancel

Let's try cancelling the unfinished task. Future#cancel takes a boolean parameter:

/**
    * Attempts to cancel execution of this task.  This attempt will
    * fail if the task has already completed, has already been cancelled,
    * or could not be cancelled for some other reason. If successful,
    * and this task has not started when {@code cancel} is called,
    * this task should never run.  If the task has already started,
    * then the {@code mayInterruptIfRunning} parameter determines
    * whether the thread executing this task should be interrupted in
    * an attempt to stop the task.
    *
    * <p>After this method returns, subsequent calls to {@link #isDone} will
    * always return {@code true}.  Subsequent calls to {@link #isCancelled}
    * will always return {@code true} if this method returned {@code true}.
    *
    * @param mayInterruptIfRunning {@code true} if the thread executing this
    * task should be interrupted; otherwise, in-progress tasks are allowed
    * to complete
    * @return {@code false} if the task could not be cancelled,
    * typically because it has already completed normally;
    * {@code true} otherwise
    */
   boolean cancel(boolean mayInterruptIfRunning);

From the Javadoc we learn:

When the argument is true, the thread running the task is interrupted;

When the argument is false, a task that is already running is allowed to complete.
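The Javadoc also covers a third case that the demos in this article do not exercise: a task that has not started when cancel is called should never run. A minimal sketch of that case, assuming a single-thread pool so that the second task is guaranteed to be queued (the names CancelBeforeStart and observe are invented for illustration):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelBeforeStart {

    /** Cancels a task while it is still queued and reports whether it ever ran. */
    static String observe() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch firstStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean secondRan = new AtomicBoolean(false);
        try {
            // The first task occupies the only worker thread until released.
            Future<?> first = pool.submit(() -> {
                firstStarted.countDown();
                release.await();
                return null;
            });
            firstStarted.await();
            // The second task can only sit in the queue at this point.
            Future<?> second = pool.submit(() -> secondRan.set(true));
            boolean cancelled = second.cancel(false);
            release.countDown();
            first.get();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS); // the queue is drained by now
            return "cancelled=" + cancelled
                    + ", isDone=" + second.isDone()
                    + ", secondRan=" + secondRan.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints: cancelled=true, isDone=true, secondRan=false
        System.out.println(observe());
    }
}
```

For a task that has not started, the value of mayInterruptIfRunning makes no difference; the flag only matters once a thread is running the task.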

2.2.1 cancel(false)

This time, so that the main thread does not die from the uncaught timeout exception, we wrap the get call in try-catch.

package basic.thread;
// ExceptionUtils comes from junit-platform-commons, which must be on the classpath.
import org.junit.platform.commons.util.ExceptionUtils;
import java.util.concurrent.*;
public class FutureDemo {
   public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
       ExecutorService executorService = Executors.newFixedThreadPool(2);
       Future<?> future = executorService.submit(() -> {
           try {
               demo();
           } catch (InterruptedException e) {
               throw new RuntimeException(e);
           }
       });
       String threadName = Thread.currentThread().getName();
       System.out.println(System.currentTimeMillis() + "," + threadName + " get result -- start");
       try {
           Object result = future.get(100, TimeUnit.MILLISECONDS);
           System.out.println(System.currentTimeMillis() + "," + threadName + " get result -- end: " + result);
       } catch (Exception e) {
           System.out.println(System.currentTimeMillis() + "," + threadName + " get result failed: " + ExceptionUtils.readStackTrace(e));
       }
       // false: do not interrupt the task if it is already running
       future.cancel(false);
       System.out.println(System.currentTimeMillis() + "," + threadName + " get result -- cancel");
   }
   private static String demo() throws InterruptedException {
       String threadName = Thread.currentThread().getName();
       System.out.println(System.currentTimeMillis() + "," + threadName + ", running demo -- start");
       TimeUnit.SECONDS.sleep(1);
       System.out.println(System.currentTimeMillis() + "," + threadName + ", running demo -- end");
       return "test";
   }
}

Output:

1653751759233,main get result -- start
1653751759233,pool-1-thread-1, running demo -- start
1653751759343,main get result failed: java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at basic.thread.FutureDemo.main(FutureDemo.java:23)

1653751759351,main get result -- cancel
1653751760263,pool-1-thread-1, running demo -- end

We see that with cancel(false), a task the pool thread has already started continues to completion.
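One consequence is easy to miss: although the task keeps running, the Future itself is already in the cancelled state, so the value the task eventually returns is no longer reachable through get. The following sketch (class and method names are hypothetical, and latches replace the sleep so the ordering is deterministic) shows the state of the Future after cancel(false):

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelFalseState {

    /** Cancels a running task with cancel(false) and reports the Future's state. */
    static String observe() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        try {
            Future<String> future = pool.submit(() -> {
                started.countDown();
                release.await();      // not interrupted by cancel(false)
                finished.countDown(); // the task body really does run to the end
                return "test";
            });
            started.await();          // make sure the task is already running
            boolean cancelled = future.cancel(false);
            release.countDown();
            boolean taskFinished = finished.await(5, TimeUnit.SECONDS);
            String outcome;
            try {
                outcome = future.get();
            } catch (CancellationException e) {
                outcome = "CancellationException"; // the result "test" is discarded
            }
            return "cancelled=" + cancelled
                    + ", isCancelled=" + future.isCancelled()
                    + ", taskFinished=" + taskFinished
                    + ", get=" + outcome;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints: cancelled=true, isCancelled=true, taskFinished=true, get=CancellationException
        System.out.println(observe());
    }
}
```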

2.2.2 cancel(true)

package basic.thread;
// ExceptionUtils comes from junit-platform-commons, which must be on the classpath.
import org.junit.platform.commons.util.ExceptionUtils;
import java.util.concurrent.*;
public class FutureDemo {
   public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
       ExecutorService executorService = Executors.newFixedThreadPool(2);
       Future<?> future = executorService.submit(() -> {
           try {
               demo();
           } catch (InterruptedException e) {
               System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ", Interrupted:" + ExceptionUtils.readStackTrace(e));
               throw new RuntimeException(e);
           }
       });
       String threadName = Thread.currentThread().getName();
       System.out.println(System.currentTimeMillis() + "," + threadName + " get result -- start");
       try {
           Object result = future.get(100, TimeUnit.MILLISECONDS);
           System.out.println(System.currentTimeMillis() + "," + threadName + " get result -- end: " + result);
       } catch (Exception e) {
           System.out.println(System.currentTimeMillis() + "," + threadName + " get result failed: " + ExceptionUtils.readStackTrace(e));
       }
       // true: interrupt the thread that is running the task
       future.cancel(true);
       System.out.println(System.currentTimeMillis() + "," + threadName + " get result -- cancel");
   }
   private static String demo() throws InterruptedException {
       String threadName = Thread.currentThread().getName();
       System.out.println(System.currentTimeMillis() + "," + threadName + ", running demo -- start");
       TimeUnit.SECONDS.sleep(1);
       System.out.println(System.currentTimeMillis() + "," + threadName + ", running demo -- end");
       return "test";
   }
}

Output:

1653752011246,main get result -- start
1653752011246,pool-1-thread-1, running demo -- start
1653752011347,main get result failed: java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at basic.thread.FutureDemo.main(FutureDemo.java:24)

1653752011363,pool-1-thread-1, Interrupted:java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:340)
    at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
    at basic.thread.FutureDemo.demo(FutureDemo.java:36)
    at basic.thread.FutureDemo.lambda$main$0(FutureDemo.java:14)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

1653752011366,main get result -- cancel

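As shown, if the task has not finished, the thread running it is interrupted, and the blocking sleep call throws InterruptedException.

Of course, if you do not want the interrupt to abort the task, you can catch InterruptedException inside the task and carry on with other logic.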

package basic.thread;
// ExceptionUtils comes from junit-platform-commons, which must be on the classpath.
import org.junit.platform.commons.util.ExceptionUtils;
import java.util.concurrent.*;
public class FutureDemo {
   public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
       ExecutorService executorService = Executors.newFixedThreadPool(2);
       Future<?> future = executorService.submit(() -> {
           demo();
       });
       String threadName = Thread.currentThread().getName();
       System.out.println(System.currentTimeMillis() + "," + threadName + " get result -- start");
       try {
           Object result = future.get(100, TimeUnit.MILLISECONDS);
           System.out.println(System.currentTimeMillis() + "," + threadName + " get result -- end: " + result);
       } catch (Exception e) {
           System.out.println(System.currentTimeMillis() + "," + threadName + " get result failed: " + ExceptionUtils.readStackTrace(e));
       }
       future.cancel(true);
       System.out.println(System.currentTimeMillis() + "," + threadName + " get result -- cancel");
   }
   private static String demo() {
       String threadName = Thread.currentThread().getName();
       System.out.println(System.currentTimeMillis() + "," + threadName + ", running demo -- start");
       try {
           TimeUnit.SECONDS.sleep(1);
       } catch (InterruptedException e) {
           // Swallow the interrupt and continue with the fallback path.
           System.out.println(System.currentTimeMillis() + "," + threadName + ", demo interrupted, falling back");
       }
       System.out.println(System.currentTimeMillis() + "," + threadName + ", running demo -- end");
       return "test";
   }
}

Output:

1653752219803,main get result -- start
1653752219803,pool-1-thread-1, running demo -- start
1653752219908,main get result failed: java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at basic.thread.FutureDemo.main(FutureDemo.java:19)

1653752219913,main get result -- cancel
1653752219914,pool-1-thread-1, demo interrupted, falling back
1653752219914,pool-1-thread-1, running demo -- end
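The demos above all block in sleep, which reacts to an interrupt by throwing InterruptedException. A task that never blocks gets no such exception: cancel(true) only sets the thread's interrupt flag, and the task has to check that flag itself if it wants to stop. A minimal sketch of such a cooperative loop, with invented names (CooperativeCancel, stopsOnInterrupt):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CooperativeCancel {

    /** Returns true if a CPU-bound task exits after cancel(true) because it polls the flag. */
    static boolean stopsOnInterrupt() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch exited = new CountDownLatch(1);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                long spins = 0;
                // No blocking call here, so no InterruptedException is ever thrown;
                // the loop ends only because it checks the interrupt flag itself.
                while (!Thread.currentThread().isInterrupted()) {
                    spins++;
                }
                exited.countDown();
            });
            started.await();      // the task is running, so cancel(true) has a thread to interrupt
            future.cancel(true);
            return exited.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("stopped after cancel(true): " + stopsOnInterrupt());
    }
}
```

If the loop condition were simply true, cancel(true) would still return true and the Future would report cancelled, but the pool thread would keep spinning.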

3. Back to the source

Reading the Javadoc of java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) directly tells us how each case behaves:

/**
    * Waits if necessary for at most the given time for the computation
    * to complete, and then retrieves its result, if available.
    *
    * @param timeout the maximum time to wait
    * @param unit the time unit of the timeout argument
    * @return the computed result
    * @throws CancellationException if the computation was cancelled
    * @throws ExecutionException if the computation threw an
    * exception
    * @throws InterruptedException if the current thread was interrupted
    * while waiting
    * @throws TimeoutException if the wait timed out
    */
   V get(long timeout, TimeUnit unit)
       throws InterruptedException, ExecutionException, TimeoutException;

We can also pick a few common implementations and look at the basic approach:

java.util.concurrent.FutureTask#get(long, java.util.concurrent.TimeUnit)

public V get(long timeout, TimeUnit unit)
       throws InterruptedException, ExecutionException, TimeoutException {
       if (unit == null)
           throw new NullPointerException();
       int s = state;
       if (s <= COMPLETING &&
           (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
           throw new TimeoutException();
       return report(s);
   }
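FutureTask#get does nothing to the task on timeout; it only throws. A caller that wants "give up and stop the work" semantics therefore has to cancel explicitly. One possible way to package that is sketched below. The helper getOrCancel, its fallback parameter, and the class name are assumptions made for this example, not part of the JDK.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class GetOrCancel {

    /** Hypothetical helper: waits up to the timeout, otherwise cancels the task and returns the fallback. */
    static <T> T getOrCancel(Future<T> future, long timeout, TimeUnit unit, T fallback) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            future.cancel(true);                // ask the running task to stop
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            future.cancel(true);
            return fallback;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<String> slow = pool.submit(() -> {
                TimeUnit.SECONDS.sleep(10);     // interrupted by cancel(true)
                return "slow";
            });
            Future<String> fast = pool.submit(() -> "fast");
            String a = getOrCancel(slow, 100, TimeUnit.MILLISECONDS, "fallback");
            String b = getOrCancel(fast, 5, TimeUnit.SECONDS, "fallback");
            return a + "," + b + ",slowCancelled=" + slow.isCancelled();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints: fallback,fast,slowCancelled=true
        System.out.println(demo());
    }
}
```

Whether cancel(true) or cancel(false) is the right choice depends on whether the task can tolerate being interrupted midway.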

java.util.concurrent.CompletableFuture#get(long, java.util.concurrent.TimeUnit)

/**
    * Waits if necessary for at most the given time for this future
    * to complete, and then returns its result, if available.
    *
    * @param timeout the maximum time to wait
    * @param unit the time unit of the timeout argument
    * @return the result value
    * @throws CancellationException if this future was cancelled
    * @throws ExecutionException if this future completed exceptionally
    * @throws InterruptedException if the current thread was interrupted
    * while waiting
    * @throws TimeoutException if the wait timed out
    */
   public T get(long timeout, TimeUnit unit)
       throws InterruptedException, ExecutionException, TimeoutException {
       Object r;
       long nanos = unit.toNanos(timeout);
       return reportGet((r = result) == null ? timedGet(nanos) : r);
   }
/**
    * Returns raw result after waiting, or null if interrupted, or
    * throws TimeoutException on timeout.
    */
   private Object timedGet(long nanos) throws TimeoutException {
       if (Thread.interrupted())
           return null;
       if (nanos <= 0L)
           throw new TimeoutException();
       long d = System.nanoTime() + nanos;
       Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
       boolean queued = false;
       Object r;
       // We intentionally don't spin here (as waitingGet does) because
       // the call to nanoTime() above acts much like a spin.
       while ((r = result) == null) {
           if (!queued)
               queued = tryPushStack(q);
           else if (q.interruptControl < 0 || q.nanos <= 0L) {
               q.thread = null;
               cleanStack();
               if (q.interruptControl < 0)
                   return null;
               throw new TimeoutException();
           }
           else if (q.thread != null && result == null) {
               try {
                   ForkJoinPool.managedBlock(q);
               } catch (InterruptedException ie) {
                   q.interruptControl = -1;
               }
           }
       }
       if (q.interruptControl < 0)
           r = null;
       q.thread = null;
       postComplete();
       return r;
   }

The same goes for java.util.concurrent.Future#cancel:

/**
* Attempts to cancel execution of this task.  This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run.  If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}.  Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);

java.util.concurrent.FutureTask#cancel

public boolean cancel(boolean mayInterruptIfRunning) {
       if (!(state == NEW &&
             UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                 mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
           return false;
       try {    // in case call to interrupt throws exception
           if (mayInterruptIfRunning) {
               try {
                   Thread t = runner;
                   if (t != null)
                       t.interrupt();
               } finally { // final state
                   UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
               }
           }
       } finally {
           finishCompletion();
       }
       return true;
   }

We can see that when mayInterruptIfRunning is true, Thread#interrupt is called on the thread running the task.

java.util.concurrent.CompletableFuture#cancel

/**
    * If not already completed, completes this CompletableFuture with
    * a {@link CancellationException}. Dependent CompletableFutures
    * that have not already completed will also complete
    * exceptionally, with a {@link CompletionException} caused by
    * this {@code CancellationException}.
    *
    * @param mayInterruptIfRunning this value has no effect in this
    * implementation because interrupts are not used to control
    * processing.
    *
    * @return {@code true} if this task is now cancelled
    */
   public boolean cancel(boolean mayInterruptIfRunning) {
       boolean cancelled = (result == null) &&
           internalComplete(new AltResult(new CancellationException()));
       postComplete();
       return cancelled || isCancelled();
   }

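The Javadoc also shows that implementations differ in what the parameter actually does: FutureTask interrupts the runner thread, while CompletableFuture ignores the flag.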
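The difference can be checked with a small experiment. In the sketch below (class and method names are made up for illustration), a task started with CompletableFuture.supplyAsync blocks on a latch; if cancel(true) interrupted the worker, the await call would throw InterruptedException and the flag would be recorded.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CompletableFutureCancel {

    /** Cancels a running CompletableFuture task with cancel(true) and reports what the worker saw. */
    static String observe() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean workerInterrupted = new AtomicBoolean(false);
        try {
            CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
                started.countDown();
                try {
                    release.await();             // would throw if the worker were interrupted
                } catch (InterruptedException e) {
                    workerInterrupted.set(true);
                }
                finished.countDown();
                return "test";
            }, pool);
            started.await();
            boolean cancelled = cf.cancel(true); // completes cf with a CancellationException
            release.countDown();
            finished.await(5, TimeUnit.SECONDS);
            return "cancelled=" + cancelled
                    + ", isCancelled=" + cf.isCancelled()
                    + ", workerInterrupted=" + workerInterrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints: cancelled=true, isCancelled=true, workerInterrupted=false
        System.out.println(observe());
    }
}
```

With CompletableFuture, stopping the underlying work therefore needs a separate mechanism, for example a shared flag that the task checks.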

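4. Summary

When learning, we should not take things for granted or settle for armchair theory. For anything that is not fully clear, read the Javadoc, read the source, and write demos to simulate or debug the behavior.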

Source: https://www.cnblogs.com/timefiles/p/CsvReadWrite.html
