Kong

当机立断,非黑即白,不要后悔

Enjoy programming and build awesome stuff.
Silence makes big money.
Life's most persistent and urgent question is, "What are you doing for others?"


手写rpc框架

手写RPC框架

一、内容概览

  • RPC的概念及运作流程

  • RPC协议及RPC框架的概念

  • Netty的基本使用

  • Java序列化及反序列化

  • Zookeeper注册中心的基本使用

  • 自定义注解实现特殊业务逻辑

  • Java的动态代理

  • 自定义Spring Boot Starter

二、RPC基础知识

2.1 RPC

Remote Procedure Call(RPC):远程过程调用。

借助网络通信实现想通用本地方法一样调用远程方法。

2.2 RPC的流程

1、客户端处理过程中调用Client stub(就像调用本地方法一样),传递参数; 2、Client stub将参数编组为消息,然后通过系统调用向服务端发送消息; 3、客户端本地操作系统将消息从客户端机器发送到服务端机器; 4、服务端操作系统将接收到的数据包传递给Server stub; 5、Server stub解组消息为参数; 6、Server stub再调用服务端的过程,过程执行结果以反方向的相同步骤响应给客户端。

请添加图片描述

2.3 RPC要解决两个问题

1、分布式系统中服务间调用问题。

2、远程调用是,要能够像本地方法一样方便,让调用者感知不到远程调用的逻辑。

2.4 RPC协议是什么?

RPC调用过程中需要将参数编组为消息进行发送,接受方需要解组消息为参数,过程处理结果同样需要经编组、解组。消息由哪些部分构成及消息的表示形式就构成了消息协议。

RPC调用过程中采用的消息协议称为RPC协议

  • RPC协议规定请求、响应消息的格式

  • 在TCP(网络传输控制协议)上可选用或自定义消息协议来完成RPC消息交互

我们可以选用通用的标准协议(如:http、https),也也可根据自身的需要定义自己的消息协议。

2.5 RPC框架:

传统的webservice框架:Apache CXF、Apache Axis2、Java自带的JAX-WS等。webservice框架大多基于标准的SOAP协议。 新兴的微服务框架:Dubbo、spring cloud、Apache Thrift等。

三、手写RPC

3.1 目标

本文会实现一个简单的RPC框架,zookeeper作为注册中心,具备服务注册与暴露,服务发现能力,项目名:kong-rpc-spring-boot-starter,

3.2 项目整体结构

请添加图片描述

client:客户端实现服务发现,服务代理调用远程服务

server:服务端要提供远程服务,具备服务注册及暴露的能力;在这之后,还需要开启网络服务,供客户端连接。

3.3 客户端实现

1、服务发现者

ServiceDIscoverer .java

/**
 * 服务发现接口
 * @author k
 */
public interface ServiceDiscoverer {
    List<Service> getServices(String name);
}

ZookeeperServiceDiscoverer.java 服务发现者

/**
 * @author k
 * @since 1.0.0
 */
public class ZookeeperServiceDiscoverer implements ServiceDiscoverer {
    private ZkClient zkClient;

    public ZookeeperServiceDiscoverer(String zkAddress){
        zkClient = new ZkClient(zkAddress);
        zkClient.setZkSerializer(new ZookeeperSerializer());
    }
    @Override
    public List<Service> getServices(String name) {
        String servicePath = RpcConstant.ZK_SERVICE_PATH+RpcConstant.PATH_DELIMITER+name+"/service";
        List<String> childrenList = zkClient.getChildren(servicePath);
        return Optional.ofNullable(childrenList).orElse(new ArrayList<>()).stream().map(str -> {
            String deCh = null;
            try{
                deCh = URLDecoder.decode(str,RpcConstant.UTF_8);
            }catch (UnsupportedEncodingException e){
                e.printStackTrace();
            }
            return JSON.parseObject(deCh,Service.class);
        }).collect(Collectors.toList());
    }
}

2、网络客户端

NetClient.java

/**
 * 网络请求客户端,定义网络请求规范
 * @author k
 * @since 1.0.0
 */
public interface NetClient {
    byte[] sendRequest(byte[] data, Service service) throws InterruptedException;
}

NettyNetClient.java

/**
 * 定义Netty网络请求的细则
 * @author k
 * @since 1.0.0
 */
