如何实现一个你自己的 RPC 框架(3)

前言

在前面的文章《如何实现一个你自己的 RPC 框架(1)》中,我们通过 BIO 去实现了一个简单的 RPC 通信案例,同时在《如何实现一个你自己的 RPC 框架(2)》中我们将 BIO 改造成了 NIO,今天我们将会基于第一次 BIO 的案例进行改造,将 BIO 改造为 AIO 通信。

1. 什么是 AIO

《如何实现一个你自己的 RPC 框架(2)》中已经介绍了 BIO、NIO、AIO,不知道的小伙伴可以前往查看,在此不做赘述了。

2. 选型

本文将选用国产 AIO 框架 smart-socket 来实现 AIO 改造。

3. 实战

本次代码是基于《如何实现一个你自己的 RPC 框架(1)》进行重构的,因此模块划分保持一致,可以在上一次的基础上进行修改。本文为了凸显区别,模块名均进行修改,由 bio 变更为 aio。

模块划分:

1
2
3
4
5
6
rpc-demo-3
├── rpc-client-aio // 客户端
├── rpc-common-aio // 通用模块,封装 RPC 请求参数
└── rpc-server-aio // 服务端
├── rpc-server-aio-api //接口定义
└── rpc-server-aio-provider // 接口实现

3.1. 修改 common 模块

首先引入 smart-socket 依赖

1
2
3
4
5
6
7
8
9
10
<dependencies>
<dependency>
<groupId>org.smartboot.socket</groupId>
<artifactId>aio-core</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>

然后在 common 模块中创建客户端和服务端都需要遵守的编解码协议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class RpcProtocol implements Protocol<byte[]> {
private static final int INTEGER_BYTES = Integer.SIZE / Byte.SIZE;

/**
* 对于从Socket流中获取到的数据采用当前Protocol的实现类协议进行解析。
*
* @param readBuffer 待处理的读buffer
* @param session 本次需要解码的session
* @return 本次解码成功后封装的业务消息对象, 返回null则表示解码未完成
*/
@Override
public byte[] decode(ByteBuffer readBuffer, AioSession<byte[]> session) {
int remaining = readBuffer.remaining();
if (remaining < INTEGER_BYTES) {
return null;
}
int messageSize = readBuffer.getInt(readBuffer.position());
if (messageSize > remaining) {
return null;
}
byte[] data = new byte[messageSize - INTEGER_BYTES];
readBuffer.getInt();
readBuffer.get(data);
return data;
}
}

3.2. 修改 rpc-server-aio-provider 模块

将之前的 RpcServer 中的 ServerSocket 改造为 AioQuickServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class RpcServer {

public void start(Object service, int port) {
RpcServerMessageProcessor messageProcessor = new RpcServerMessageProcessor(service);
AioQuickServer<byte[]> server = new AioQuickServer<>(port, new RpcProtocol(), messageProcessor);

try {
server.start();
System.out.println(service + " 服务发布在 " + port + " 端口");
} catch (IOException e) {
e.printStackTrace();
}

}

}

