因收到Google相关通知,网站将会择期关闭。相关通知内容

09 结果如何?线程的秘密告白

我们在前几节中学习了线程如何组织以及如何保证线程安全,但是我们介绍的线程的使用方式几乎全部是 Runnable 接口,虽然我们也稍微讲了一下 Callable 的使用方式。

那么,本章节将重点讲解 Callable 接口的详细用法。

一、Callable 接口

存在即合理,我们之前所学的 Runnable 接口存在以下两个缺陷:

  1. Runnable 接口不能返回返回值;
  2. Runnable 接口不允许抛出一个异常。

以上的两个缺陷导致于 Runnable 接口在一些特定的开发场景中,实现某一些特定功能很麻烦。比如,我现在有 100w 的数据,需要你采用线程池将 100w 的数据拆分为 10 个线程执行,当其中一个线程出问题后,需要将错误信息,以及出错的区间返回!

如果使用 Runnable 接口来实现就会比较麻烦,需要借助我们之前讲的 CountDownLatch 等类似的工具来进行计数实现,而且 Runnable 还无法返回出错的信息和区间。但如果采用本节课即将讲到的 Callable 接口来实现这个功能,整体就会简单很多。

我们先学习一下它的基础使用方式:

public class CallableTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> stringFutureTask = new FutureTask<>(new Task());
        new Thread(stringFutureTask).start();
        //获取线程的返回结果
        System.out.println(stringFutureTask.get());
    }

    private static class Task implements Callable<String> {

        @Override
        public String call() throws Exception {
            return "我是执行结果";
        }
    }
}

从上述的代码可以看到,我们将 Callable 包装成了一个 FutureTask,后续对于 Callable 的操作主要集中在 FutureTask 上。

Callable 获取结果的时候会抛出两个异常,ExecutionException、InterruptedException。其中 InterruptedException 是当线程被中断或者任务被取消的时候抛出的异常,当任务抛出异常的时候会触发 ExecutionException,当任务被取消的时候,会出现一个 TimeoutException。

接下来,我们将针对 FutureTask 的主要常用的 API 做一个详细的介绍。

  1. boolean cancel(boolean mayInterruptIfRunning):
    • 作用是用于取消与 Future 关联的计算任务。
    • 参数 mayInterruptIfRunning 用于确定是否中断正在执行任务的线程。
    • 如果任务已经完成或已经被取消,或者由于其他原因不能被取消,则此方法返回 false ;否则,任务将被取消,并返回 true
  2. boolean isCancelled():
    • 作用是用于检查与此 Future 关联的计算任务是否已被取消。
    • 如果任务已经被取消,则返回 true ;否则,返回 false
  3. boolean isDone():
    • 作用是用于检查与此 Future 关联的计算任务是否已经完成。
    • 如果任务已经完成(包括正常完成、取消或由于异常而完成),则返回 true
  4. V get():
    • 作用是用于获取与此 Future 关联的计算结果。
    • 如果计算尚未完成,则此方法将阻塞当前线程,直到计算完成为止。
    • 如果计算已经完成,它会立即返回结果。
    • 如果计算抛出异常,此方法也会抛出相应的异常。
  5. V get(long timeout, TimeUnit unit):
    • 作用是用于获取与此 Future 关联的计算结果,但是在指定的时间内如果计算尚未完成,则抛出 TimeoutException 异常。
    • 参数 timeout 表示超时时间, unit 表示时间单位。

注意,这里的 Future 任务虽然提供了取消任务的能力,但是当任务没有处于阻塞状态的时候,实际上任务并不会停止,它只能取消能够响应中断任务的任务。加入类似的任务是一个死循环,此时程序无法被停止。

下面我们学习正确停止死循环的两个方式(重点请关注一个注意点,响应中断任务)。

  1. 使用判断线程存活的方式来验证是否需要继续执行:
