10个类手写实现 RPC 通信框架原理架构真经关注共 9299字,需浏览 19分钟 ·2020-08-27 03:48 链接:autumn200.com/2020/06/21/write-rpc/ 正文 什么是rpcRPC:remote procedure call Protocol 远程过程调用 调用远程服务,就像调用本地的服务一样,不用关心调用细节,就像调用本机的服务一样的RPC原理实现RPC通信的程序包括5个部分:rpc-client、客户端proxy、socket、服务端proxy、rpc-serverrequest客户端:当rpc-client发起远程调用时,实际上是通过客户端代理 将要调用的接口、方法、参数、参数类型进行序列化,然后通过socket实时将封装调用参数的实例发送到服务端。服务端:socket接收到客户端传来的信息进行反序列化,然后通过这些信息委派到具体的实现对象response服务端:目标方法执行完成后,将执行结果返回给socket客户端:socket接收到结果后,返回给rpc-client,调用结束应用到的技术javaspring序列化socket反射动态代理项目GitHub地址https://github.com/autumnqfeng/write_rpc服务端项目项目结构rpc-server项目包含2个子项目:order-api、order-providerorder-api中存放请求接口与RpcRequest(类名、方法名、参数的实体类)order-provider为请求接口实现、socket、proxy相关类order-apiorder-provider服务注册要想实现动态调用ServiceImpl,关键就需要将service类管理起来,那问题来了,我们如何管理这些服务类呢?我们可以参照spring中的@Service注解,通过自定义注解的方式来做到服务注册,我们定义一个注解@RpcRemoteService作用在ServiceImpl类上,将标记此注解的类名、方法名保存到Map中,以此来定位到具体实现类。@RpcRemoteService注解/** * 服务端服务发现注解 * * @author: *** * @date: 2020/6/21 16:21 */@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Componentpublic @interface RpcRemoteService {}服务注册类InitialMerdiator在spring容器初始化完成之后,扫描到@RpcRemoteService标记的类,并保存到Mediator.ROUTING中。/** * 初始化中间代理层对象 * * @author: *** * @date: 2020/6/21 16:33 */@Componentpublic class InitialMerdiator implements BeanPostProcessor { @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { //加了服务发布标记的bean进行远程发布 if (bean.getClass().isAnnotationPresent(RpcRemoteService.class)) { Method[] methods = bean.getClass().getDeclaredMethods(); for (Method method : methods) { String routingKey = bean.getClass().getInterfaces()[0].getName() + "." + method.getName(); BeanMethod beanMethod = new BeanMethod(); beanMethod.setBean(bean); beanMethod.setMethod(method); Mediator.ROUTING.put(routingKey, beanMethod); } } return bean; }}socket监听socket监听客户端请求socket启动类SocketServerspring容器加载完成之后,启动socket/** * spring 容器启动完成之后,会发布一个ContextRefreshedEven * 容器启动后启动socket监听 * * @author: *** * @date: 2020/6/21 16:51 */@Componentpublic class SocketServer implements ApplicationListener<ContextRefreshedEvent> { private final ExecutorService executorService= Executors.newCachedThreadPool(); @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { ServerSocket serverSocket=null; try { serverSocket = new ServerSocket(8888); while (true) { Socket accept = serverSocket.accept(); // 线程池处理socket executorService.execute(new ProcessorHandler(accept)); } } catch (IOException e) { e.printStackTrace(); } finally { if (serverSocket != null) { try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } }socket处理类ProcessorHandler处理监听到的每个socketpublic class ProcessorHandler implements Runnable { private Socket socket; public ProcessorHandler(Socket socket) { this.socket = socket; } @Override public void run() { ObjectInputStream inputStream = null; ObjectOutputStream outputStream = null; try { inputStream = new ObjectInputStream(socket.getInputStream()); // 反序列化 RpcRequest rpcRequest = (RpcRequest) inputStream.readObject(); // 中间代理执行目标方法 Mediator mediator = Mediator.getInstance(); Object response = mediator.processor(rpcRequest); System.out.println("服务端的执行结果:"+response); outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeObject(response); outputStream.flush(); } catch (Exception e) { e.printStackTrace(); } finally { closeStream(inputStream, outputStream); } } private void closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream) { // 关闭流 if(inputStream!=null){ try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (outputStream!=null){ try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } }服务端代理Mediator/** * 服务端socket与目标方法的中间代理层 * * @author: *** * @date: 2020/6/21 16:25 */public class Mediator { /** 用来存储发布的服务的实例(服务调用的路由) */ public static Map ROUTING = new ConcurrentHashMap<>(); /** 单例模式创建该代理层实例 */ private volatile static Mediator instance; private Mediator() { } public static Mediator getInstance() { if (instance == null) { synchronized (Mediator.class) { if (instance == null) { instance = new Mediator(); } } } return instance; } public Object processor(RpcRequest rpcRequest) { // 路由key String routingKey = rpcRequest.getClassName() + "." + rpcRequest.getMethodName(); BeanMethod beanMethod = ROUTING.get(routingKey); if (beanMethod == null) { return null; } // 执行目标方法 Object bean = beanMethod.getBean(); Method method = beanMethod.getMethod(); try { return method.invoke(bean, rpcRequest.getArgs()); } catch (Exception e) { e.printStackTrace(); } return null; }}BeanMethod/** * 中间层反射调用时,存储目标方法、目标类的实体 * * @author: *** * @date: 2020/6/21 16:43 */public class BeanMethod { private Object bean; private Method method; // setter、getter略}客户端项目项目结构服务发现服务发现我们同样使用注解来做,这就需要参照Spring中@Autowired的原理实现,我们自定义@RpcReference注解,定义在字段上,将接口实现的代理类注入到该字段上。@RpcReference注解/** * 服务注入注解 * * @author: *** * @date: 2020/6/20 22:41 */@Target(ElementType.FIELD)@Retention(RetentionPolicy.RUNTIME)@Componentpublic @interface RpcReference {}服务发现类ReferenceInvokeProxy在spring容器初始化之前,扫描bean中所有@RpcReference注解标记的字段。/** * 远程动态调用service代理 * * @author: *** * @date: 2020/6/20 22:44 */@Componentpublic class ReferenceInvokeProxy implements BeanPostProcessor { @Autowired private RemoteInvocationHandler invocationHandler; @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { Field[] fields = bean.getClass().getDeclaredFields(); for (Field field : fields) { if (field.isAnnotationPresent(RpcReference.class)) { field.setAccessible(true); // 针对这个加了RpcReference注解的字段,设置为一个代理的值 Object proxy = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class[]{field.getType()}, invocationHandler); try { // 相当于针对加了RpcReference的注解,设置了一个代理,这个代理的实现是invocationHandler field.set(bean, proxy); } catch (IllegalAccessException e) { e.printStackTrace(); } } } return bean; }}客户端代理客户端动态代理InvocationHandler实现类RemoteInvocationHandler将目标方法名、目标类名、参数信息封装到RpcRequest,然后交给socket发送到服务端。/** * @author: *** * @date: 2020/6/20 22:51 */@Componentpublic class RemoteInvocationHandler implements InvocationHandler { @Autowired private RpcNetTransport rpcNetTransport; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setClassName(method.getDeclaringClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setTypes(method.getParameterTypes()); rpcRequest.setArgs(args); return rpcNetTransport.send(rpcRequest); }}客户端socket网络传输RpcNetTransport/** * rpc socket 网络传输 * * @author: *** * @date: 2020/6/20 22:59 */@Componentpublic class RpcNetTransport { @Value("${rpc.host}") private String host; @Value("${rpc.port}") private int port; public Object send(RpcRequest rpcRequest) { ObjectOutputStream outputStream = null; ObjectInputStream inputStream = null; try { Socket socket = new Socket(host, port); // 发送目标方法信息 outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeObject(rpcRequest); outputStream.flush(); // 接收返回值 inputStream = new ObjectInputStream(socket.getInputStream()); return inputStream.readObject(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } finally { closeStream(inputStream, outputStream); } return null; } private void closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream) { // 关闭流 if(inputStream!=null){ try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (outputStream!=null){ try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } }} 浏览 18点赞 评论 收藏 分享 手机扫一扫分享分享 举报 评论图片表情视频评价全部评论推荐 10 个类手写实现 RPC 通信框架JAVA小咖秀0内容回顾 | 手写RPC框架云中志0六月份内容回顾 | 手写RPC框架云中志0一文了解RPC框架原理开源Linux0RPC 框架,底层到底是什么原理?码农沉思录0OpenRPCC语言实现的RPC框架openrpc is an open rpc framework,support sync and 通信原理 : 通信原理 《普通高等教育"十一五"国家级规划教材•电子信息学科基础课程系列教材:通信原理》主要讨论通信系统的基微前端框架实现原理前端大神之路0通信原理 : 通信原理 通信原理 : 通信原理 0OpenRPCC语言实现的RPC框架openrpcisanopenrpcframework,supportsyncandasynccall.theeventloopbasedonlibev.openrpccanuseforcommeri点赞 评论 收藏 分享 手机扫一扫分享分享 举报