public class NettyNetClient implements NetClient {
    private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class);

    /**
     * 发送请求
     *
     * @param data 请求主数据
     * @param service 服务信息
     * @return 响应数据
     * @throws InterruptedException 抛出异常
     */
    @Override
    public byte[] sendRequest(byte[] data, Service service) throws InterruptedException {
        String[] addInfoArray = service.getAddress().split(":");
        String serverAddress = addInfoArray[0];
        String serverPort = addInfoArray[1];

        SendHandler sendHandler = new SendHandler(data);
        byte[] respData;
        //配置客户端
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel){
                            ChannelPipeline p = channel.pipeline();
                            p.addLast(sendHandler);
                        }
                    });
            //启动客户端连接
            b.connect(serverAddress,Integer.parseInt(serverPort)).sync();
            respData=(byte[]) sendHandler.rspData();
            logger.info("SendRequest get reply: {}",respData);
        }finally {
            //释放线程组资源
            group.shutdownGracefully();
        }
        return respData;
    }
}

SendHanler.java

package com.kong.rpc.client.net.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;

/**
 * 发现处理类,定义Netty入站处理规则
 * @author k
 */
public class SendHandler extends ChannelInboundHandlerAdapter {
    private static Logger logger = LoggerFactory.getLogger(SendHandler.class);

    private CountDownLatch cdl;
    private Object readMsg = null;
    private byte[] data;

    public SendHandler(byte[] data){
        cdl = new CountDownLatch(1);
        this.data = data;
    }

    /**
     * 当连接服务端成功后,发送请求数据
     * @param ctx 通道上下文
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        logger.info("Successful connection to server : {}",ctx);
        ByteBuf reqBuf = Unpooled.buffer(data.length);
        reqBuf.writeBytes(data);
        logger.info("Client sends message: {}",reqBuf);
        ctx.writeAndFlush(reqBuf);
    }

    /**
     * 读取数据,读取完毕后释放CD锁
     * @param ctx 通道上下文
     * @param msg ByteBuf
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        logger.info("Client reads message: {}",msg);
        ByteBuf msgBuf = (ByteBuf) msg;
        byte[] resp = new byte[msgBuf.readableBytes()];
        msgBuf.readBytes(resp);
        readMsg = resp;
        cdl.countDown();
    }

    /**
     * 等待读取完成
     * @return
     * @throws InterruptedException
     */
    public Object rspData() throws InterruptedException {
        cdl.await();
        return readMsg;
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx){
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
        cause.printStackTrace();
        logger.error("Exception occurred : {}",cause.getMessage());
        ctx.close();
    }
}

3、服务代理

服务代理来执行服务调用

ClientProxyFactory.java

package com.kong.rpc.client;

import com.kong.rpc.client.discovery.ServiceDiscoverer;
import com.kong.rpc.client.net.NetClient;
import com.kong.rpc.common.protocol.MessageProtocol;
import com.kong.rpc.common.protocol.RpcRequest;
import com.kong.rpc.common.protocol.RpcResponse;
import com.kong.rpc.common.service.Service;
import com.kong.rpc.execption.RpcException;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import static java.lang.reflect.Proxy.newProxyInstance;

public class ClientProxyFactory {
    private ServiceDiscoverer serviceDiscoverer;

    private Map<String, MessageProtocol> supportMessageProtocols;

    private NetClient netClient;

    private Map<Class<?>,Object> objectCache = new HashMap<>();

