0%

JAVA基础知识-异步回调模式

异步回调模式。

JOIN异步阻塞

操作原理:阻塞当前的线程,直到准备合并的目标线程的执行完成;即线程A调用了线程B的join方法,合并线程B,线程A则进入阻塞状态,直到线程B执行完成。

Future异步回调模式_泡茶案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.leezy.future;

/**
* @program: NIOStudy
* @description: 异步阻塞JOIN
* @author: LEEZY
* @create: 2020-03-18 15:28
**/

public class JoinDemo {
public static final int SLEEP_GAP = 500;
public static String getCurThreadName() {
return Thread.currentThread().getName();
}

static class HotWaterThread extends Thread {
public HotWaterThread() {
super("烧水线程");
}

public void run() {
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

static class WashThread extends Thread {
public WashThread() {
super("清洗线程");
}
public void run() {
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
Thread hotWaterThread = new HotWaterThread();
Thread washThread = new WashThread();
hotWaterThread.start();
washThread.start();
try {
// 主线程阻塞,开启烧水和清洗线程
// 合并烧水线程
hotWaterThread.join();
// 合并清洗线程
washThread.join();
Thread.currentThread().setName("主线程");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

join方法是有三个重载版本:

  • void join():A线程等待B线程执行结束后,A线程重新恢复执行。
  • void join(long millis):A线程等待B线程执行一段时间,最长等待时间为millis毫秒。超过millis毫秒后,不论B线程是否结束,A线程重新恢复执行。
  • void join(long millis, int nanos):等待B线程执行一段时间,最长等待时间为millis毫秒,加nanos纳秒。超过时间后,不论B线程是否结束,A线程重新恢复执行。

JOIN被合并的线程没有返回值,如果需要异步线程的执行结果,就需要用到Java的FutureTask系列类。

FutureTask异步回调

  • Callable接口:Callable接口是个泛型接口,与Runnable接口类似,唯一的区别是,其抽象方法call有返回值,返回值的类型为泛型形参的实际类型。但是Callable接口的实例不能作为Thread线程实例的target来使用,而Runnable接口实例可以作为Thread线程实例的target构造参数,开启一个Thread线程。其内部进行的是异步执行的逻辑。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    package java.util.concurrent;

    @FunctionalInterface
    public interface Callable<V> {
    /**
    * Computes a result, or throws an exception if unable to do so.
    *
    * @return computed result
    * @throws Exception if unable to compute a result
    */
    V call() throws Exception;
    }
  • FutureTask类:就像一座搭在Callable实例与Thread线程实例之间的桥。FutureTask类的内部封装一个Callable实例,然后自身间接继承了Runnable接口可以作为Thread线程的target。

1
2
3
4
5
6
7
8
public class FutureTask<V> implements RunnableFuture<V> {
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.leezy.future;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
* @program: NIOStudy
* @description: FutureTask类实现喝茶实例
* @author: LEEZY
* @create: 2020-03-19 11:40
**/

public class JavaFutureDemo {
public static final int SLEEP_GAP = 500;
public static String getCurThreadName() {
return Thread.currentThread().getName();
}

// 实现Callable接口,并返回异步线程执行结果
static class HotWaterJob implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
return false;
}
return true;
}
}

static class WashJob implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
return false;
}
return true;
}
}

public static void drinkTea(boolean waterOK, boolean teacupOK) {
if (waterOK && teacupOK) {
System.out.println("喝茶");
} else if (!waterOK) {
System.out.println("烧水失败");
} else {
System.out.println("洗杯子失败");
}
}

public static void main(String[] args) {
// 异步逻辑
Callable<Boolean> hotWaterJob = new HotWaterJob();
// 创建FutureTask实例,创建新的线程
FutureTask<Boolean> hotWaterTask = new FutureTask<>(hotWaterJob);
Thread hotWaterThread = new Thread(hotWaterTask, "烧水线程");

Callable<Boolean> washJob = new WashJob();
FutureTask<Boolean> washTask = new FutureTask<>(washJob);
Thread washThread = new Thread(washTask, "清洁线程");

hotWaterThread.start();
washThread.start();

Thread.currentThread().setName("主线程");

try {
Boolean waterOK = hotWaterTask.get();
Boolean teacupOK = washTask.get();
drinkTea(waterOK, teacupOK);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}

P.S.FutureTask和Callable都是泛型类,泛型参数表示返回结果的类型。所以,在使用的时候,它们两个实例的泛型参数一定需要保持一致

  • Future接口
    Java将FutureTask类的一系列操作,抽象出来作为一个重要的接口,Future接口。主要提供了三个功能
  1. 判断并发任务是否执行完成
  2. 获取并发的任务完成后的结果
  3. 取消并发执行中的任务
1
2
3
4
5
6
7
8
9
10
11
12
public interface Future<V> {
// 取消并发任务执行
boolean cancel(boolean mayInterruptIfRunning);
// 获取并发任务取消状态
boolean isCancelled();
// 获取并发任务执行状态
boolean isDone();
// 获取并发任务执行结果;阻塞性的,如果并发任务没有执行完成,调用该方法会一直阻塞直到并发任务执行完成
V get() throws InterruptedException, ExecutionException;
// 获取并发任务执行结果;阻塞性的,如果阻塞时间超过设定的timeout时间,该方法会抛出异常
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

以上2种办法,通过FutureTask类和Join方法都是异步阻塞模式,效率都是比较低的。

Guava的异步调用

Guava增强了java.util.concurrent包,为了实现非阻塞获取异步线程的结果,Guava对Java的异步回调机制做了2个方面的增强。

  1. ListenableFuture,继承了Java的Future接口,使Java的Future异步任务在Guava中能被监控和获取非阻塞异步执行的结果。
  2. FutureCallback,新接口,该接口的目的是在异步任务执行完成后,根据异步结果,完成不同的回调处理,可以处理异步结果。
  • FutureCallBack
  1. onSuccess(): 在异步任务执行成功后被回调;调用时,异步任务的执行结果,作为onSuccess方法的参数被传入。
  1. onFailure():在异步任务执行过程中,抛出异常时被回调;调用时,异步任务所抛出的异常,作为onFailure方法的参数被传入。
1
2
3
4
5
public interface FutureCallback<V> {
void onSuccess(@Nullable V var1);

void onFailure(Throwable var1);
}
  • ListenableFuture

继承自Java的Future接口,增加了一个addListener方法,作用是将FutureCallback的回调封装成一个内部的Runnable异步回调任务,在Callable异步任务完成后,回调FutureCallback进行处理。

在实际编程中,将FutureCallback回调逻辑绑定到ListenableFuture的异步任务,可以通过Guava的Futures工具类的addCallback静态方法。

获取Guava的ListenableFuture异步任务实例,主要通过线程池ThreadPool提交Callable任务的方式来获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public static void nativeFuture() throws Exception {
// Java自带的Future模式,实现异步
ExecutorService nativeExecutor = Executors.newSingleThreadExecutor();
Future<String> nativeFuture = nativeExecutor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
// 使用sleep模拟调用耗时
TimeUnit.SECONDS.sleep(1);
return "[" + Thread.currentThread().getName() + "]: 并发包Future返回结果";
}
});
// Future只实现了异步,没有实现回调。此时主线程get结果时阻塞,可以轮询获取异步调用是否完成
System.out.println("[" + Thread.currentThread().getName() + "] ==>" + nativeFuture.get());
}

public static void guavaFuture() {
// Guava异步回调
ExecutorService executorService = Executors.newSingleThreadExecutor();
ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(executorService);
final ListenableFuture<String> listenableFuture = guavaExecutor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(1);
return "[" + Thread.currentThread().getName() + "]: guava的Future返回结果";
}
});
// 注册监听器,即异步调用完成时回在指定的线程Executors.newSingleThreadExecutor()中执行注册的监听器
listenableFuture.addListener(new Runnable() {
@Override
public void run() {
try {
String str = "[" + Thread.currentThread().getName() + "]: guava对返回结果进行异步CallBack(Runnable):" + listenableFuture.get();
System.out.println(str);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
}, Executors.newSingleThreadExecutor());
// 主线程可以继续执行,异步完成后会执行注册的监听器任务.
System.out.println("[" + Thread.currentThread().getName() + "]: guavaFuture执行结束");
}

public static void guavaFuture2() {
// 除了ListenableFuture,guava还提供了FutureCallback接口
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
ListeningExecutorService guavaExecutor2 = MoreExecutors.listeningDecorator(executorService2);
final ListenableFuture<String> listenableFuture2 = guavaExecutor2.submit(new Callable<String>() {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(1);
return "[" + Thread.currentThread().getName() + "]: guava的Future返回结果";
}
});
Futures.addCallback(listenableFuture2, new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String result) {
String str = "[" + Thread.currentThread().getName() + "]=======>对回调结果【" + result + "】进行FutureCallback";
System.out.println(str);
}

@Override
public void onFailure(Throwable throwable) {

}
}, Executors.newSingleThreadExecutor());
// 主线程可以继续执行,异步完成后会执行注册的监听器任务.
System.out.println( "[" + Thread.currentThread().getName() +"]: guavaFuture2执行结束");
}

执行结果:

script
1
2
3
4
5
[main] ==>[pool-1-thread-1]: 并发包Future返回结果
[main]: guavaFuture执行结束
[pool-3-thread-1]: guava对返回结果进行异步CallBack(Runnable):[pool-2-thread-1]: guava的Future返回结果
[main]: guavaFuture2执行结束
[pool-5-thread-1]=======>对回调结果【[pool-4-thread-1]: guava的Future返回结果】进行FutureCallback

Netty的异步回调模式

Netty对JavaFuture异步任务拓展如下:

  1. 继承Java的Future接口;
  2. 定义GenericFutureListener接口,异步执行结果监听器。