将原先的 RpcRequestHandler 重命名为 RpcServerMessageProcessor,同时实现 MessageProcessor 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@AllArgsConstructor
public class RpcServerMessageProcessor implements MessageProcessor<byte[]> {

private Object service;

/**
* 处理接收到的消息
*
* @param session 通信会话
* @param msg 待处理的业务消息
*/
@Override
public void process(AioSession<byte[]> session, byte[] msg) {
ObjectInput objectInput = null;
ObjectOutput objectOutput = null;
try {
objectInput = new ObjectInputStream(new ByteArrayInputStream(msg));
RpcRequest rpcRequest = (RpcRequest) objectInput.readObject();
// 反射调用
Object invoke = invoke(rpcRequest);

// 写回客户端
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
objectOutput = new ObjectOutputStream(byteArrayOutputStream);
objectOutput.writeObject(invoke);
byte[] data = byteArrayOutputStream.toByteArray();
session.writeBuffer().writeInt(data.length + 4);
session.writeBuffer().write(data);
session.writeBuffer().flush();
} catch (IOException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} finally {
if (objectInput != null) {
try {
objectInput.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (objectOutput != null) {
try {
objectOutput.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

/**
* 状态机事件,当枚举事件发生时由框架触发该方法
*
* @param session 本次触发状态机的AioSession对象
* @param stateMachineEnum 状态枚举
* @param throwable 异常对象,如果存在的话
* @see StateMachineEnum
*/
@Override
public void stateEvent(AioSession<byte[]> session, StateMachineEnum stateMachineEnum, Throwable throwable) {

}

private Object invoke(RpcRequest rpcRequest) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
// 请求参数
Object[] params = rpcRequest.getParams();
// 请求参数类型
Class<?>[] paramTypes = new Class[params.length];

for (int i = 0; i < params.length; i++) {
paramTypes[i] = params[i].getClass();
}

// 获取请求的类名
Class<?> clazz = Class.forName(rpcRequest.getClassName());
// 获取请求的方法名
Method method = clazz.getMethod(rpcRequest.getMethodName(), paramTypes);

// 调用
Object result = method.invoke(service, params);
return result;
}
}

此时服务端重构就完成了~

3.3. 修改 rpc-client-aio 模块

原先的 RpcTransport 是通过 Socket 去连接 ServerSocket,这里改造为 AioQuickClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@AllArgsConstructor
public class RpcTransport {

private String host;

private int port;

public Object call(RpcRequest rpcRequest) {
// 通过 CompletableFuture 阻塞获取返回结果
CompletableFuture<Object> waitForResult = new CompletableFuture<>();
RpcClientMessageProcessor messageProcessor = new RpcClientMessageProcessor(waitForResult);
AioQuickClient<byte[]> consumer = new AioQuickClient<>(host, port, new RpcProtocol(), messageProcessor);

try {
consumer.start();

ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutput objectOutput = new ObjectOutputStream(byteArrayOutputStream);
objectOutput.writeObject(rpcRequest);
byte[] data = byteArrayOutputStream.toByteArray();
messageProcessor.getAioSession().writeBuffer().writeInt(data.length + 4);
messageProcessor.getAioSession().writeBuffer().write(data);
messageProcessor.getAioSession().writeBuffer().flush();
// 阻塞 3s 获取返回结果
return waitForResult.get(3, TimeUnit.SECONDS);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
consumer.shutdown();
}
return null;
}

}

创建 RpcClientMessageProcessor 实现 SimpleChannelInboundHandler 接口,获取服务端返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Getter
public class RpcClientMessageProcessor implements MessageProcessor<byte[]> {

private AioSession<byte[]> aioSession;
private CompletableFuture<Object> waitForResult;

public RpcClientMessageProcessor(CompletableFuture<Object> waitForResult) {
this.waitForResult = waitForResult;
}


/**
* 处理接收到的消息
*
* @param session 通信会话
* @param msg 待处理的业务消息
*/
@Override
public void process(AioSession<byte[]> session, byte[] msg) {
ObjectInput objectInput = null;
try {
objectInput = new ObjectInputStream(new ByteArrayInputStream(msg));
// 获取到结果,并通知主线程获取结果
waitForResult.complete(objectInput.readObject());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (objectInput != null) {
try {
objectInput.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

/**
* 状态机事件,当枚举事件发生时由框架触发该方法
*
* @param session 本次触发状态机的AioSession对象
* @param stateMachineEnum 状态枚举
* @param throwable 异常对象,如果存在的话
* @see StateMachineEnum
*/
@Override
public void stateEvent(AioSession<byte[]> session, StateMachineEnum stateMachineEnum, Throwable throwable) {
switch (stateMachineEnum) {
case NEW_SESSION:
this.aioSession = session;
break;
}
}
}

此时客户端也修改完毕~

3.5. 测试

可以和之前保持一致,这里为了看出区别,我将请求参数由 rpc simple demo 改为 rpc aio demo

先运行 ServerMain 类,查看控制台日志。

1
2
3
4
5
6
// smart socket 的 banner,省略了。。。
:: smart-socket :: (v1.4.11)
smart-socket server started on port 8080,threadNum:16
smart-socket server config is IoServerConfig{readBufferSize=512, writeQueueCapacity=16, host='null', monitor=null, port=8080, processor=com.xkcoding.rpc.nio.RpcServerMessageProcessor@f34cad02, protocol=com.xkcoding.rpc.nio.RpcProtocol@6c14ef0a, bannerEnabled=true, socketOptions=null, threadNum=16, writeBufferSize=128}
com.xkcoding.rpc.nio.HelloServiceImpl@ecadb3cc 服务发布在 8080 端口
request is coming: rpc aio demo

再运行 ClientMain 类,查看控制台日志。

1
2
需要动态代理生成请求对象
content = hello rpc aio demo

4. 总结

本文主要是将原先的 BIO 实现改造为 AIO 实现,主要是基于 smart-socket 重构网络通信部分。

因为 AIO 目前应用仍然不是十分广泛,因此后续 RPC 的改造会在 NIO 的基础上次进行优化。敬请期待~

参考

  • smart-socket 使用手册:https://smartboot.gitee.io/book/smart-socket/

示例代码

https://github.com/xkcoding/practice_demo/tree/master/rpc-demo-3

-------------本文结束  感谢您的阅读-------------
xkcoding wechat
欢迎来我的公众号「xkcoding小凯扣丁」逛逛
o(╯□╰)o 赞助一杯咖啡 ~~