    /**
     * 通过java动态代理获取服务代理类
     * @param clazz 被代理Class
     * @param <T> 泛型
     * @return
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz) {
        return (T)this.objectCache.computeIfAbsent(clazz,
                cls -> newProxyInstance(cls.getClassLoader(),new Class<?>[]{cls},new ClientInvocationHandler(cls))
        );
    }
    public ServiceDiscoverer getServiceDiscoverer(){
        return serviceDiscoverer;
    }
    public void setSid(ServiceDiscoverer serviceDiscoverer) {
        this.serviceDiscoverer = serviceDiscoverer;
    }
    public Map<String, MessageProtocol> getSupportMessageProtocols() {
        return supportMessageProtocols;
    }

    public void setSupportMessageProtocols(Map<String, MessageProtocol> supportMessageProtocols) {
        this.supportMessageProtocols = supportMessageProtocols;
    }

    public NetClient getNetClient() {
        return netClient;
    }

    public void setNetClient(NetClient netClient) {
        this.netClient = netClient;
    }
    //客户端服务代理类invoke函数细节实现
    private class ClientInvocationHandler implements InvocationHandler {
        private Class<?> clazz;

        private Random random = new Random();
        public ClientInvocationHandler(Class<?> clazz) {
            super();
            this.clazz = clazz;
        }
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if(method.getName().equals("toString")){
                return proxy.getClass().toString();
            }
            if(method.getName().equals("hashCode")) {
                return 0;
            }

            //1、获得服务信息
            String serviceName = this.clazz.getName();
            List<Service> services = serviceDiscoverer.getServices(serviceName);

            if(services == null || services.isEmpty()){
                throw new RpcException("No provider available!");
            }
            // 随机选择一个服务提供者(软负载)
            Service service = services.get(random.nextInt(services.size()));

            //2、构造request对象
            RpcRequest req = new RpcRequest();
            req.setServiceName(service.getName());
            req.setMethod(method.getName());
            req.setParameterTypes(method.getParameterTypes());
            req.setParameters(args);
            //3、协议层编组
            MessageProtocol protocol = supportMessageProtocols.get(service.getProtocol());
            byte[] data = protocol.marshallingRequest(req);
//            4、调用网络层发送请求
            byte[] repData = netClient.sendRequest(data,service);
            //5、解组响应信息
            RpcResponse rsp = protocol.unmarshallingResponse(repData);

            //6、结果处理
            if (rsp.getException() != null){
                throw rsp.getException();
            }
            return rsp.getReturnValue();
        }
    }
}

4、消息协议

java 序列化消息协议

MessageProtocol

3.4 服务端实现

服务端提供远程服务

1、服务注册

ServiceRegister

将指定ServiceObject对象序列化后保存到ZooKeeper上,供客户端发现。同时会将服务对象缓存起来,在客户端调用服务时,通过缓存的ServiceObject对象反射指定服务,调用方法。

public interface ServiceRegister {
    void register(ServiceObject so) throws Exception;
    ServiceObject getServiceObject(String name) throws Exception;
}
/**
 * Zookeeper 服务注册器
 * @author k
 * @since 1.0.0
 */
public class ZookeeperExportServiceRegister extends DefaultServiceRegister implements ServiceRegister {
    private ZkClient zkClient;
    public ZookeeperExportServiceRegister(String zkAddress,Integer port,String protocol){
        zkClient = new ZkClient(zkAddress);
        zkClient.setZkSerializer(new ZookeeperSerializer());
        this.port = port;
        this.protocol = protocol;
    }
    @Override
    public void register(ServiceObject so) throws Exception {
        super.register(so);
        Service service = new Service();
        String host = InetAddress.getLocalHost().getHostAddress();
        String address = host+":"+port;
        service.setAddress(address);
        service.setName(so.getClazz().getName());
        service.setProtocol(protocol);
        this.exportService(service);
    }
    private void exportService(Service serviceResource){
        String serviceName = serviceResource.getName();
        String uri = JSON.toJSONString(serviceResource);
        try{
            uri = URLEncoder.encode(uri,UTF_8);
        }catch (UnsupportedEncodingException e){
            e.printStackTrace();
        }
        String servicePath = ZK_SERVICE_PATH +PATH_DELIMITER+serviceName+"/service";
        if(!zkClient.exists(servicePath)){
            zkClient.createPersistent(servicePath,true);
        }
        String uriPath = servicePath+PATH_DELIMITER+uri;
        if(zkClient.exists(uriPath)){
            zkClient.delete(uriPath);
        }
        zkClient.createEphemeral(uriPath);
    }
}

2、网络服务

NettyRpcServer 主要提供Netty网络服务开启和关闭能力

/**
 * Netty RPC 服务端,提供Nett网络服务开启、关闭
 * @author k
 * @since 1.0.0
 */
public class NettyRpcServer extends RpcServer {
    private static Logger logger = LoggerFactory.getLogger(NettyRpcServer.class);
    private Channel channel;

    public NettyRpcServer(int port, String protocol, RequestHandler handler) {
        super(port, protocol, handler);
    }

    @Override
    public void start() {
        // 配置服务器
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,100)
                    .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                public void initChannel(SocketChannel ch){
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new ChannelRequestHandler());
                    }
                    });
            // 启动服务
            ChannelFuture f = b.bind(port).sync();
            logger.info("Server started successfully.");
            channel = f.channel();
            f.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    @Override
    public void stop() {
        this.channel.close();
    }
    private class ChannelRequestHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            logger.info("Channel active: {}",ctx);
        }
        @Override
        public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
            logger.info("The server receives a message : {}",msg);
            ByteBuf msgBuf = (ByteBuf) msg;
            byte[] req = new byte[msgBuf.readableBytes()];
            msgBuf.readBytes(req);
            byte[] res = handler.handleRequest(req);
            logger.info("Send response: {}",msg);
            ByteBuf respBuf = Unpooled.buffer(res.length);
            respBuf.writeBytes(res);
            ctx.write(respBuf);
        }
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // Close the connection when an exception is raised.
            cause.printStackTrace();
            logger.error("Exception occurred:{}", cause.getMessage());
            ctx.close();
        }
    }
}

3、RPC处理

这里引入一个RPC处理者的概念,负责开启服务,以及注入服务。

/**
 * RPC处理者,支持服务启动暴露,自动注入Service
 *
 * @author k
 * @since 1.0.0
 */
public class DefaultRpcProcessor implements ApplicationListener<ContextRefreshedEvent> {

    @Resource
    private ClientProxyFactory clientProxyFactory;

