$Wynn5a 技术博客 - AI编程与软件工程实践
~/blog/reactor-pattern-implement-java-nio

使用 Java NIO 实现 Reactor 模式

前期准备#

需求#

因为我们将要开始的项目是对 Reactor 论文的实现,所以整个项目的需求也来源于该论文的示例。具体如下图所示

系统为分布式日志服务系统,其中本文实现的部分为 Logging Server ,它能够接受多个客户端的连接请求和日志记录请求,并将日志记录显示在控制终端上。

分析一下系统的功能点,有利于更好的开展工作

  1. 系统必须能够同时响应多个客户端的连接请求,并成功建立连接
  2. 系统必须能够同时接收多个客户端的日志记录,并打印到终端上
  3. 每个客户端发送的数据必须正确的被接收和打印
  4. 能够容忍部分客户端无故中断通信

Java New I/O 和 Reactor 模式#

Java New IO

作为一门以面向对象编程为主范式的编程语言,Java 很早就提供了对计算机 IO 设备和接口的良好封装,屏蔽了他们在各个操作系统或者平台中的差异,给开发者提供了统一的抽象,让开发者写起代码来更顺畅,早期提供的库代码主要集中在包 java.io 中。

Java New I/O (下文简称 NIO)指的是 JDK 1.4 及之后版本引入的一系列库,它给广大的 Java 开发者带了比前期版本更高性能、更易操作和更易扩展的 IO 操作库,代码主要集中在包 java.nio 中。

本文需要掌握 NIO 中的 java.nio.channels.Selectorjava.nio.channels.SelectionKeyjava.nio.channels.Channel 等核心模型以及典型用法。

Reactor 模式

详细的内容可以参考前面的论文翻译,这里简要的回顾一下它的主要结构和交互

有些概念需要简略介绍一下

  • Handle:代表一个事件源,比如 Socket
  • Event:Handle 所发生的状态变化值
  • Demultiplexer:多路分离器,对多个 Handle 的事件进行翻译和分拣
  • Dispatcher:调用 Handler 来处理各个 Handle 的事件
  • Handler:被用来处理各个 Handle 事件

两者结合#

Logging Server 中我们需要处理多个客户端的连接请求和日志记录请求,简而言之,客户端同时通过 Socket 接口跟 Server 进行通信,我们需要对多个 Socket 上面的状态变化进行分离和挑拣,并分派给特定的 Handler 去处理。NIO 带来的全新抽象模型能够更好的跟实际中的系统 IO 模型交互,这也就为我们实现基于 Reactor 模式的 Logging Server 打好了基础,我们可以利用 NIO 提供的 Readness Selection 机制方便的实现 Demultiplexer 和对 Handle 的通知。

用 NIO 实现 Reactor 模式时部分组件和 NIO 模型对应的关系如下

  • Handle —> SelectionKey
  • Event —> SelectionKey.OP_READ
  • Demultiplexer —> Selector

动手开始#

📢 所有代码均不可直接用于生产,演示代码不保证质量

整体代码结构如图

定义 Dispatcher 接口,用来分发不同的 Handle 事件到 Handler 中

java
import java.nio.channels.SelectableChannel; /** * @author wynn5a * @date 2021/2/17 */ public interface Dispatcher { void handleEvents(); void registerHandler(SelectableChannel channel, Class<? extends EventHandler> clazz, int event); void removeHandler(SelectableChannel channel); }

实现核心的 Dispatcher 的过程,是一个 Select loop

java
import java.io.IOException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @author wynn5a * @date 2021/2/17 */ public class InitiationDispatcher implements Dispatcher { private final Selector selector; private final ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private final Map<SelectionKey, Class<? extends EventHandler>> handlers = new ConcurrentHashMap<>(); private final ReadWriteLock selectorLock = new ReentrantReadWriteLock(); public InitiationDispatcher() { try { this.selector = Selector.open(); } catch (IOException e) { throw new RuntimeException(e); } } @Override public void handleEvents() { while (true) { try { checkBeforeSelect(); selector.select(); var i = selector.selectedKeys().iterator(); while (i.hasNext()) { SelectionKey handle = i.next(); Class<? extends EventHandler> handlerClass = handlers.get(handle); if (handlerClass != null) { EventHandler handler = handlerClass.getDeclaredConstructor().newInstance(); handler.setHandle(handle); Future<Boolean> handled = pool.submit(handler); if (handled.get()) { i.remove(); } } } } catch (Exception e) { throw new RuntimeException(e); } } } /** * make sure none of handler thread is updating selector */ private void checkBeforeSelect() { selectorLock.writeLock().lock(); selectorLock.writeLock().unlock(); } @Override public void registerHandler(SelectableChannel channel, Class<? extends EventHandler> handlerClass, int event) { try { lockBeforeRegister(); channel.configureBlocking(false); SelectionKey key = channel.register(selector, event); key.attach(this); handlers.put(key, handlerClass); } catch (Exception e) { throw new RuntimeException(e); } finally { unlockAfterRegister(); } } private void unlockAfterRegister() { selectorLock.readLock().unlock(); } private void lockBeforeRegister() { selectorLock.readLock().lock(); selector.wakeup(); } @Override public void removeHandler(SelectableChannel channel) { boolean registered = channel.isRegistered(); if (registered) { SelectionKey selectionKey = channel.keyFor(selector); selectionKey.cancel(); handlers.remove(selectionKey); } } }

