前言
在上一篇《如何实现一个你自己的 RPC 框架(1)》中,我们通过 BIO 去实现了一个简单的 RPC 通信案例,今天我们将会基于上次的案例进行改造,将通信由 BIO 改造为 NIO。
1. 什么是 NIO
说到 NIO,就需要一起聊聊它的俩兄弟,BIO 以及 AIO。
BIO:Blocking IO,同步阻塞 IO。简单的说就是服务端创建一个 ServerSocket,客户端通过 Socket 去连接 ServerSocket,服务端会通过 accept 去阻塞等待客户端的请求
。BIO 存在一个十分明显的问题,就是每当存在一个客户端连接服务端时,服务端都会启动一个线程,专门的去服务这个客户端,这将导致如果出现大量的客户端连接时,服务端将会产生大量的线程开销,导致崩溃。(因此,在上一篇文章中,我们在服务端使用一个线程异步的去处理这些请求,尽量增大服务端每个线程可以处理的请求数,来达到略微优化的目的。)
NIO:NonBlocking IO,同步非阻塞 IO。在 Java 1.4 中引入,NIO 是基于 Reactor 模型。NIO 中引入了 Selector、Channel、Buffer 等概念。Buffer(缓冲区)主要用来存、取数据,Channel(通道)一个 Channel 对应一个客户端,Selector(调度器)不断轮询 Channel,处理 Channel 发生的事件。在 NIO 模型中,只有当客户端发生一次请求时,才会创建一个线程,请求结束,线程销毁,相比于 BIO 模型一个线程一直阻塞等待一个客户端所有请求而言,减少了线程的开销。NIO 的核心是同步非阻塞,无论多少客户端都可以接入服务端,客户端接入并不会耗费一个线程,只会创建一个连接然后注册到 Selector 上去,一个 Selector 线程不断的轮询所有的 Socket 连接,发现有新的事件就触发通知,然后启动一个线程专门处理一个请求即可,处理完毕释放线程,但是这个处理的过程中,要先读取数据、处理数据、再返回数据,这是个同步的过程。
AIO:Asynchronous NonBlocking IO,异步非阻塞 IO。AIO 是基于 Proactor 模型的。在 NIO 模型中,工作线程从 Channel 中读写数据是同步的。但是在 AIO 中,每个连接请求会绑定一个 Buffer,然后通过操作系统异步去完成,等操作系统完成之后,会触发一个回调通知你完成了。
2. 选型
在 Java 1.4 就引入了 NIO,但是基于原生的 API 操作比较繁琐,因此本文不会使用原生 API 来重构之前的 RPC 案例。
Netty 是一款基于 NIO 开发的高性能网络通信框架,同时支持自定义通信协议。下图是 Netty 官网的一张 Netty 组件图。
本文将使用 Netty 去重构之前的 RPC 案例。
3. 实战
本次代码是基于《如何实现一个你自己的 RPC 框架(1)》进行重构的,因此模块划分保持一致,可以在上一次的基础上进行修改。本文为了凸显区别,模块名均进行修改,由 bio 变更为 nio。
模块划分:
1 2 3 4 5 6
| rpc-demo-2 ├── rpc-client-nio ├── rpc-common-nio └── rpc-server-nio ├── rpc-server-nio-api └── rpc-server-nio-provider
|
3.1. 修改 RpcServer
将之前的 ServerSocket 改造为 Netty 的 ServerBootstrap
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class RpcServer {
public void start(Object service, int port) { ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))) .addLast(new ObjectEncoder()) .addLast(new RpcRequestServerHandler(service)); } });
try { serverBootstrap.bind(port).sync(); System.out.println(service + " 服务发布在 " + port + " 端口"); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
|
3.2. 修改 RpcRequestHandler
将原先的 RpcRequestHandler 重命名为 RpcRequestServerHandler,同时实现 SimpleChannelInboundHandler
接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @AllArgsConstructor public class RpcRequestServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
private Object service;
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception { Object invoke = invoke(rpcRequest); channelHandlerContext.writeAndFlush(invoke).addListener(ChannelFutureListener.CLOSE); }
private Object invoke(RpcRequest rpcRequest) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { Object[] params = rpcRequest.getParams(); Class<?>[] paramTypes = new Class[params.length];
for (int i = 0; i < params.length; i++) { paramTypes[i] = params[i].getClass(); }
Class<?> clazz = Class.forName(rpcRequest.getClassName()); Method method = clazz.getMethod(rpcRequest.getMethodName(), paramTypes);
Object result = method.invoke(service, params); return result; }
}
|
此时服务端重构就完成了~
3.3. 修改 RpcTransport
原先的 RpcTransport 是通过 Socket 去连接 ServerSocket,这里改造为 Netty 的 Bootstrap 形式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| @AllArgsConstructor public class RpcTransport {
private String host;
private int port;
public Object call(RpcRequest rpcRequest) { Bootstrap bootstrap = new Bootstrap(); EventLoopGroup eventGroup = new NioEventLoopGroup();
RpcRequestClientHandler rpcRequestClientHandler = new RpcRequestClientHandler(); bootstrap.group(eventGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))) .addLast(new ObjectEncoder()) .addLast(rpcRequestClientHandler); } }).option(ChannelOption.TCP_NODELAY, true);
try { ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); channelFuture.channel().writeAndFlush(rpcRequest).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { eventGroup.shutdownGracefully(); }
return rpcRequestClientHandler.getResult(); }
}
|
3.4. 创建 RpcRequestClientHandler
实现 SimpleChannelInboundHandler
接口,获取服务端返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Getter public class RpcRequestClientHandler extends SimpleChannelInboundHandler<Object> {
private Object result;
@Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { this.result = msg; }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("调用出现异常:" + cause); ctx.close(); }
}
|
此时客户端也修改完毕~
3.5. 测试
可以和之前保持一致,这里为了看出区别,我将请求参数由 rpc simple demo
改为 rpc nio demo
先运行 ServerMain 类,查看控制台日志。
1 2
| com.xkcoding.rpc.nio.HelloServiceImpl@5ab35abe 服务发布在 8080 端口 request is coming: rpc nio demo
|
再运行 ClientMain 类,查看控制台日志。
1 2
| 需要动态代理生成请求对象 content = hello rpc nio demo
|
4. 总结
本文主要是将原先的 BIO 实现改造为 NIO 实现,主要是基于 Netty 重构网络通信部分。
但是 Netty 的精髓远不止于此,本文只是粗浅的使用 Hello World 而已。
示例代码
https://github.com/xkcoding/practice_demo/tree/master/rpc-demo-2