如何实现一个你自己的 RPC 框架(1)

前言

RPC,全称:Remote Procedure Call,翻译过来就是:远程过程调用。主要用来在分布式系统中,解决不同系统间的服务调用问题。RPC 不仅要让实现服务间的通信,还有一点就是要让用户在远程调用的过程中像是在本地调用一样方便。如何实现一个简易的 RPC 通信呢?这个系列的文章将会一一解答。

1. 设计

首先我们看看本地调用的流程,本地调用,我们相当于是直接通过 new 的方式去创建一个对象,然后调用对象的方法去获取数据。

本地调用流程

如果是分布式系统,服务调用方和服务提供方,分布在不同的服务上,此时调用关系就变成了这样子,我们将调用方和提供方分别用 client 和 server 代替。

分布式调用

既然是远程调用,必然涉及到 IO 通信、传输内容序列化 / 反序列化 等技术。

IO 通信分为 BIO、NIO、AIO

序列化又分为 JDK 自带的序列化机制、XML、JSON、二进制等等

  • XML
  • Json:jackson、fastjson、Gson
  • 二进制方式:hessian、avro、kyro、protobuf

各序列化方式性能比较:https://github.com/eishay/jvm-serializers/wiki

本文是 RPC 系列文章的第一篇,将会使用最简单的 BIO、jdk 自带的序列化机制,实现一个简易的 RPC。后续的文章,将会在这一篇的基础上对 RPC 进行优化升级改造。

2. 实战

模块结构:

1
2
3
4
5
6
rpc-demo-1
├── rpc-client-bio // 客户端
├── rpc-common-bio // 通用模块,封装 RPC 请求参数
└── rpc-server-bio // 服务端
├── rpc-server-bio-api //接口定义
└── rpc-server-bio-provider // 接口实现

2.1. 定义 API

首先我们在 rpc-server-bio-api 模块下定义远程调用的接口。

1
2
3
public interface IHelloService {
String sayHello(String content);
}

2.2. 实现 API

然后我们在 rpc-server-bio-provider 中引入 rpc-server-bio-api 这个模块,然后实现 IHelloService

1
2
3
4
5
6
7
8
9
public class HelloServiceImpl implements IHelloService {

@Override
public String sayHello(String content) {
System.out.println("request is coming: " + content);
return "hello " + content;
}

}

2.3. RpcServer

实现 RPC 服务提供方暴露服务,既然是提供方,肯定也是在 rpc-server-bio-provider 模块实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RpcServer {

// 阿里 P3C 不建议直接调用 Executors,此处为了偷懒
private final ExecutorService executorService = Executors.newCachedThreadPool();

public void start(Object service, int port) {
try (ServerSocket serverSocket = new ServerSocket(port)) {
System.out.println(service + " 服务发布在 " + port + " 端口");
while (true) {
// 不断阻塞,等待请求
Socket socket = serverSocket.accept();
// 通过线程池异步处理,提升性能
executorService.execute(new RpcRequestHandler(socket, service));
}

}
catch (IOException e) {
e.printStackTrace();
}

}

}

真正的处理逻辑是通过线程异步处理的,这样的做法是可以提升服务端性能。

