Netty实现简易版的RPC框架过程详解

作者:我是小趴菜 时间:2023-05-23 23:19:58 

项目地址:gitee.com/baojh123/rp…

Netty实现简易版的RPC框架过程详解

netty-study 这个项目是没用到的,可以删掉,主要是测试Netty自定义协议的

1:如何运行项目

1:本地起一个zookeeper服务

2: 只需要运行 rpc-serverspringboot-zk-study二个项目即可

3: 二个项目的application.yml 都不需要改,唯一要改的就是zookeepr的连接配置信息

Netty实现简易版的RPC框架过程详解

4:启动好之后,在浏览器访问

http://localhost:8081/zk/test

http://localhost:8081/zk/people

http://localhost:8081/zk/list

可以查看到返回结果

Netty实现简易版的RPC框架过程详解

2:从客户端调用开始(springboot-zk-study项目)

@RestController
@RequestMapping("/zk")
public class ZkController {
           @Resource
           @MyResource
           private UserService userService;
           @Resource
           @MyResource
           private PeopleService peopleService;
           @GetMapping("/test")
           public String test() {
               return userService.test("bjh-",1);
           }
           @GetMapping("/people")
           public Object people() {
               return peopleService.query(1L);
           }
           @GetMapping("/list")
           public Object list() {
               return peopleService.list();
           }
}

只需要在我们需要进行RPC调用的接口上添加 @MyResource 注解即可,当我们执行这个方法之后,就会执行代理方法,代理方法在 rpc-core 项目中,为了阅读清晰,我只贴出重点的方法

@Slf4j
public class ServiceProxy<T> implements InvocationHandler, ApplicationContextAware, ApplicationRunner {
    ......省略一些代码
    // 客户端执行方法之后,就会执行到这里的代理方法
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //从注册中心拿到服务列表
        ZkNodeData zkNodeData = objectMapper.readValue(nodeData, ZkNodeData.class);
        List<ZkProperties> zkPropertiesList = zkNodeData.getZkPropertiesList();
        for(ZkProperties zkProperties : zkPropertiesList) {
            String interfaceName = zkProperties.getInterfaceName();
            Class<?> declaringClass = method.getDeclaringClass();
            if(StringUtils.equals(declaringClass.getName(),interfaceName)) {
                List<InterfaceInfo> info = zkProperties.getInfo();
                InterfaceInfo interfaceInfo = info.get(0);
                String ipAddress = interfaceInfo.getIpAddress();
                List<InterfaceImplInfo> interfaceImplInfo = interfaceInfo.getInterfaceImplInfo();
                InterfaceImplInfo implInfo = interfaceImplInfo.get(0);
                String[] strings = ipAddress.split(":");
                //与远程Netty服务端发起连接
                RpcClient rpcClient = connNettyServer(strings[0], zkPropertiesSource.getNettyConnectPort());
                /**
                 * 封装请求参数
                 */
                //获取方法参数类型
                Class<?>[] parameterTypes = method.getParameterTypes();
                List<String> types = getTypes(parameterTypes);
                //同步调用
                result = remoteCall(method.getName(), types, args, rpcClient, implInfo, interfaceName);
                log.info("返回结果是:{}",result);
            }
        }
        Class<?> returnType = method.getReturnType();
        Object value = objectMapper.readValue(result.toString(), returnType);
        return value;
    }
    private RpcClient connNettyServer(String ipAddress,Integer port) {
        return new RpcClient(ipAddress,port);
    }
    private Object remoteCall(String methodName, List<String> argTypes, Object[] args,RpcClient rpcClient,InterfaceImplInfo implInfo,String interfaceName) throws Exception{
        RpcMessage rpcMessage = new RpcMessage();
         ......
        //发送请求
        Response result = rpcClient.sendRequest(rpcMessage);
        log.info("请求结果是:{}", JSONUtil.toJsonPrettyStr(result));
        return result.getData();
    }
    ......省略一些代码

