JAVA基础知识-Reactor反应器模式

高性能的网络编程都离不开反应器模式,Nginx、Redis、Netty都采用了反应器模式。

Reactor反应器模式

反应器模式由Reactor反应器线程、Handlers处理器两大角色组成:

  1. Reactor反应器线程的职责:负责响应NIO选择器监控的IO事件,并且分发到Handlers处理器。

  2. Handlers处理器的职责:非阻塞的执行业务处理逻辑。

单线程的Reactor反应器

  • void attach(Object object): 此方法可以将任何Java的POJO对象作为附件添加到SelectionKey实例

  • Object attachement(): 此方法的作用是取出通过attach(Object)添加到SelectionKey选择键实例的附件

代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Reactor.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
* @program: NIOStudy
* @description: 单线程Reactor反应器模型
* @author: LEEZY
* @create: 2020-03-17 10:39
**/
public class Reactor implements Runnable{

final Selector selector;
final ServerSocketChannel serverSocket;

Reactor(int port) throws IOException {
// 打开选择器、ServerSocketChannel连接监听通道
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
// 将新连接处理器作为附件,绑定到serverSocket选择键
selectionKey.attach(new AcceptorHandler());
}
@Override
public void run() {
// 选择器轮询
try {
while(!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator iterator = selected.iterator();
while(iterator.hasNext()) {
// 反应器负责dispatch收到的事件
SelectionKey selectionKey = (SelectionKey) iterator.next();
dispatcher(selectionKey);
}
}
} catch(IOException e) {

}
}

// 反应器分发类
void dispatcher(SelectionKey selectionKey) {
Runnable handler = (Runnable) (selectionKey.attachment());
if (handler != null) {
handler.run();
}
}

// 新连接处理类
class AcceptorHandler implements Runnable {
@Override
public void run() {
try {
SocketChannel socketChannel = serverSocket.accept();
if (socketChannel != null) {
// 调用Handler的构造方法,将SocketChannel注册到反应器Reactor类的同一个选择器,保证Reactor类和Handler类在同一个线程执行
new Handler(selector, socketChannel);
}
} catch (IOException e) {

}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// Handler.java
import com.sun.media.jfxmedia.logging.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
* @program: NIOStudy
* @description: 处理器
* @author: LEEZY
* @create: 2020-03-17 12:08
**/
final class Handler implements Runnable {

final SocketChannel socketChannel;
final SelectionKey selectionKey;
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

static final int READING = 0, SENDING = 1;

int state = READING;

Handler(Selector selector, SocketChannel socketChannel) throws IOException {
this.socketChannel = socketChannel;
socketChannel.configureBlocking(false);
// 设置感兴趣的IO事件
selectionKey = socketChannel.register(selector, 0);
// 将Handler自身作为选择键的附件,这样在Reactor类分发事件时能执行到该Handler的run方法
selectionKey.attach(this);
// 注册Read就绪事件
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}


@Override
public void run() {
try {
if (state == SENDING) {
// 写入通道
socketChannel.write(byteBuffer);
// 转换为写入模式
byteBuffer.clear();
// 写入完成后注册read就绪事件
selectionKey.interestOps(SelectionKey.OP_READ);
// 更改状态
state = READING;
} else if (state == READING) {
int length = 0;
// 从通道读取
while((length = socketChannel.read(byteBuffer)) > 0) {
System.out.println(new String(byteBuffer.array(), 0, length));
}
byteBuffer.flip();
selectionKey.interestOps(SelectionKey.OP_WRITE);
state = SENDING;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

多线程Reactor反应器

总体来说,多线程池反应器的模式,大致如下:

  • 将负责输入输出处理的IOHandler处理器的执行,放入独立的线程池中。这样,业务处理线程与负责服务监听和IO事件查询的反应器线程相隔离,避免服务器的连接监听受到阻塞。

  • 如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程;同时,引入多个选择器,每一个SubReactor子线程负责一个选择器。这样,充分释放了系统资源的能力;也提高了反应器管理大量连接,提升选择大量通道的能力。

  • 多线程Reactor反应器实践

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    //....反应器
    class MultiThreadEchoServerReactor {
    ServerSocketChannelserverSocket;
    AtomicInteger next = new AtomicInteger(0);
    //选择器集合,引入多个选择器
    Selector[] selectors = new Selector[2];
    //引入多个子反应器
    SubReactor[] subReactors = null;
    MultiThreadEchoServerReactor() throws IOException {
    //初始化多个选择器
    selectors[0] = Selector.open();
    selectors[1] = Selector.open();
    serverSocket = ServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT);
    serverSocket.socket().bind(address);
    //非阻塞
    serverSocket.configureBlocking(false);
    //第一个选择器,负责监控新连接事件
    SelectionKeysk = serverSocket.register(selectors[0], SelectionKey.OP_ACCEPT);
    //绑定Handler:attach新连接监控handler处理器到SelectionKey(选择键)
    sk.attach(new AcceptorHandler());
    //第一个子反应器,一子反应器负责一个选择器
    SubReactor subReactor1 = new SubReactor(selectors[0]);
    //第二个子反应器,一子反应器负责一个选择器
    SubReactor subReactor2 = new SubReactor(selectors[1]);
    subReactors = new SubReactor[]{subReactor1, subReactor2};
    }

    private void startService() {
    // 一子反应器对应一个线程
    new Thread(subReactors[0]).start();
    new Thread(subReactors[1]).start();
    }

    //子反应器
    class SubReactor implements Runnable {
    //每个线程负责一个选择器的查询和选择
    final Selector selector;
    public SubReactor(Selector selector) {
    this.selector = selector;
    }
    public void run() {
    try {
    while (! Thread.interrupted()) {
    selector.select();
    Set<SelectionKey>keySet = selector.selectedKeys();
    Iterator<SelectionKey> it = keySet.iterator();
    while (it.hasNext()) {
    //反应器负责dispatch收到的事件
    SelectionKeysk = it.next();
    dispatch(sk);
    }
    keySet.clear();
    }
    } catch (IOException ex) {
    ex.printStackTrace();
    }
    }
    void dispatch(SelectionKeysk) {
    Runnable handler = (Runnable) sk.attachment();
    //调用之前attach绑定到选择键的handler处理器对象
    if (handler ! = null) {
    handler.run();
    }
    }
    }
    // Handler:新连接处理器
    class AcceptorHandler implements Runnable {
    public void run() {
    try {
    SocketChannel channel = serverSocket.accept();
    if (channel ! = null)
    new MultiThreadEchoHandler(selectors[next.get()], channel);
    } catch (IOException e) {
    e.printStackTrace();
    }
    if (next.incrementAndGet() == selectors.length) {
    next.set(0);
    }
    }
    }
    public static void main(String[] args) throws IOException {
    MultiThreadEchoServerReactor server =
    new MultiThreadEchoServerReactor();
    server.startService();
    }
    }
  • 多线程Handler处理器实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class MultiThreadEchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKeysk;
final ByteBufferbyteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
//引入线程池
static ExecutorService pool = Executors.newFixedThreadPool(4);
MultiThreadEchoHandler(Selector selector, SocketChannel c) throws
IOException {
channel = c;
c.configureBlocking(false);
//取得选择键,、再设置感兴趣的IO事件
sk = channel.register(selector, 0);
//将本Handler作为sk选择键的附件,方便事件分发(dispatch)
sk.attach(this);
//向sk选择键注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
public void run() {
//异步任务,在独立的线程池中执行
pool.execute(new AsyncTask());
}
//业务处理,不在反应器线程中执行
public synchronized void asyncRun() {
try {
if (state == SENDING) {
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写入模式
byteBuffer.clear();
//写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {
//从通道读
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读取模式
byteBuffer.flip();
//读完后,注册write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//读完后,进入发送的状态
state = SENDING;
}
//处理结束了,这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
//异步任务的内部类
class AsyncTask implements Runnable {
public void run() {
MultiThreadEchoHandler.this.asyncRun();
}
}

}
0%