处理逻辑交给 RpcRequestHandler 去实现。主要实现思路就是获取 socket 传过来的参数,通过反射调用本地对象的方法,拿到返回结果之后,再通过 socket 写会客户端即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@AllArgsConstructor
public class RpcRequestHandler implements Runnable {

private Socket socket;

private Object service;

@Override
public void run() {
ObjectInputStream ois = null;
ObjectOutputStream oos = null;
try {
ois = new ObjectInputStream(socket.getInputStream());
// 根据输入流拿到 RpcRequest
RpcRequest rpcRequest = (RpcRequest) ois.readObject();

// 反射调用本地服务
Object result = invoke(rpcRequest);

oos = new ObjectOutputStream(socket.getOutputStream());
// 通过输出流输出结果
oos.writeObject(result);
oos.flush();
}
catch (IOException e) {
e.printStackTrace();
}
catch (ClassNotFoundException e) {
e.printStackTrace();
}
catch (NoSuchMethodException e) {
e.printStackTrace();
}
catch (IllegalAccessException e) {
e.printStackTrace();
}
catch (InvocationTargetException e) {
e.printStackTrace();
}
finally {
if (ois != null) {
try {
ois.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
if (oos != null) {
try {
oos.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
}

private Object invoke(RpcRequest rpcRequest)
throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
// 请求参数
Object[] params = rpcRequest.getParams();
// 请求参数类型
Class<?>[] paramTypes = new Class[params.length];

for (int i = 0; i < params.length; i++) {
paramTypes[i] = params[i].getClass();
}

// 获取请求的类名
Class<?> clazz = Class.forName(rpcRequest.getClassName());
// 获取请求的方法名
Method method = clazz.getMethod(rpcRequest.getMethodName(), paramTypes);

// 调用
Object result = method.invoke(service, params);
return result;
}

}

2.4. RpcRequest

上述代码中可以看到,我们调用一个 RPC,需要知道类名、方法名、请求参数这些信息,因此我封装了 RpcRequest 类去保存这些信息。这个对象在客户端和服务端都需要用到,因此,放在 rpc-common-bio 模块中。同时在 rpc-server-bio-provider 中引入 rpc-common-bio 模块。

1
2
3
4
5
6
7
8
9
10
@Data
public class RpcRequest implements Serializable {

private String className;

private String methodName;

private Object[] params;

}

2.5. RpcClient

既然 RPC 是解决服务间的远程调用,那么客户端肯定也必须知道自己需要调用哪个内容,只是具体内容的实现交给服务端而已。因此,在 rpc-client-bio 模块中需要引入 rpc-server-bio-api 获取接口定义,同时也需要引入 rpc-common-bio 用来封装 RPC 请求。

Java 中接口是不可以实例化的,但是我们想要让用户对 RPC 的过程无感知,那么调用的方式最好和本地调用一样,可以通过 Object.method 的形式获取结果。此时动态代理,就成了最佳的解决方案。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class RpcClient {

/**
* 动态代理
* @param interfaceClass 接口类
* @param host IP
* @param port 端口
* @param <T> 代理对象类型
* @return 代理对象
*/
public <T> T proxy(Class<T> interfaceClass, String host, int port) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[] { interfaceClass },
new RemoteInvocationHandler(host, port));
}

}

2.6. RemoteInvocationHandler

JDK 提供了 Proxy.newProxyInstance 的方法可以动态代理对象,具体的执行逻辑放在 RemoteInvocationHandler 中,RemoteInvocationHandler 是实现 InvocationHandler 接口,具体执行的时候,会走到该类的 invoke 方法,所以我们只需要在 invoke 方法里进行远程调用即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@AllArgsConstructor
public class RemoteInvocationHandler implements InvocationHandler {

private String host;

private int port;

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("需要动态代理生成请求对象");
// 构建 RpcRequest
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParams(args);

// 远程调用
RpcTransport rpcTransport = new RpcTransport(host, port);
Object result = rpcTransport.call(rpcRequest);
return result;
}

}

2.7. RpcTransport

远程调用,这里我也做了一层封装,远程调用的过程通过 RpcTransport 去执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@AllArgsConstructor
public class RpcTransport {

private String host;

private int port;

public Object call(RpcRequest rpcRequest) {
Object result = null;
ObjectInputStream ois = null;
ObjectOutputStream oos = null;
try (Socket socket = new Socket(host, port);) {
oos = new ObjectOutputStream(socket.getOutputStream());
// 序列化
oos.writeObject(rpcRequest);
oos.flush();

ois = new ObjectInputStream(socket.getInputStream());
// 获取服务端返回结果
result = ois.readObject();
}
catch (UnknownHostException e) {
e.printStackTrace();
}
catch (IOException e) {
e.printStackTrace();
}
catch (ClassNotFoundException e) {
e.printStackTrace();
}
finally {
if (ois != null) {
try {
ois.close();
}
catch (IOException e) {
e.printStackTrace();
}
}

if (oos != null) {
try {
oos.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
return result;
}

}

2.8. 测试类

服务端测试

1
2
3
4
5
6
7
8
9
10
public class ServerMain {

public static void main(String[] args) {
IHelloService helloService = new HelloServiceImpl();

RpcServer rpcServer = new RpcServer();
rpcServer.start(helloService, 8080);
}

}

客服端测试

1
2
3
4
5
6
7
8
9
10
public class ClientMain {

public static void main(String[] args) {
RpcClient rpcClient = new RpcClient();
IHelloService helloService = rpcClient.proxy(IHelloService.class, "0.0.0.0", 8080);
String content = helloService.sayHello("rpc simple demo");
System.out.println("content = " + content);
}

}

2.9. 测试结果

先运行 ServerMain 类,查看控制台日志。

1
2
com.xkcoding.rpc.HelloServiceImpl@7b019026 服务发布在 8080 端口
request is coming: rpc simple demo

再运行 ClientMain 类,查看控制台日志。

1
2
需要动态代理生成请求对象
content = hello rpc simple demo

3. 总结

其实 RPC 的实现过程中,有三个技术点至关重要:

  1. 网络通信
  2. 序列化和反序列化
  3. 动态代理(PS:动态代理不熟悉的同学,可以 前往这里学习

示例代码

https://github.com/xkcoding/practice_demo/tree/master/rpc-demo-1

-------------本文结束  感谢您的阅读-------------
xkcoding wechat
欢迎来我的公众号「xkcoding小凯扣丁」逛逛
o(╯□╰)o 赞助一杯咖啡 ~~