我们初始化客户端连接和发送请求都在一个RpcClient的类中,我们看下这个类的代码

@Slf4j
public class RpcClient {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap;
private String ip;
private Integer port;
RpcClientHandler rpcClientHandler;
private ChannelFuture channelFuture;
public RpcClient(String ip,Integer port) {
    bootstrap = new Bootstrap();
    bootstrap.group(group)
            .channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    //加入处理器
                    rpcClientHandler = new RpcClientHandler();
                    ch.pipeline().addLast(new RpcDecoder());
                    ch.pipeline().addLast(new RpcEncoder());
                    ch.pipeline().addLast(rpcClientHandler);
                }
            });
    try {
        // 和远程Nett服务端建立连接
        channelFuture = bootstrap.connect(ip, port).sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
public Response sendRequest(RpcMessage rpcMessage) throws Exception{
    //发送请求
    channelFuture.channel().writeAndFlush(rpcMessage).sync();
    channelFuture.channel().closeFuture().sync();
    log.info("获取返回结果=====================");
    Response response = rpcClientHandler.getResponse();
    return response;
}
}

客户端在这发送请求到服务端之后,就接收服务端返回回来的消息即可,然后将返回结果返回给我们的接口。客户端的调用就到这里了,现在看下服务端的

3:服务端处理请求

服务端处理请求的核心都在 rpc-coreRpcServerHandler

public class RpcServerHandler extends SimpleChannelInboundHandler<RpcMessage> {
   ObjectMapper objectMapper = new ObjectMapper();
   @Override
   protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {
       Object obj = rpcMessage.getObj();
       RpcMessage rpcMessageResponse = new RpcMessage();
       Response response = new Response();
       try{
           Request request = objectMapper.readValue(obj.toString(), Request.class);
           String interfaceImplName = request.getInterfaceImplName();
           Class<?> aClass = Class.forName(interfaceImplName);
           List<String> paramsTypes = request.getParamsTypes();
           try {
               Object result = null;
               //判读方法是有参数的还是没有参数的
               if(paramsTypes.isEmpty()) {
                   Method declaredMethod = aClass.getDeclaredMethod(request.getMethodName());
                   result = declaredMethod.invoke(aClass.newInstance());
               }else {
                   Map<String, Object> paramsObjectMap = TypeParseUtil.parseTypeString2Class(paramsTypes, request.getParams().toArray());
                   Class<?>[] classTypes = (Class<?>[]) paramsObjectMap.get("classTypes");
                   Object[] args = (Object[]) paramsObjectMap.get("args");
                   result = aClass.getMethod(request.getMethodName(), classTypes).invoke(aClass.newInstance(), args);
               }
               log.info("返回结果是:{}",result);
               response.setData(objectMapper.writeValueAsString(result));
               response.setIsOk(1);
               response.setErrInfo("error");
               rpcMessageResponse.setObj(response);
           } catch (Throwable throwable) {
               throwable.printStackTrace();
               response.setData("error");
               response.setIsOk(0);
               response.setErrInfo(throwable.getMessage());
               rpcMessageResponse.setObj(response);
           }
       }catch (Exception e) {
           response.setData("error");
           response.setIsOk(0);
           response.setErrInfo(e.getMessage());
           rpcMessageResponse.setObj(response);
       }
       String valueAsString = objectMapper.writeValueAsString(response);
       rpcMessageResponse.setDataLength(valueAsString.getBytes(Charset.forName("utf-8")).length);
       rpcMessageResponse.setObj(valueAsString);
       channelHandlerContext.writeAndFlush(rpcMessageResponse);
   }
}

服务端就拿到客户端传过来的接口名称,从zookeeper获取到具体的实现类,然后通过反射调用即可

4:接下来要做什么

上面只是简单的介绍了下整个调用的大概过程,还有很多问题没有解释清楚,比如

