自己动手实现简单的微服务rpc调用

Author Avatar
MagicDo 10月 13, 2019
  • 在其它设备中阅读本文章

####

什么是rpc(来自百度百科)

RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

参考资料:https://www.jianshu.com/p/2accc2840a1b

参考代码:https://github.com/modouxiansheng/Doraemon/tree/master/rpc-server

RPC要解决的两个问题:

  1. 解决分布式系统中,服务之间的调用问题。
  2. 远程调用时,要能够像本地调用一样方便,让调用者感知不到远程调用的逻辑。

实际情况下,RPC很少用到http协议来进行数据传输,毕竟我只是想传输一下数据而已,何必动用到一个文本传输的应用层协议呢,我为什么不直接使用二进制传输?比如直接用Java的Socket协议进行传输?

要实现一个RPC不算难,难的是实现一个高性能高可靠的RPC框架。

比如,既然是分布式了,那么一个服务可能有多个实例,你在调用时,要如何获取这些实例的地址呢?

这时候就需要一个服务注册中心,比如在Dubbo里头,就可以使用Zookeeper作为注册中心,在调用时,从Zookeeper获取服务的实例列表,再从中选择一个进行调用。

那么选哪个调用好呢?这时候就需要负载均衡了,于是你又得考虑如何实现复杂均衡,比如Dubbo就提供了好几种负载均衡策略。

这还没完,总不能每次调用时都去注册中心查询实例列表吧,这样效率多低呀,于是又有了缓存,有了缓存,就要考虑缓存的更新问题,blablabla……

你以为就这样结束了,没呢,还有这些:

  • 客户端总不能每次调用完都干等着服务端返回数据吧,于是就要支持异步调用;
  • 服务端的接口修改了,老的接口还有人在用,怎么办?总不能让他们都改了吧?这就需要版本控制了;
  • 服务端总不能每次接到请求都马上启动一个线程去处理吧?于是就需要线程池;
  • 服务端关闭时,还没处理完的请求怎么办?是直接结束呢,还是等全部请求处理完再关闭呢?
  • ……

如此种种,都是一个优秀的RPC框架需要考虑的问题。

我们当前的目标是先实现个简单的rpc 调用,来理解我们分布式系统中服务调用的基本原理。

实现的话,当然就分为客户端和服务端两步,参照我们之前介绍的 feignclient 我们来完成一个简单的rpcclient

整个流程的话,就是客户端调用远程服务端的某个方法,服务端最终返回数据给客户端。

客户端实现

客户端要发送什么数据给服务端呢?

我们在客户端调用的是服务端提供的接口,所以我们需要将客户端调用的信息传输过去,那么我们可以将要传输的信息分为两类

  • 第一类是服务端可以根据这个信息找到相应的接口实现类和方法

  • 第二类是调用此方法传输的参数信息

所以我们首先提供实体来保存这两类数据,由于是网络传输,还要保证我们的实体要序列化。

请求的话,我们就用RpcRequest 来保存

1
2
3
4
5
6
7
8
9
10
11
12
/**
* rpc request
*/

@Data
public class RpcRequest implements Serializable {

private static final long serialVersionUID = -8558250604858740384L;
private String className;
private String methodName;
private Class<?> [] parameTypes;
private Object [] parameters;
}

响应的话,我们用RpcResponse 来保存

1
2
3
4
5
6
7
8
/**
* rpc response
*/

@Data
public class RpcResponse implements Serializable {
private static final long serialVersionUID = 9201838302447943808L;
private Object result;
}

那么我们如何将用户的请求封装到客户端,客户端再发给服务端呢?

借助于我们之前理解的 FeignClient和spring 容器的知识,我们可以利用注解和动态代理来实现请求封装和服务调用。

我们提供一个简单的注解RpcClient:

1
2
3
4
5
6
7

@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcClient {

}

上面注解的作用和FeignClient 作用一样,来标示这是一个远程调用的接口,以便spring能够扫描到,并注入容器中。此过程,我们需要提供代理类,以便动态代理能够调用,提供工厂类,生成代理类,提供注册类,将bean注入到spring 容器。spring 已经为我们提供了这方面的开发流程,详细的问题,请自行学习spring的相关文章。

