前言 在前面的文章《如何实现一个你自己的 RPC 框架(1)》 中,我们通过 BIO 去实现了一个简单的 RPC 通信案例,同时在《如何实现一个你自己的 RPC 框架(2)》 中我们将 BIO 改造成了 NIO,今天我们将会基于第一次 BIO 的案例进行改造,将 BIO 改造为 AIO 通信。
1. 什么是 AIO 在《如何实现一个你自己的 RPC 框架(2)》 中已经介绍了 BIO、NIO、AIO,不知道的小伙伴可以前往查看,在此不做赘述了。
2. 选型 本文将选用国产 AIO 框架 smart-socket 来实现 AIO 改造。
3. 实战 本次代码是基于《如何实现一个你自己的 RPC 框架(1)》 进行重构的,因此模块划分保持一致,可以在上一次的基础上进行修改。本文为了凸显区别,模块名均进行修改,由 bio 变更为 aio。
模块划分:
1 2 3 4 5 6 rpc-demo-3 ├── rpc-client-aio ├── rpc-common-aio └── rpc-server-aio ├── rpc-server-aio-api └── rpc-server-aio-provider
3.1. 修改 common 模块 首先引入 smart-socket
依赖
1 2 3 4 5 6 7 8 9 10 <dependencies > <dependency > <groupId > org.smartboot.socket</groupId > <artifactId > aio-core</artifactId > </dependency > <dependency > <groupId > org.slf4j</groupId > <artifactId > slf4j-api</artifactId > </dependency > </dependencies >
然后在 common 模块中创建客户端和服务端都需要遵守的编解码协议
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class RpcProtocol implements Protocol<byte[]> { private static final int INTEGER_BYTES = Integer.SIZE / Byte.SIZE; @Override public byte [] decode(ByteBuffer readBuffer, AioSession<byte []> session) { int remaining = readBuffer.remaining(); if (remaining < INTEGER_BYTES) { return null ; } int messageSize = readBuffer.getInt(readBuffer.position()); if (messageSize > remaining) { return null ; } byte [] data = new byte [messageSize - INTEGER_BYTES]; readBuffer.getInt(); readBuffer.get(data); return data; } }
3.2. 修改 rpc-server-aio-provider 模块 将之前的 RpcServer 中的 ServerSocket 改造为 AioQuickServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class RpcServer { public void start (Object service, int port) { RpcServerMessageProcessor messageProcessor = new RpcServerMessageProcessor(service); AioQuickServer<byte []> server = new AioQuickServer<>(port, new RpcProtocol(), messageProcessor); try { server.start(); System.out.println(service + " 服务发布在 " + port + " 端口" ); } catch (IOException e) { e.printStackTrace(); } } }
将原先的 RpcRequestHandler 重命名为 RpcServerMessageProcessor,同时实现 MessageProcessor
接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 @AllArgsConstructor public class RpcServerMessageProcessor implements MessageProcessor<byte[]> { private Object service; @Override public void process (AioSession<byte []> session, byte [] msg) { ObjectInput objectInput = null ; ObjectOutput objectOutput = null ; try { objectInput = new ObjectInputStream(new ByteArrayInputStream(msg)); RpcRequest rpcRequest = (RpcRequest) objectInput.readObject(); Object invoke = invoke(rpcRequest); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); objectOutput = new ObjectOutputStream(byteArrayOutputStream); objectOutput.writeObject(invoke); byte [] data = byteArrayOutputStream.toByteArray(); session.writeBuffer().writeInt(data.length + 4 ); session.writeBuffer().write(data); session.writeBuffer().flush(); } catch (IOException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } finally { if (objectInput != null ) { try { objectInput.close(); } catch (IOException e) { e.printStackTrace(); } } if (objectOutput != null ) { try { objectOutput.close(); } catch (IOException e) { e.printStackTrace(); } } } } @Override public void stateEvent (AioSession<byte []> session, StateMachineEnum stateMachineEnum, Throwable throwable) { } private Object invoke (RpcRequest rpcRequest) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { Object[] params = rpcRequest.getParams(); Class<?>[] paramTypes = new Class[params.length]; for (int i = 0 ; i < params.length; i++) { paramTypes[i] = params[i].getClass(); } Class<?> clazz = Class.forName(rpcRequest.getClassName()); Method method = clazz.getMethod(rpcRequest.getMethodName(), paramTypes); Object result = method.invoke(service, params); return result; } }
此时服务端重构就完成了~
3.3. 修改 rpc-client-aio 模块 原先的 RpcTransport 是通过 Socket 去连接 ServerSocket,这里改造为 AioQuickClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @AllArgsConstructor public class RpcTransport { private String host; private int port; public Object call (RpcRequest rpcRequest) { CompletableFuture<Object> waitForResult = new CompletableFuture<>(); RpcClientMessageProcessor messageProcessor = new RpcClientMessageProcessor(waitForResult); AioQuickClient<byte []> consumer = new AioQuickClient<>(host, port, new RpcProtocol(), messageProcessor); try { consumer.start(); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); ObjectOutput objectOutput = new ObjectOutputStream(byteArrayOutputStream); objectOutput.writeObject(rpcRequest); byte [] data = byteArrayOutputStream.toByteArray(); messageProcessor.getAioSession().writeBuffer().writeInt(data.length + 4 ); messageProcessor.getAioSession().writeBuffer().write(data); messageProcessor.getAioSession().writeBuffer().flush(); return waitForResult.get(3 , TimeUnit.SECONDS); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { consumer.shutdown(); } return null ; } }
创建 RpcClientMessageProcessor
实现 SimpleChannelInboundHandler
接口,获取服务端返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 @Getter public class RpcClientMessageProcessor implements MessageProcessor<byte[]> { private AioSession<byte []> aioSession; private CompletableFuture<Object> waitForResult; public RpcClientMessageProcessor (CompletableFuture<Object> waitForResult) { this .waitForResult = waitForResult; } @Override public void process (AioSession<byte []> session, byte [] msg) { ObjectInput objectInput = null ; try { objectInput = new ObjectInputStream(new ByteArrayInputStream(msg)); waitForResult.complete(objectInput.readObject()); } catch (Exception e) { e.printStackTrace(); } finally { if (objectInput != null ) { try { objectInput.close(); } catch (IOException e) { e.printStackTrace(); } } } } @Override public void stateEvent (AioSession<byte []> session, StateMachineEnum stateMachineEnum, Throwable throwable) { switch (stateMachineEnum) { case NEW_SESSION: this .aioSession = session; break ; } } }
此时客户端也修改完毕~
3.5. 测试 可以和之前保持一致,这里为了看出区别,我将请求参数由 rpc simple demo
改为 rpc aio demo
先运行 ServerMain 类,查看控制台日志。
1 2 3 4 5 6 :: smart-socket :: (v1.4.11) smart-socket server started on port 8080 ,threadNum:16 smart-socket server config is IoServerConfig{readBufferSize=512 , writeQueueCapacity=16 , host='null' , monitor=null , port=8080 , processor=com.xkcoding.rpc.nio.RpcServerMessageProcessor@f 34cad02, protocol=com.xkcoding.rpc.nio.RpcProtocol@6 c14ef0a, bannerEnabled=true , socketOptions=null , threadNum=16 , writeBufferSize=128 } com.xkcoding.rpc.nio.HelloServiceImpl@ecadb 3cc 服务发布在 8080 端口 request is coming: rpc aio demo
再运行 ClientMain 类,查看控制台日志。
1 2 需要动态代理生成请求对象 content = hello rpc aio demo
4. 总结 本文主要是将原先的 BIO 实现改造为 AIO 实现,主要是基于 smart-socket
重构网络通信部分。
因为 AIO 目前应用仍然不是十分广泛,因此后续 RPC 的改造会在 NIO 的基础上次进行优化。敬请期待~
参考 smart-socket 使用手册:https://smartboot.gitee.io/book/smart-socket/ 示例代码 https://github.com/xkcoding/practice_demo/tree/master/rpc-demo-3