public class StopTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> stringFutureTask = new FutureTask<String>(new Task());
        new Thread(stringFutureTask).start();
        //获取线程的返回结果
        Thread.sleep(1000);
        System.out.println(stringFutureTask.cancel(true));
        System.out.println("任务被停止");
        System.out.println(stringFutureTask.get());
    }

    private static class Task implements Callable<String> {

        @Override
        public String call() throws Exception {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("线程正在运行");
            }
            return "运行完成";
        }
    }
}
  1. 采用睡眠中断的形式来响应取消的指令:
public class StopTest2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> stringFutureTask = new FutureTask<String>(new Task());
        new Thread(stringFutureTask).start();
        //获取线程的返回结果
        Thread.sleep(1000);
        System.out.println(stringFutureTask.cancel(true));
        System.out.println("任务被停止");
        System.out.println(stringFutureTask.get());
    }

    private static class Task implements Callable<String> {

        @Override
        public String call() throws Exception {
            while (true) {
                System.out.println("线程正在运行");
                Thread.sleep(500);
            }
        }
    }
}

与第一种案例不同的是,这里使用了 sleep 来阻塞程序,当发起取消任务申请的时候,Task 会抛出中断异常,从而会从 call 方法的循环中跳出,并结束程序。当任务被取消成功后,调用 get 方法获取结果会抛出异常 CancellationException!

二、线程池使用 Callable

后面我们学习如何配合线程池来使用 Callable 接口,线程池使用 Callable 与直接使用类似,基础使用如下:

public class ThreadPoolCallable {
    private final static AtomicInteger IDX = new AtomicInteger(0);

    private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(1, 3, 5, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "test-" + IDX.getAndIncrement());
        }
    }, new ThreadPoolExecutor.AbortPolicy());


    public static void main(String[] args) throws Exception {
        Future<String> submit = THREAD_POOL_EXECUTOR.submit(new Task());
        System.out.println(submit.get());

    }

    private static class Task implements Callable<String> {

        @Override
        public String call() throws Exception {
            try {
                Thread.sleep(2000);
            }catch (Exception e) {
                e.printStackTrace();
            }
            return "我是返回结果";
        }
    }
}

与我们之前使用线程池提交任务不同的是,这里使用的是 submit 来提交任务,提交任务完成后返回一个 Future ,内部的 API 与上文同理,这里不做太多的解释。

接下来,我们将针对 Future 来设计几个使用案例来加深你的印象。

三、案例

1. 超时案例

我们有一个系统,需要调用第三方的接口获取数据,但是因为我们系统的用户体验要求,如果 3 秒内接口没有返回,就返回一个第三方接口网络异常;如果 3 秒内返回了,就返回第三方数据访问成功。

public class ThreadPoolCallableCase1 {
    private final static AtomicInteger IDX = new AtomicInteger(0);

    private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), r -> new Thread(r, "open-api-" + IDX.getAndIncrement()), new ThreadPoolExecutor.AbortPolicy());


    public static void main(String[] args) {

        ThreadPoolCallableCase1 threadPoolCallableCase1 = new ThreadPoolCallableCase1();
        System.out.println(threadPoolCallableCase1.getData());
    }

    public String getData(){
        Future<String> submit = THREAD_POOL_EXECUTOR.submit(new Task());
        try {
            return submit.get(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            return "手动中断任务";
        } catch (ExecutionException e) {
            return "第三方异常";
        } catch (TimeoutException e) {
            //超时了就取消任务
            System.out.println(submit.cancel(true));
            return "第三方接口网络超时";
        }
    }

    private static class Task implements Callable<String> {

        @Override
        public String call() throws Exception {
            try {
                Thread.sleep((long) (Math.random() * 7000));
            }catch (Exception e) {
                System.out.println("任务被主动中断");
            }
            return "第三方数据返回成功";
        }
    }
}

这里可以看到,我们使用了带有等待时间的 get 方法来获取数据,当在规定时间内还没有返回数据的时候,此时就会抛出第三方接口网络超时的异常信息。

2. 并行计算下的结果获取

假设我们存在 10w 的数据,现在要将其分为 10 个线程处理,每一个线程处理 1w 的数据写入数据库,当数据全部写入成功后,返回写入成功;当数据某一批写入失败,需要返回哪一个区间写入失败。