首先提供代理类,代理类是服务调用的核心,就是通过执行代理类的 invoke方法,将用户信息以及相关类信息序列化,通过 Socket 发给服务端,并解析服务端响应的数据,最终返回给用户。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Slf4j
@Component
public class RpcDynamicProxy implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String requestJson = objectToJson(method,args);
Socket client = new Socket("127.0.0.1", 20006);
client.setSoTimeout(10000);
//获取Socket的输出流,用来发送数据到服务端
PrintStream out = new PrintStream(client.getOutputStream());
//获取Socket的输入流,用来接收从服务端发送过来的数据
BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream()));
//发送数据到服务端
out.println(requestJson);
RpcResponse response = new RpcResponse();
Gson gson =new Gson();
try{
//从服务器端接收数据有个时间限制(系统自设,也可以自己设置),超过了这个时间,便会抛出该异常
String responsJson = buf.readLine();
response = gson.fromJson(responsJson, RpcResponse.class);
}catch(SocketTimeoutException e){
log.info("Time out, No response");
}
if(client != null){
//如果构造函数建立起了连接,则关闭套接字,如果没有建立起连接,自然不用关闭
client.close(); //只关闭socket,其关联的输入输出流也会被关闭
}
return response.getResult();
}



public String objectToJson(Method method,Object [] args){
RpcRequest request = new RpcRequest();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
String className = method.getDeclaringClass().getName();
request.setMethodName(methodName);
request.setParameTypes(parameterTypes);
request.setParameters(args);
request.setClassName(getClassName(className));
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.registerTypeAdapterFactory(new ClassTypeAdapterFactory());
Gson gson = gsonBuilder.create();
return gson.toJson(request);
}

private String getClassName(String beanClassName){
String className = beanClassName.substring(beanClassName.lastIndexOf(".")+1);
className = className.substring(0,1).toLowerCase() + className.substring(1);
return className;
}
}

提供 工厂类,生成代理类的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class RpcClientFactoryBean implements FactoryBean {

private Class<?> classType;
@Autowired
private RpcDynamicProxy rpcDynamicProxy;

public RpcClientFactoryBean(Class<?> classType) {
this.classType = classType;
}

@Override
public Object getObject() throws Exception {
ClassLoader classLoader = classType.getClassLoader();
Object object = Proxy.newProxyInstance(classLoader,new Class<?>[]{classType},rpcDynamicProxy);
return object;
}

@Override
public Class<?> getObjectType() {
return this.classType;
}

@Override
public boolean isSingleton() {
return false;
}

提供注册类,扫描相关RpcClient 注解的类,将bean注入spring 容器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class RpcClientsRegistrar implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
registerRpcClients(importingClassMetadata,registry);
}

/**
* 根据注解 注册 rpcClients
* @param metadata
* @param registry
*/

public void registerRpcClients(AnnotationMetadata metadata,
BeanDefinitionRegistry registry)
{

ClassPathScanningCandidateComponentProvider provider = getScanner();
//设置扫描器
provider.addIncludeFilter(new AnnotationTypeFilter(RpcClient.class));
//扫描此包下的所有带有@RpcClient的注解的类
Set<BeanDefinition> beanDefinitionSet = provider.findCandidateComponents("com.magicdu.rpc.rpcclient");
for (BeanDefinition beanDefinition : beanDefinitionSet){
if (beanDefinition instanceof AnnotatedBeanDefinition){
//获得注解上的参数信息
AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition;
String beanClassAllName = beanDefinition.getBeanClassName();
Map<String, Object> paraMap = annotatedBeanDefinition.getMetadata()
.getAnnotationAttributes(RpcClient.class.getCanonicalName());
//将RpcClient的工厂类注册进去
BeanDefinitionBuilder builder = BeanDefinitionBuilder
.genericBeanDefinition(RpcClientFactoryBean.class);
//设置RpcClinetFactoryBean工厂类中的构造函数的值
builder.addConstructorArgValue(beanClassAllName);
builder.getBeanDefinition().setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
//将其注册进容器中
registry.registerBeanDefinition(
beanClassAllName,
builder.getBeanDefinition());
}
}
}
//允许Spring扫描接口上的注解
protected ClassPathScanningCandidateComponentProvider getScanner() {
return new ClassPathScanningCandidateComponentProvider(false) {
@Override
protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent();
}
};
}
}