    @Resource
    private ServiceRegister serviceRegister;

    @Resource
    private RpcServer rpcServer;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (Objects.isNull(event.getApplicationContext().getParent())){
            ApplicationContext context = event.getApplicationContext();
            startServer(context);

            injectService(context);
        }
    }

    private void injectService(ApplicationContext context) {
        String[] names = context.getBeanDefinitionNames();
        for(String name : names){
            Class<?> clazz = context.getType(name);
            if (Objects.isNull(clazz)) continue;
            Field[] fields = clazz.getDeclaredFields();
            for (Field field : fields) {
                InjectService injectService = field.getAnnotation(InjectService.class);
                if (Objects.isNull(injectService)) continue;
                Class<?> fieldClass = field.getType();
                Object object = context.getBean(name);
                field.setAccessible(true);
                try{
                    field.set(object,clientProxyFactory.getProxy(fieldClass));
                }catch (IllegalAccessException e){
                    e.printStackTrace();
                }
            }
        }
    }

    private void startServer(ApplicationContext context) {
        Map<String,Object> beans = context.getBeansWithAnnotation(Service.class);
        if(beans.size() != 0) {
            boolean startServerFlag = true;
            for(Object obj : beans.values()) {
                try{
                    Class<?> clazz = obj.getClass();
                    Class<?>[] interfaces = clazz.getInterfaces();
                    ServiceObject so;
                    if (interfaces.length != 1) {
                        Service service = clazz.getAnnotation(Service.class);
                        String value = service.value();
                        if(value.equals("")){
                            startServerFlag = false;
                            throw new UnsupportedOperationException("The exposed interface is not specific with '"+obj.getClass().getName()+"'");
                        }
                        so = new ServiceObject(value,Class.forName(value),obj);
                    }else {
                        Class<?> supperClass = interfaces[0];
                        so = new ServiceObject(supperClass.getName(),supperClass,obj);
                    }
                    serviceRegister.register(so);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            if(startServerFlag) {
                rpcServer.start();
            }
        }
    }
}

3.5 注解

定义Service和InjectService两个注解实现服务注册和注入依赖

  • @Service :服务注册

  • @InjectService:注入服务

四、使用

4.1 生成本地Maven依赖

# 进入源码pom文件目录,执行命令
mvn clean install

4.2 服务提供者、消费者同时引入Maven依赖

<dependency>
    <groupId>com.kong.rpc</groupId>
    <artifactId>kong-rpc-spring-boot-starter</artifactId>
    <version>1.0.0</version>
</dependency>

4.3 服务提供者、消费者同时配置注册中心

kong.rpc.register-address=192.168.2.191:2181

4.4 服务提供者注解服务示例(@Service)

import com.kong.rpc.annotation.Service;

@Service
public class UserServiceImpl implements UserService {    
    @Override
    public ApiResult<User> getUser(Long id) {        
        User user = getFromDbOrCache(id);        
        return ApiResult.success(user);   
    }    
    private User getFromDbOrCache(Long id) {        
        return new User(id, "user1", 1, "aaa");    
    }
}

4.5 服务消费者注入服务示例(@InjectService)

@RestController
@RequestMapping("/index/")
public class IndexController {    
    @InjectService
    private UserService userService;    
    /**
     * 获取用户信息
     *
     * @param id 用户id
     * @return 用户信息
     */
    @GetMapping("getUser")    
    public ApiResult<User> getUser(Long id) {        
    return userService.getUser(id);    
        }
}

使用 demo示例: https://github.com/KongF/kong-rpc-example.git

最近的文章

Elk日志分析系统搭建(基于docker容器)

ELK其实并不是一款软件,而是一整套解决方案,是三个软件产品的首字母缩写,Elasticsearch,Logstash 和 Kibana。这三款软件都是开源软件,通常是配合使用,而且又先后归于 Elastic.co 公司名下,故被简称为ELK协议栈。 Logstash分布于各个节点上搜集相关日志、数据,并经过分析、过滤后发送给远端服务器上的Elasticsearch进行存储。Elasticsearch将数据以分片的形式压缩存储并提供多种API供用户查询,操作。用户亦可以更直观的通过配置K...…

继续阅读
更早的文章

Io与netty原理

​什么是IO?IO是Input、Output的简称,即输入输出。简单说就是读取数据,然后进行系统调用一、Java IO模型1、BIO(Blocking IO)BIO即同步阻塞模型,每个客户端连接对应一个处理线程,在BIO中,accept和read方法都是阻塞操作,没有连接请求时,accept方法阻塞等待,如果无数据读取时,read方法阻塞。​编辑2、NIO(Non Blocking IO)        NIO是同步非阻塞模型,与BIO相比引入了多路复用器Selector的概念,服务端一个...…

继续阅读