public class ThreadDbTest {
    private final static AtomicInteger IDX = new AtomicInteger(0);

    private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), r -> new Thread(r, "open-api-" + IDX.getAndIncrement()), new ThreadPoolExecutor.AbortPolicy());


    public static void main(String[] args) {

        List<Future<String>> futures = new ArrayList<>();

        Future<String> submit1 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(1, 10000, true));
        Future<String> submit2 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(10001, 20000, true));
        Future<String> submit3 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(20001, 30000, true));
        Future<String> submit4 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(30001, 40000, true));
        Future<String> submit5 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(40001, 50000, false));
        Future<String> submit6 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(50001, 60000, true));
        Future<String> submit7 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(70001, 80000, true));
        Future<String> submit8 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(80001, 90000, true));
        Future<String> submit9 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(90001, 100000, true));

        futures.add(submit1);
        futures.add(submit2);
        futures.add(submit3);
        futures.add(submit4);
        futures.add(submit5);
        futures.add(submit6);
        futures.add(submit7);
        futures.add(submit8);
        futures.add(submit9);

        for (Future<String> future : futures) {
            try {
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

    }

    public static class BatchWriteDbTask implements Callable<String> {


        private final Integer minIndex;
        private final Integer maxIndex;

        /**
         * 模拟使用, 当为true的时候就写入成功 当为false就写入失败
         */
        private final boolean isSuccess;

        public BatchWriteDbTask(Integer minIndex, Integer maxIndex, boolean isSuccess) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;
            this.isSuccess = isSuccess;
        }


        @Override
        public String call() throws Exception {
            System.out.println("开始批量写入数据 " + minIndex + "至" + maxIndex);
            if(!isSuccess) {
                throw new Exception("数据 " + minIndex + "至" + maxIndex + "写入失败,请手动处理。");
            }
            Thread.sleep(5000);
            return "数据"  + minIndex  + "至" + maxIndex + "写入成功";
        }
    }
}

我们查看最终的运行结果:

开始批量写入数据 1至10000
开始批量写入数据 20001至30000
开始批量写入数据 10001至20000
开始批量写入数据 30001至40000
开始批量写入数据 40001至50000
开始批量写入数据 50001至60000
开始批量写入数据 70001至80000
开始批量写入数据 80001至90000
开始批量写入数据 90001至100000
数据1至10000写入成功
数据10001至20000写入成功
数据20001至30000写入成功
数据30001至40000写入成功
数据50001至60000写入成功
数据70001至80000写入成功
数据80001至90000写入成功
数据90001至100000写入成功
java.util.concurrent.ExecutionException: java.lang.Exception: 数据 40001至50000写入失败,请手动处理。
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.eight.ThreadDbTest.main(ThreadDbTest.java:44)
Caused by: java.lang.Exception: 数据 40001至50000写入失败,请手动处理。
	at com.eight.ThreadDbTest$BatchWriteDbTask.call(ThreadDbTest.java:76)
	at com.eight.ThreadDbTest$BatchWriteDbTask.call(ThreadDbTest.java:54)

从最终的运行结果中可以看到,我们程序中预设的 “40001 至 50000 写入失败”是被成功捕获异常并返回的。

因为是异步执行,所以程序返回 Future 的时候可能程序并未开始执行或者正在执行中,为了获取最终的计算结果,程序的整体我们使用了一个集合来存储 Future 结果集,然后任务全部提交后遍历这个集合,使用 get 方法来获取真正的执行结果。当任务执行完毕后,get 方法会停止阻塞返回运行结果;当程序运行出错的时候,此时 get 方法会抛出最终的异常信息以供检测使用。

四、总结

本节课我们充分介绍了如何使用 Future 来进行获取线程的数据结果,包括对于 API 的介绍以及使用,我们还使用了几个例子使你加深印象,在异步计算中 Future 能够大大加快计算速度。Dubbo 就是使用 Future 来异步获取 API 的结果以及控制超时等能力的。