使用 Java NIO 实现 Reactor 模式
前期准备#
需求#
因为我们将要开始的项目是对 Reactor 论文的实现,所以整个项目的需求也来源于该论文的示例。具体如下图所示

系统为分布式日志服务系统,其中本文实现的部分为 Logging Server ,它能够接受多个客户端的连接请求和日志记录请求,并将日志记录显示在控制终端上。
分析一下系统的功能点,有利于更好的开展工作
- 系统必须能够同时响应多个客户端的连接请求,并成功建立连接
- 系统必须能够同时接收多个客户端的日志记录,并打印到终端上
- 每个客户端发送的数据必须正确的被接收和打印
- 能够容忍部分客户端无故中断通信
Java New I/O 和 Reactor 模式#
Java New IO
作为一门以面向对象编程为主范式的编程语言,Java 很早就提供了对计算机 IO 设备和接口的良好封装,屏蔽了他们在各个操作系统或者平台中的差异,给开发者提供了统一的抽象,让开发者写起代码来更顺畅,早期提供的库代码主要集中在包 java.io 中。
Java New I/O (下文简称 NIO)指的是 JDK 1.4 及之后版本引入的一系列库,它给广大的 Java 开发者带了比前期版本更高性能、更易操作和更易扩展的 IO 操作库,代码主要集中在包 java.nio 中。
本文需要掌握 NIO 中的 java.nio.channels.Selector、 java.nio.channels.SelectionKey、java.nio.channels.Channel 等核心模型以及典型用法。
Reactor 模式
详细的内容可以参考前面的论文翻译,这里简要的回顾一下它的主要结构和交互

有些概念需要简略介绍一下
- Handle:代表一个事件源,比如 Socket
- Event:Handle 所发生的状态变化值
- Demultiplexer:多路分离器,对多个 Handle 的事件进行翻译和分拣
- Dispatcher:调用 Handler 来处理各个 Handle 的事件
- Handler:被用来处理各个 Handle 事件
两者结合#
Logging Server 中我们需要处理多个客户端的连接请求和日志记录请求,简而言之,客户端同时通过 Socket 接口跟 Server 进行通信,我们需要对多个 Socket 上面的状态变化进行分离和挑拣,并分派给特定的 Handler 去处理。NIO 带来的全新抽象模型能够更好的跟实际中的系统 IO 模型交互,这也就为我们实现基于 Reactor 模式的 Logging Server 打好了基础,我们可以利用 NIO 提供的 Readness Selection 机制方便的实现 Demultiplexer 和对 Handle 的通知。
用 NIO 实现 Reactor 模式时部分组件和 NIO 模型对应的关系如下
- Handle —>
SelectionKey - Event —>
SelectionKey.OP_READ等 - Demultiplexer —>
Selector
动手开始#
📢 所有代码均不可直接用于生产,演示代码不保证质量
整体代码结构如图

