10 个类手写实现 RPC 通信框架
链接:autumn200.com
什么是rpc
RPC:remote procedure call Protocol 远程过程调用 调用远程服务,就像调用本地的服务一样,不用关心调用细节,就像调用本机的服务一样的
RPC原理
实现RPC通信的程序包括5个部分:rpc-client、客户端proxy、socket、服务端proxy、rpc-server
request
客户端:当rpc-client发起远程调用时,实际上是通过客户端代理 将要调用的接口、方法、参数、参数类型进行序列化,然后通过socket实时将封装调用参数的实例发送到服务端。 服务端:socket接收到客户端传来的信息进行反序列化,然后通过这些信息委派到具体的实现对象
response
服务端:目标方法执行完成后,将执行结果返回给socket 客户端:socket接收到结果后,返回给rpc-client,调用结束
应用到的技术
java spring 序列化 socket 反射 动态代理
项目GitHub地址
掘金地址
服务端项目
项目结构
RpcRequest
(类名、方法名、参数的实体类)order-api
order-provider
服务注册
ServiceImpl
,关键就需要将service类管理起来,那问题来了,我们如何管理这些服务类呢?@Service
注解,通过自定义注解的方式来做到服务注册,我们定义一个注解@RpcRemoteService
作用在ServiceImpl
类上,将标记此注解的类名、方法名保存到Map中,以此来定位到具体实现类。@RpcRemoteService
注解
/**
* 服务端服务发现注解
*
* @author: ***
* @date: 2020/6/21 16:21
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcRemoteService {
}
服务注册类InitialMerdiator
@RpcRemoteService
标记的类,并保存到Mediator.ROUTING
中。/**
* 初始化中间代理层对象
*
* @author: ***
* @date: 2020/6/21 16:33
*/
@Component
public class InitialMerdiator implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
//加了服务发布标记的bean进行远程发布
if (bean.getClass().isAnnotationPresent(RpcRemoteService.class)) {
Method[] methods = bean.getClass().getDeclaredMethods();
for (Method method : methods) {
String routingKey = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();
BeanMethod beanMethod = new BeanMethod();
beanMethod.setBean(bean);
beanMethod.setMethod(method);
Mediator.ROUTING.put(routingKey, beanMethod);
}
}
return bean;
}
}
socket启动类SocketServer
/**
* spring 容器启动完成之后,会发布一个ContextRefreshedEven
* 容器启动后启动socket监听
*
* @author: ***
* @date: 2020/6/21 16:51
*/
@Component
public class SocketServer implements ApplicationListener<ContextRefreshedEvent> {
private final ExecutorService executorService= Executors.newCachedThreadPool();
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
ServerSocket serverSocket=null;
try {
serverSocket = new ServerSocket(8888);
while (true) {
Socket accept = serverSocket.accept();
// 线程池处理socket
executorService.execute(new ProcessorHandler(accept));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
ProcessorHandler
public class ProcessorHandler implements Runnable {
private Socket socket;
public ProcessorHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
inputStream = new ObjectInputStream(socket.getInputStream());
// 反序列化
RpcRequest rpcRequest = (RpcRequest) inputStream.readObject();
// 中间代理执行目标方法
Mediator mediator = Mediator.getInstance();
Object response = mediator.processor(rpcRequest);
System.out.println("服务端的执行结果:"+response);
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(response);
outputStream.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
closeStream(inputStream, outputStream);
}
}
private void closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream) {
// 关闭流
if(inputStream!=null){
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (outputStream!=null){
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Mediator
/**
* 服务端socket与目标方法的中间代理层
*
* @author: ***
* @date: 2020/6/21 16:25
*/
public class Mediator {
/** 用来存储发布的服务的实例(服务调用的路由) */
public static Map ROUTING = new ConcurrentHashMap<>();
/** 单例模式创建该代理层实例 */
private volatile static Mediator instance;
private Mediator() {
}
public static Mediator getInstance() {
if (instance == null) {
synchronized (Mediator.class) {
if (instance == null) {
instance = new Mediator();
}
}
}
return instance;
}
public Object processor(RpcRequest rpcRequest) {
// 路由key
String routingKey = rpcRequest.getClassName() + "." + rpcRequest.getMethodName();
BeanMethod beanMethod = ROUTING.get(routingKey);
if (beanMethod == null) {
return null;
}
// 执行目标方法
Object bean = beanMethod.getBean();
Method method = beanMethod.getMethod();
try {
return method.invoke(bean, rpcRequest.getArgs());
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
BeanMethod
/**
* 中间层反射调用时,存储目标方法、目标类的实体
*
* @author: ***
* @date: 2020/6/21 16:43
*/
public class BeanMethod {
private Object bean;
private Method method;
// setter、getter略
}
客户端项目
项目结构
服务发现
@Autowired
的原理实现,我们自定义@RpcReference
注解,定义在字段上,将接口实现的代理类注入到该字段上。@RpcReference
注解
/**
* 服务注入注解
*
* @author: ***
* @date: 2020/6/20 22:41
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcReference {
服务发现类ReferenceInvokeProxy
@RpcReference
注解标记的字段。/**
* 远程动态调用service代理
*
* @author: ***
* @date: 2020/6/20 22:44
*/
@Component
public class ReferenceInvokeProxy implements BeanPostProcessor {
@Autowired
private RemoteInvocationHandler invocationHandler;
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
Field[] fields = bean.getClass().getDeclaredFields();
for (Field field : fields) {
if (field.isAnnotationPresent(RpcReference.class)) {
field.setAccessible(true);
// 针对这个加了RpcReference注解的字段,设置为一个代理的值
Object proxy = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class>[]{field.getType()}, invocationHandler);
try {
// 相当于针对加了RpcReference的注解,设置了一个代理,这个代理的实现是invocationHandler
field.set(bean, proxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
return bean;
}
}
客户端动态代理InvocationHandler
实现类RemoteInvocationHandler
RpcRequest
,然后交给socket发送到服务端。/**
* @author: ***
* @date: 2020/6/20 22:51
*/
@Component
public class RemoteInvocationHandler implements InvocationHandler {
@Autowired
private RpcNetTransport rpcNetTransport;
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setTypes(method.getParameterTypes());
rpcRequest.setArgs(args);
return rpcNetTransport.send(rpcRequest);
}
}
网络传输RpcNetTransport
/**
* rpc socket 网络传输
*
* @author: ***
* @date: 2020/6/20 22:59
*/
@Component
public class RpcNetTransport {
@Value("${rpc.host}")
private String host;
@Value("${rpc.port}")
private int port;
public Object send(RpcRequest rpcRequest) {
ObjectOutputStream outputStream = null;
ObjectInputStream inputStream = null;
try {
Socket socket = new Socket(host, port);
// 发送目标方法信息
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(rpcRequest);
outputStream.flush();
// 接收返回值
inputStream = new ObjectInputStream(socket.getInputStream());
return inputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
} finally {
closeStream(inputStream, outputStream);
}
return null;
}
private void closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream) {
// 关闭流
if(inputStream!=null){
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (outputStream!=null){
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
如果看到这里,说明你喜欢这篇文章,请 转发、点赞。同时 标星(置顶)本公众号可以第一时间接受到博文推送。
博主微信:baiseyumaoxx,之前博主分享了很多资源,有的已经删除了(你懂得),如果有的你当时没有领到还想领得就可以加我微信,我在发给你,你需要得资源也可以给我说,我尽力给你找~
明天见(。・ω・。)ノ♡
喜欢文章,点个在看
评论