如何实现一个你自己的 RPC 框架(2)

前言

在上一篇《如何实现一个你自己的 RPC 框架(1)》中,我们通过 BIO 去实现了一个简单的 RPC 通信案例,今天我们将会基于上次的案例进行改造,将通信由 BIO 改造为 NIO。

1. 什么是 NIO

说到 NIO,就需要一起聊聊它的俩兄弟,BIO 以及 AIO。

BIO:Blocking IO,同步阻塞 IO。简单的说就是服务端创建一个 ServerSocket,客户端通过 Socket 去连接 ServerSocket,服务端会通过 accept 去阻塞等待客户端的请求。BIO 存在一个十分明显的问题,就是每当存在一个客户端连接服务端时,服务端都会启动一个线程,专门的去服务这个客户端,这将导致如果出现大量的客户端连接时,服务端将会产生大量的线程开销,导致崩溃。(因此,在上一篇文章中,我们在服务端使用一个线程异步的去处理这些请求,尽量增大服务端每个线程可以处理的请求数,来达到略微优化的目的。)

NIO:NonBlocking IO,同步非阻塞 IO。在 Java 1.4 中引入,NIO 是基于 Reactor 模型。NIO 中引入了 Selector、Channel、Buffer 等概念。Buffer(缓冲区)主要用来存、取数据,Channel(通道)一个 Channel 对应一个客户端,Selector(调度器)不断轮询 Channel,处理 Channel 发生的事件。在 NIO 模型中,只有当客户端发生一次请求时,才会创建一个线程,请求结束,线程销毁,相比于 BIO 模型一个线程一直阻塞等待一个客户端所有请求而言,减少了线程的开销。NIO 的核心是同步非阻塞,无论多少客户端都可以接入服务端,客户端接入并不会耗费一个线程,只会创建一个连接然后注册到 Selector 上去,一个 Selector 线程不断的轮询所有的 Socket 连接,发现有新的事件就触发通知,然后启动一个线程专门处理一个请求即可,处理完毕释放线程,但是这个处理的过程中,要先读取数据、处理数据、再返回数据,这是个同步的过程。

AIO:Asynchronous NonBlocking IO,异步非阻塞 IO。AIO 是基于 Proactor 模型的。在 NIO 模型中,工作线程从 Channel 中读写数据是同步的。但是在 AIO 中,每个连接请求会绑定一个 Buffer,然后通过操作系统异步去完成,等操作系统完成之后,会触发一个回调通知你完成了。

2. 选型

在 Java 1.4 就引入了 NIO,但是基于原生的 API 操作比较繁琐,因此本文不会使用原生 API 来重构之前的 RPC 案例。

Netty 是一款基于 NIO 开发的高性能网络通信框架,同时支持自定义通信协议。下图是 Netty 官网的一张 Netty 组件图。

components

本文将使用 Netty 去重构之前的 RPC 案例。

3. 实战

本次代码是基于《如何实现一个你自己的 RPC 框架(1)》进行重构的,因此模块划分保持一致,可以在上一次的基础上进行修改。本文为了凸显区别,模块名均进行修改,由 bio 变更为 nio。

模块划分:

1
2
3
4
5
6
rpc-demo-2
├── rpc-client-nio // 客户端
├── rpc-common-nio // 通用模块,封装 RPC 请求参数
└── rpc-server-nio // 服务端
├── rpc-server-nio-api //接口定义
└── rpc-server-nio-provider // 接口实现

3.1. 修改 RpcServer

将之前的 ServerSocket 改造为 Netty 的 ServerBootstrap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class RpcServer {

public void start(Object service, int port) {
ServerBootstrap serverBootstrap = new ServerBootstrap();

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
.addLast(new ObjectEncoder())
// 自定义处理器
.addLast(new RpcRequestServerHandler(service));
}
});

try {
serverBootstrap.bind(port).sync();
System.out.println(service + " 服务发布在 " + port + " 端口");
}
catch (InterruptedException e) {
e.printStackTrace();
}

}

}

3.2. 修改 RpcRequestHandler

将原先的 RpcRequestHandler 重命名为 RpcRequestServerHandler,同时实现 SimpleChannelInboundHandler 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@AllArgsConstructor
public class RpcRequestServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

private Object service;

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {
// 反射调用
Object invoke = invoke(rpcRequest);
channelHandlerContext.writeAndFlush(invoke).addListener(ChannelFutureListener.CLOSE);
}

private Object invoke(RpcRequest rpcRequest)
throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
// 请求参数
Object[] params = rpcRequest.getParams();
// 请求参数类型
Class<?>[] paramTypes = new Class[params.length];

for (int i = 0; i < params.length; i++) {
paramTypes[i] = params[i].getClass();
}

// 获取请求的类名
Class<?> clazz = Class.forName(rpcRequest.getClassName());
// 获取请求的方法名
Method method = clazz.getMethod(rpcRequest.getMethodName(), paramTypes);

// 调用
Object result = method.invoke(service, params);
return result;
}

}

此时服务端重构就完成了~

3.3. 修改 RpcTransport

原先的 RpcTransport 是通过 Socket 去连接 ServerSocket,这里改造为 Netty 的 Bootstrap 形式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@AllArgsConstructor
public class RpcTransport {

private String host;

private int port;

public Object call(RpcRequest rpcRequest) {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup eventGroup = new NioEventLoopGroup();

RpcRequestClientHandler rpcRequestClientHandler = new RpcRequestClientHandler();
bootstrap.group(eventGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
.addLast(new ObjectEncoder())
// 添加自定义处理器
.addLast(rpcRequestClientHandler);
}
}).option(ChannelOption.TCP_NODELAY, true);

try {
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
// 发送 RPC 请求参数
channelFuture.channel().writeAndFlush(rpcRequest).sync();
// 等待连接关闭
channelFuture.channel().closeFuture().sync();
}
catch (InterruptedException e) {
e.printStackTrace();
}
finally {
eventGroup.shutdownGracefully();
}

return rpcRequestClientHandler.getResult();
}

}

3.4. 创建 RpcRequestClientHandler

实现 SimpleChannelInboundHandler 接口,获取服务端返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Getter
public class RpcRequestClientHandler extends SimpleChannelInboundHandler<Object> {

private Object result;

@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
this.result = msg;
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("调用出现异常:" + cause);
ctx.close();
}

}

此时客户端也修改完毕~

3.5. 测试

可以和之前保持一致,这里为了看出区别,我将请求参数由 rpc simple demo 改为 rpc nio demo

先运行 ServerMain 类,查看控制台日志。

1
2
com.xkcoding.rpc.nio.HelloServiceImpl@5ab35abe 服务发布在 8080 端口
request is coming: rpc nio demo

再运行 ClientMain 类,查看控制台日志。

1
2
需要动态代理生成请求对象
content = hello rpc nio demo

4. 总结

本文主要是将原先的 BIO 实现改造为 NIO 实现,主要是基于 Netty 重构网络通信部分。

但是 Netty 的精髓远不止于此,本文只是粗浅的使用 Hello World 而已。

示例代码

https://github.com/xkcoding/practice_demo/tree/master/rpc-demo-2

-------------本文结束  感谢您的阅读-------------
xkcoding wechat
欢迎来我的公众号「xkcoding小凯扣丁」逛逛
o(╯□╰)o 赞助一杯咖啡 ~~