定义 Dispatcher 接口,用来分发不同的 Handle 事件到 Handler 中
import java.nio.channels.SelectableChannel;
/**
* @author wynn5a
* @date 2021/2/17
*/
public interface Dispatcher {
void handleEvents();
void registerHandler(SelectableChannel channel, Class<? extends EventHandler> clazz, int event);
void removeHandler(SelectableChannel channel);
}实现核心的 Dispatcher 的过程,是一个 Select loop
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author wynn5a
* @date 2021/2/17
*/
public class InitiationDispatcher implements Dispatcher {
private final Selector selector;
private final ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private final Map<SelectionKey, Class<? extends EventHandler>> handlers = new ConcurrentHashMap<>();
private final ReadWriteLock selectorLock = new ReentrantReadWriteLock();
public InitiationDispatcher() {
try {
this.selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void handleEvents() {
while (true) {
try {
checkBeforeSelect();
selector.select();
var i = selector.selectedKeys().iterator();
while (i.hasNext()) {
SelectionKey handle = i.next();
Class<? extends EventHandler> handlerClass = handlers.get(handle);
if (handlerClass != null) {
EventHandler handler = handlerClass.getDeclaredConstructor().newInstance();
handler.setHandle(handle);
Future<Boolean> handled = pool.submit(handler);
if (handled.get()) {
i.remove();
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
/**
* make sure none of handler thread is updating selector
*/
private void checkBeforeSelect() {
selectorLock.writeLock().lock();
selectorLock.writeLock().unlock();
}
@Override
public void registerHandler(SelectableChannel channel, Class<? extends EventHandler> handlerClass, int event) {
try {
lockBeforeRegister();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, event);
key.attach(this);
handlers.put(key, handlerClass);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
unlockAfterRegister();
}
}
private void unlockAfterRegister() {
selectorLock.readLock().unlock();
}
private void lockBeforeRegister() {
selectorLock.readLock().lock();
selector.wakeup();
}
@Override
public void removeHandler(SelectableChannel channel) {
boolean registered = channel.isRegistered();
if (registered) {
SelectionKey selectionKey = channel.keyFor(selector);
selectionKey.cancel();
handlers.remove(selectionKey);
}
}
}定义 EventHandler 接口,主要用来处理 Handle 中发生的事件,也就是处理 SelectionKey 的状态变化
import java.nio.channels.SelectionKey;
import java.util.concurrent.Callable;
/**
* @author wynn5a
* @date 2021/2/17
*/
public interface EventHandler extends Callable<Boolean> {
void handleEvent();
SelectionKey getHandle();
void setHandle(SelectionKey key);
}用来处理日志请求的 EventHandler
import io.github.wynn5a.reactor.nio.EventHandler;
import io.github.wynn5a.reactor.nio.InitiationDispatcher;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
/**
* @author wynn5a
* @date 2021/2/17
*/
public class LoggingHandler implements EventHandler {
private SelectionKey handle;
private final ByteBuffer buffer = ByteBuffer.allocate(1024);
@Override
public void handleEvent() {
SocketChannel channel = (SocketChannel) handle.channel();
if (channel.isOpen() && handle.isReadable()) {
try {
buffer.clear();
int read = channel.read(buffer);
if (read < 0) {
System.out.println("Client connection refused");
InitiationDispatcher dispatcher = (InitiationDispatcher) handle.attachment();
dispatcher.removeHandler(channel);
channel.close();
return;
}
buffer.flip();
slowdown();
System.out.print("LOG: " + new String(buffer.array(), UTF_8));
// handleLogContent();
buffer.clear();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
//if dont block in Dispatcher, then have to handle event more than 1 time
private void handleLogContent() {
if (buffer.remaining() > 0) {
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
for (int i = 0; i < data.length; i++) {
byte b = data[i];
if (b == '\r' || b == '\n') {
byte[] temp = new byte[i + 1];
System.arraycopy(data, 0, temp, 0, i + 1);
slowdown();
System.out.print("LOG: " + new String(temp, UTF_8));
}
}
}
}
private static void slowdown() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public SelectionKey getHandle() {
return handle;
}
@Override
public void setHandle(SelectionKey key) {
this.handle = key;
}
@Override
public Boolean call() {
handleEvent();
return true;
}
}用来处理连接请求的 EventHandler
import static java.nio.channels.SelectionKey.OP_READ;
import static java.nio.charset.StandardCharsets.UTF_8;
import io.github.wynn5a.reactor.nio.Dispatcher;
import io.github.wynn5a.reactor.nio.EventHandler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
* @author wynn5a
* @date 2021/2/17
*/
public class LoggingAcceptor implements EventHandler {
private SelectionKey handle;
@Override
public void handleEvent() {
ServerSocketChannel channel = (ServerSocketChannel) handle.channel();
if (channel.isOpen() && handle.isAcceptable()) {
try {
SocketChannel accept = channel.accept();
ByteBuffer buffer = ByteBuffer.wrap("Connect Successfully with Logging Server! \n".getBytes(UTF_8));
accept.write(buffer);
Dispatcher dispatcher = (Dispatcher) handle.attachment();
dispatcher.registerHandler(accept, LoggingHandler.class, OP_READ);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@Override
public SelectionKey getHandle() {
return handle;
}
@Override
public void setHandle(SelectionKey key) {
this.handle = key;
}
@Override
public Boolean call() {
handleEvent();
return true;
}
}最后是 Logging Server APP 的启动类
import static java.nio.channels.SelectionKey.OP_ACCEPT;
import io.github.wynn5a.reactor.nio.Dispatcher;
import io.github.wynn5a.reactor.nio.InitiationDispatcher;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
/**
* @author wynn5a
* @date 2021/2/17
*/
public class LoggingServer {
static Dispatcher dispatcher = new InitiationDispatcher();
public static void main(String[] args) {
try {
ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
channel.bind(new InetSocketAddress("127.0.0.1", 1234));
dispatcher.registerHandler(channel, LoggingAcceptor.class, OP_ACCEPT);
dispatcher.handleEvents();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}效果验证#
上面的代码实现了一个监听了本地 1234 端口的 Logging Server,它可以接收来自多个客户端的请求,我们可以使用多个终端模拟客户端来验证效果