然后将 注册类,引入 spring 配置,以便启动时扫描

1
2
3
4
@Configuration
@Import(RpcClientsRegistrar.class)
public class RpcConfig {
}

这样客户端我们就写完了。

服务端代码

服务端简单来说,就是开个端口,接受客户端发送的参数,找到对应的方法,调用完,返回数据就完了。

我们将所有的客户端调用的方法都用 @Service 注解标记,服务端启动时我们将所有Service注解的方法缓存起来,客户端调用时,通过反射,我们便能执行服务端的方法了。并且扫描完就启动相关Socket,我们提供一个 Socket ,等待客户端调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Component
@Slf4j
public class RpcServiceConfig implements CommandLineRunner {
@Autowired
private ApplicationContext applicationContext;
public static Map<String,Object> rpcServiceMap = new HashMap<>();

@Override
public void run(String... args) throws Exception {
Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(Service.class);
for (Object bean: beansWithAnnotation.values()){
Class<?> clazz = bean.getClass();
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> inter : interfaces){
rpcServiceMap.put(getClassName(inter.getName()),bean);
log.info("已经加载的服务:"+inter.getName());
}
}
startPort();
}
private String getClassName(String beanClassName){
String className = beanClassName.substring(beanClassName.lastIndexOf(".")+1);
className = className.substring(0,1).toLowerCase() + className.substring(1);
return className;
}

public void startPort() throws IOException {
//服务端在20006端口监听客户端请求的TCP连接
ServerSocket server = new ServerSocket(20006);
Socket client = null;
boolean f = true;
while (f) {
//等待客户端的连接,如果没有获取连接
client = server.accept();
System.out.println("与客户端连接成功!");
//为每个客户端连接开启一个线程
new Thread(new ServerThread(client)).start();
}
server.close();
}
}

提供线程,供客户端调用,如果收到客户端请求,就去执行对应的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
public class ServerThread implements Runnable {

private Socket client = null;

public ServerThread(Socket client) {
this.client = client;
}

@Override
public void run() {
try {
//获取Socket的输出流,用来向客户端发送数据
PrintStream out = new PrintStream(client.getOutputStream());
//获取Socket的输入流,用来接收从客户端发送过来的数据
BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream()));
boolean flag = true;
while (flag) {
//接收从客户端发送过来的数据
String str = buf.readLine();
if (str == null || "".equals(str)) {
flag = false;
} else {
out.println(CommonDeal.getInvokeMethodMes(str));
}
}
out.close();
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}


}

执行方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
public class CommonDeal {

public static String getInvokeMethodMes(String str){
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.registerTypeAdapterFactory(new ClassTypeAdapterFactory());
Gson gson = gsonBuilder.create();
RpcRequest request = gson.fromJson(str, RpcRequest.class);
return gson.toJson(invokeMethod(request));
}

private static RpcResponse invokeMethod(RpcRequest request) {
String className = request.getClassName();
String methodName = request.getMethodName();
Object[] parameters = request.getParameters();
Class<?>[] parameTypes = request.getParameTypes();
Object o = RpcServiceConfig.rpcServiceMap.get(className);
RpcResponse response = new RpcResponse();
try {
Method method = o.getClass().getDeclaredMethod(methodName, parameTypes);
Object invokeMethod = method.invoke(o, parameters);
response.setResult(invokeMethod);
} catch (NoSuchMethodException e) {
log.info("没有找到" + methodName);
} catch (IllegalAccessException e) {
log.info("执行错误" + parameters);
} catch (InvocationTargetException e) {
log.info("执行错误" + parameters);
}
return response;
}
}

但是其中还有很多的功能需要完善,例如一个完整RPC框架肯定还需要服务注册与发现,而且双方通信肯定也不能是直接开启一个线程一直在等着,肯定需要是异步的等等的各种功能。这个例子只是简单帮助我们了解相关Rpc的思路以及简单的实现。