我们在前几节中学习了线程如何组织以及如何保证线程安全,但是我们介绍的线程的使用方式几乎全部是 Runnable
接口,虽然我们也稍微讲了一下 Callable
的使用方式。
那么,本章节将重点讲解 Callable 接口的详细用法。
存在即合理,我们之前所学的 Runnable 接口存在以下两个缺陷:
以上的两个缺陷导致于 Runnable 接口在一些特定的开发场景中,实现某一些特定功能很麻烦。比如,我现在有 100w 的数据,需要你采用线程池将 100w 的数据拆分为 10 个线程执行,当其中一个线程出问题后,需要将错误信息,以及出错的区间返回!
如果使用 Runnable 接口来实现就会比较麻烦,需要借助我们之前讲的 CountDownLatch
等类似的工具来进行计数实现,而且 Runnable 还无法返回出错的信息和区间。但如果采用本节课即将讲到的 Callable 接口来实现这个功能,整体就会简单很多。
我们先学习一下它的基础使用方式:
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> stringFutureTask = new FutureTask<>(new Task());
new Thread(stringFutureTask).start();
//获取线程的返回结果
System.out.println(stringFutureTask.get());
}
private static class Task implements Callable<String> {
@Override
public String call() throws Exception {
return "我是执行结果";
}
}
}
从上述的代码可以看到,我们将 Callable 包装成了一个 FutureTask,后续对于 Callable 的操作主要集中在 FutureTask 上。
Callable 获取结果的时候会抛出两个异常,ExecutionException、InterruptedException。其中 InterruptedException 是当线程被中断或者任务被取消的时候抛出的异常,当任务抛出异常的时候会触发 ExecutionException,当任务被取消的时候,会出现一个 TimeoutException。
接下来,我们将针对 FutureTask 的主要常用的 API 做一个详细的介绍。
Future
关联的计算任务。mayInterruptIfRunning
用于确定是否中断正在执行任务的线程。false
;否则,任务将被取消,并返回 true
。Future
关联的计算任务是否已被取消。true
;否则,返回 false
。Future
关联的计算任务是否已经完成。true
。Future
关联的计算结果。Future
关联的计算结果,但是在指定的时间内如果计算尚未完成,则抛出 TimeoutException
异常。timeout
表示超时时间, unit
表示时间单位。注意,这里的 Future
任务虽然提供了取消任务的能力,但是当任务没有处于阻塞状态的时候,实际上任务并不会停止,它只能取消能够响应中断任务的任务。加入类似的任务是一个死循环,此时程序无法被停止。
下面我们学习正确停止死循环的两个方式(重点请关注一个注意点,响应中断任务)。
public class StopTest {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> stringFutureTask = new FutureTask<String>(new Task());
new Thread(stringFutureTask).start();
//获取线程的返回结果
Thread.sleep(1000);
System.out.println(stringFutureTask.cancel(true));
System.out.println("任务被停止");
System.out.println(stringFutureTask.get());
}
private static class Task implements Callable<String> {
@Override
public String call() throws Exception {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("线程正在运行");
}
return "运行完成";
}
}
}
public class StopTest2 {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> stringFutureTask = new FutureTask<String>(new Task());
new Thread(stringFutureTask).start();
//获取线程的返回结果
Thread.sleep(1000);
System.out.println(stringFutureTask.cancel(true));
System.out.println("任务被停止");
System.out.println(stringFutureTask.get());
}
private static class Task implements Callable<String> {
@Override
public String call() throws Exception {
while (true) {
System.out.println("线程正在运行");
Thread.sleep(500);
}
}
}
}
与第一种案例不同的是,这里使用了 sleep 来阻塞程序,当发起取消任务申请的时候,Task 会抛出中断异常,从而会从 call 方法的循环中跳出,并结束程序。当任务被取消成功后,调用 get 方法获取结果会抛出异常 CancellationException!
后面我们学习如何配合线程池来使用 Callable 接口,线程池使用 Callable 与直接使用类似,基础使用如下:
public class ThreadPoolCallable {
private final static AtomicInteger IDX = new AtomicInteger(0);
private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(1, 3, 5, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "test-" + IDX.getAndIncrement());
}
}, new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) throws Exception {
Future<String> submit = THREAD_POOL_EXECUTOR.submit(new Task());
System.out.println(submit.get());
}
private static class Task implements Callable<String> {
@Override
public String call() throws Exception {
try {
Thread.sleep(2000);
}catch (Exception e) {
e.printStackTrace();
}
return "我是返回结果";
}
}
}
与我们之前使用线程池提交任务不同的是,这里使用的是 submit 来提交任务,提交任务完成后返回一个 Future
,内部的 API 与上文同理,这里不做太多的解释。
接下来,我们将针对 Future
来设计几个使用案例来加深你的印象。
我们有一个系统,需要调用第三方的接口获取数据,但是因为我们系统的用户体验要求,如果 3 秒内接口没有返回,就返回一个第三方接口网络异常;如果 3 秒内返回了,就返回第三方数据访问成功。
public class ThreadPoolCallableCase1 {
private final static AtomicInteger IDX = new AtomicInteger(0);
private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), r -> new Thread(r, "open-api-" + IDX.getAndIncrement()), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
ThreadPoolCallableCase1 threadPoolCallableCase1 = new ThreadPoolCallableCase1();
System.out.println(threadPoolCallableCase1.getData());
}
public String getData(){
Future<String> submit = THREAD_POOL_EXECUTOR.submit(new Task());
try {
return submit.get(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
return "手动中断任务";
} catch (ExecutionException e) {
return "第三方异常";
} catch (TimeoutException e) {
//超时了就取消任务
System.out.println(submit.cancel(true));
return "第三方接口网络超时";
}
}
private static class Task implements Callable<String> {
@Override
public String call() throws Exception {
try {
Thread.sleep((long) (Math.random() * 7000));
}catch (Exception e) {
System.out.println("任务被主动中断");
}
return "第三方数据返回成功";
}
}
}
这里可以看到,我们使用了带有等待时间的 get 方法来获取数据,当在规定时间内还没有返回数据的时候,此时就会抛出第三方接口网络超时的异常信息。
假设我们存在 10w 的数据,现在要将其分为 10 个线程处理,每一个线程处理 1w 的数据写入数据库,当数据全部写入成功后,返回写入成功;当数据某一批写入失败,需要返回哪一个区间写入失败。
public class ThreadDbTest {
private final static AtomicInteger IDX = new AtomicInteger(0);
private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), r -> new Thread(r, "open-api-" + IDX.getAndIncrement()), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
List<Future<String>> futures = new ArrayList<>();
Future<String> submit1 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(1, 10000, true));
Future<String> submit2 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(10001, 20000, true));
Future<String> submit3 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(20001, 30000, true));
Future<String> submit4 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(30001, 40000, true));
Future<String> submit5 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(40001, 50000, false));
Future<String> submit6 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(50001, 60000, true));
Future<String> submit7 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(70001, 80000, true));
Future<String> submit8 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(80001, 90000, true));
Future<String> submit9 = THREAD_POOL_EXECUTOR.submit(new BatchWriteDbTask(90001, 100000, true));
futures.add(submit1);
futures.add(submit2);
futures.add(submit3);
futures.add(submit4);
futures.add(submit5);
futures.add(submit6);
futures.add(submit7);
futures.add(submit8);
futures.add(submit9);
for (Future<String> future : futures) {
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public static class BatchWriteDbTask implements Callable<String> {
private final Integer minIndex;
private final Integer maxIndex;
/**
* 模拟使用, 当为true的时候就写入成功 当为false就写入失败
*/
private final boolean isSuccess;
public BatchWriteDbTask(Integer minIndex, Integer maxIndex, boolean isSuccess) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
this.isSuccess = isSuccess;
}
@Override
public String call() throws Exception {
System.out.println("开始批量写入数据 " + minIndex + "至" + maxIndex);
if(!isSuccess) {
throw new Exception("数据 " + minIndex + "至" + maxIndex + "写入失败,请手动处理。");
}
Thread.sleep(5000);
return "数据" + minIndex + "至" + maxIndex + "写入成功";
}
}
}
我们查看最终的运行结果:
开始批量写入数据 1至10000
开始批量写入数据 20001至30000
开始批量写入数据 10001至20000
开始批量写入数据 30001至40000
开始批量写入数据 40001至50000
开始批量写入数据 50001至60000
开始批量写入数据 70001至80000
开始批量写入数据 80001至90000
开始批量写入数据 90001至100000
数据1至10000写入成功
数据10001至20000写入成功
数据20001至30000写入成功
数据30001至40000写入成功
数据50001至60000写入成功
数据70001至80000写入成功
数据80001至90000写入成功
数据90001至100000写入成功
java.util.concurrent.ExecutionException: java.lang.Exception: 数据 40001至50000写入失败,请手动处理。
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.eight.ThreadDbTest.main(ThreadDbTest.java:44)
Caused by: java.lang.Exception: 数据 40001至50000写入失败,请手动处理。
at com.eight.ThreadDbTest$BatchWriteDbTask.call(ThreadDbTest.java:76)
at com.eight.ThreadDbTest$BatchWriteDbTask.call(ThreadDbTest.java:54)
从最终的运行结果中可以看到,我们程序中预设的 “40001 至 50000 写入失败”是被成功捕获异常并返回的。
因为是异步执行,所以程序返回 Future 的时候可能程序并未开始执行或者正在执行中,为了获取最终的计算结果,程序的整体我们使用了一个集合来存储 Future 结果集,然后任务全部提交后遍历这个集合,使用 get 方法来获取真正的执行结果。当任务执行完毕后,get 方法会停止阻塞返回运行结果;当程序运行出错的时候,此时 get 方法会抛出最终的异常信息以供检测使用。
本节课我们充分介绍了如何使用 Future
来进行获取线程的数据结果,包括对于 API 的介绍以及使用,我们还使用了几个例子使你加深印象,在异步计算中 Future
能够大大加快计算速度。Dubbo 就是使用 Future
来异步获取 API 的结果以及控制超时等能力的。
© 2019 - 2023 Liangliang Lee. Powered by gin and hexo-theme-book.