定义 EventHandler 接口,主要用来处理 Handle 中发生的事件,也就是处理 SelectionKey 的状态变化

java
import java.nio.channels.SelectionKey; import java.util.concurrent.Callable; /** * @author wynn5a * @date 2021/2/17 */ public interface EventHandler extends Callable<Boolean> { void handleEvent(); SelectionKey getHandle(); void setHandle(SelectionKey key); }

用来处理日志请求的 EventHandler

java
import io.github.wynn5a.reactor.nio.EventHandler; import io.github.wynn5a.reactor.nio.InitiationDispatcher; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; /** * @author wynn5a * @date 2021/2/17 */ public class LoggingHandler implements EventHandler { private SelectionKey handle; private final ByteBuffer buffer = ByteBuffer.allocate(1024); @Override public void handleEvent() { SocketChannel channel = (SocketChannel) handle.channel(); if (channel.isOpen() && handle.isReadable()) { try { buffer.clear(); int read = channel.read(buffer); if (read < 0) { System.out.println("Client connection refused"); InitiationDispatcher dispatcher = (InitiationDispatcher) handle.attachment(); dispatcher.removeHandler(channel); channel.close(); return; } buffer.flip(); slowdown(); System.out.print("LOG: " + new String(buffer.array(), UTF_8)); // handleLogContent(); buffer.clear(); } catch (IOException e) { throw new RuntimeException(e); } } } //if dont block in Dispatcher, then have to handle event more than 1 time private void handleLogContent() { if (buffer.remaining() > 0) { byte[] data = new byte[buffer.remaining()]; buffer.get(data); for (int i = 0; i < data.length; i++) { byte b = data[i]; if (b == '\r' || b == '\n') { byte[] temp = new byte[i + 1]; System.arraycopy(data, 0, temp, 0, i + 1); slowdown(); System.out.print("LOG: " + new String(temp, UTF_8)); } } } } private static void slowdown() { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public SelectionKey getHandle() { return handle; } @Override public void setHandle(SelectionKey key) { this.handle = key; } @Override public Boolean call() { handleEvent(); return true; } }

用来处理连接请求的 EventHandler

java
import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.charset.StandardCharsets.UTF_8; import io.github.wynn5a.reactor.nio.Dispatcher; import io.github.wynn5a.reactor.nio.EventHandler; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; /** * @author wynn5a * @date 2021/2/17 */ public class LoggingAcceptor implements EventHandler { private SelectionKey handle; @Override public void handleEvent() { ServerSocketChannel channel = (ServerSocketChannel) handle.channel(); if (channel.isOpen() && handle.isAcceptable()) { try { SocketChannel accept = channel.accept(); ByteBuffer buffer = ByteBuffer.wrap("Connect Successfully with Logging Server! \n".getBytes(UTF_8)); accept.write(buffer); Dispatcher dispatcher = (Dispatcher) handle.attachment(); dispatcher.registerHandler(accept, LoggingHandler.class, OP_READ); } catch (IOException e) { throw new RuntimeException(e); } } } @Override public SelectionKey getHandle() { return handle; } @Override public void setHandle(SelectionKey key) { this.handle = key; } @Override public Boolean call() { handleEvent(); return true; } }

最后是 Logging Server APP 的启动类

java
import static java.nio.channels.SelectionKey.OP_ACCEPT; import io.github.wynn5a.reactor.nio.Dispatcher; import io.github.wynn5a.reactor.nio.InitiationDispatcher; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; /** * @author wynn5a * @date 2021/2/17 */ public class LoggingServer { static Dispatcher dispatcher = new InitiationDispatcher(); public static void main(String[] args) { try { ServerSocketChannel channel = ServerSocketChannel.open(); channel.configureBlocking(false); channel.bind(new InetSocketAddress("127.0.0.1", 1234)); dispatcher.registerHandler(channel, LoggingAcceptor.class, OP_ACCEPT); dispatcher.handleEvents(); } catch (IOException e) { throw new RuntimeException(e); } } }

效果验证#

上面的代码实现了一个监听了本地 1234 端口的 Logging Server,它可以接收来自多个客户端的请求,我们可以使用多个终端模拟客户端来验证效果

优化一下?#

欢迎指正#