1:在客户端我们要使用UserService,但是你会发现我们使用了二个注解,一个是我们自定义的,一个是spring注入用的,但是在项目中我们并没有这个接口的实现类,spring是怎么将这个接口注入到自己容器中的呢

2: 为什么调用使用了 @MyResource的接口方法都会走代理方法,是怎么做到的

@Resource
@MyResource
private PeopleService peopleService;

3:我们的服务是怎么在服务启动的时候注册到zookeeper的,注册的信息又是什么,可以看下我们服务注册到zookeeper的信息如下

{
"zkPropertiesList": [{
"interfaceName": "com.bjh.service.PeopleService",
"info": [{
"ipAddress": "192.168.83.1:9091",
"interfaceImplInfo": [{
"name": "com.bjh.service.PeopleServiceImpl",
"value": "com.bjh.service.PeopleServiceImpl"
}]
}]
}, {
"interfaceName": "com.bjh.service.UserService",
"info": [{
"ipAddress": "192.168.83.1:9091",
"interfaceImplInfo": [{
"name": "com.bjh.service.UserServiceImpl",
"value": "com.bjh.service.UserServiceImpl"
}]
}]
}]
}

4:在我们的服务端的实现类,我们只使用了我们自定义的 @Service注解,这个注解不是Spring的

@Service
  public class PeopleServiceImpl implements PeopleService{
       @Override
       public People query(long id) {
           People people = new People();
           people.setId(id);
           people.setName("coco");
           return people;
       }
       @Override
       public List&lt;People&gt; list() {
           List&lt;People&gt; list = new ArrayList&lt;&gt;();
           People people = new People();
           people.setId(123L);
           people.setName("coco");
           People people2 = new People();
           people2.setId(124L);
           people2.setName("baojh");
           list.add(people);
           list.add(people2);
           return list;
       }
}

5:还有客户端请求的结构体是怎么样的,还有返回响应结果是怎么样的等等,后续我会继续更新

来源:https://juejin.cn/post/7198041700563877945

标签:Netty,简易版,RPC,框架
0
投稿

猜你喜欢

  • Kotlin扩展方法超详细介绍

    2023-05-31 16:31:23
  • android开发教程之view组件添加边框示例

    2023-05-24 17:16:44
  • Java中的接口回调实例

    2023-11-29 08:05:43
  • spring boot executable jar/war 原理解析

    2022-10-13 18:30:09
  • C#判断多个文本框是否为空的方法

    2022-05-12 23:05:32
  • springmvc+shiro自定义过滤器的实现代码

    2021-08-11 21:23:11
  • Spring boot集成Kafka消息中间件代码实例

    2022-11-06 21:53:48
  • 高吞吐、线程安全的LRU缓存详解

    2021-10-01 01:40:28
  • C#程序(含多个Dll)合并成一个Exe的简单方法

    2023-04-09 15:55:08
  • JPA多数据源分布式事务处理方案

    2023-08-09 03:50:06
  • Android7.0 MTK设置默认桌面

    2023-09-26 12:30:43
  • Guava - 并行编程Futures详解

    2022-04-28 23:16:34
  • Java使用Freemarker页面静态化生成的实现

    2022-07-24 08:48:42
  • Java 判断实体对象及所有属性是否为空的操作

    2022-12-06 14:32:07
  • Java数据结构之二叉排序树的实现

    2023-07-05 02:27:25
  • Android自定义viewgroup可滚动布局 GestureDetector手势监听(5)

    2023-06-17 23:15:48
  • HTTP中get和post的区别详解

    2023-04-19 11:42:18
  • jdk8使用stream实现两个list集合合并成一个(对象属性的合并)

    2023-08-05 16:49:30
  • Android仿新浪微博发送菜单界面的实现

    2022-12-13 10:54:03
  • SpringBoot实现接口等幂次校验的示例代码

    2022-01-21 10:49:00
  • asp之家 软件编程 m.aspxhome.com