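Netty Network Framework Study Notes 20 (Implementing a Simple RPC, Part 2, 2020.07.02)

Author: Internet

Service provider

1.1 Define the interface and its implementation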

1.1.1 MyRPCTest

public interface MyRPCTest {

    String hiHi(String p1);
}

1.1.2 MyRPCTestImpl

public class MyRPCTestImpl implements MyRPCTest {

    @Override
    public String hiHi(String p1) {
        return "成功进行了远程调用哟, 恭喜恭喜! 你的参数:"+p1+"==="+ UUID.fastUUID().toString();
    }
}

2.1 Service registration

2.1.2 ServiceRegistration

@Slf4j
public class ServiceRegistration {

    public static void ProjectStartCompletedTrigger(RegistrationFuture registrationFuture) {
        new Thread(() -> {
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap = bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringEncoder());
                            // Frame the incoming ByteBuf into one complete JSON object per message
                            pipeline.addLast(new JsonObjectDecoder());
                            // Then decode it to a String
                            pipeline.addLast(new StringDecoder());
                            // Custom business handler
                            pipeline.addLast(new MyProduceRegistrationHandler(registrationFuture));
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8888));
            channelFuture.addListener(el -> {
                if (el.isSuccess()) {
                    log.info("ServiceRegistration===已经成功连接!!!");
                } else {
                    log.error("ServiceRegistration===连接失败!!!");
                }
            });
            try {
                channelFuture.sync().channel().closeFuture().sync();
            } catch (Exception e) {
                log.error("ServiceRegistration===发生异常, 信息:{}", e);
            } finally {
                workerGroup.shutdownGracefully();
            }
        }).start();
    }
}

2.1.3 RegistrationFuture

public class RegistrationFuture implements Future<Object> {

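    // volatile so that isDone() sees the update without taking the lock
    private volatile boolean isDone;

    private Object result;

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return isDone;
    }

    @Override
    public Object get() throws InterruptedException, ExecutionException {
        synchronized (this) {
            // Re-check the flag in a loop: this covers spurious wakeups and the
            // case where setResult() already ran before get() was called
            while (!isDone) {
                this.wait();
            }
            return result;
        }
    }

    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        synchronized (this) {
            while (!isDone) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    throw new TimeoutException("registration result not available in time");
                }
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
            return result;
        }
    }

    public void setResult(Object result) {
        synchronized (this) {
            this.result = result;
            this.isDone = true;
            this.notifyAll();
        }
    }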
}
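
The same "block until the handler reports back" idea can also be expressed with the JDK's CompletableFuture, which already takes care of the wait/notify bookkeeping and of timeouts. The following is only a minimal sketch under that assumption, not the author's code; RegistrationSignal, complete and await are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: the handler thread publishes the registration result,
// the startup thread blocks until it is available.
class RegistrationSignal {

    private final CompletableFuture<Object> future = new CompletableFuture<>();

    // Called from the I/O handler once the registry has answered.
    void complete(Object result) {
        future.complete(result);
    }

    // Blocks the caller until complete() has run or the timeout elapses.
    Object await(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return future.get(timeout, unit);
    }
}

class RegistrationSignalDemo {
    public static void main(String[] args) throws Exception {
        RegistrationSignal signal = new RegistrationSignal();
        // Simulates the I/O thread delivering the registry's reply.
        new Thread(() -> signal.complete("registered")).start();
        System.out.println(signal.await(5, TimeUnit.SECONDS));
    }
}
```

Because the future remembers its completed state, it does not matter whether complete() runs before or after await() is called.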

2.1.4 MyProduceRegistrationHandler

@Slf4j
public class MyProduceRegistrationHandler extends SimpleChannelInboundHandler<String> {

    private RegistrationFuture registrationFuture;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        log.info("MyRegistrationHandler===读取到消息:{}",msg);
        registrationFuture.setResult("注册完成啦,大兄弟!");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        doRegistrationInfo(ctx);
        super.channelActive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("MyRegistrationHandler===发生异常:{}",cause);
        ctx.close();
        super.exceptionCaught(ctx, cause);
    }

    public MyProduceRegistrationHandler(RegistrationFuture registrationFuture) {
        this.registrationFuture = registrationFuture;
    }

    private void doRegistrationInfo(ChannelHandlerContext ctx) {
        RegistrationInfo registrationInfo = new RegistrationInfo();
        registrationInfo.setType(0);
        registrationInfo.setServiceName("myRPCProvider");
        registrationInfo.setIp("127.0.0.1");
        registrationInfo.setPort(7777);
        String jsonStr = JSONUtil.toJsonStr(registrationInfo);
        ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonStr.getBytes(StandardCharsets.UTF_8)));
    }
}

3.0 Service provision

3.0.1 MyProvide

@Slf4j
public class MyProvide {

    public static void doProvide(){
        // Assume this is the provider's registry of every exposed interface and its implementation
        Map<String,Object> Beans = new HashMap<>();
        Beans.put("myRPCTest",new MyRPCTestImpl());
        
        new Thread(()->{
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap = serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .childHandler(new ProvideChannelInitializer(Beans));

            ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress("127.0.0.1", 7777));
            channelFuture.addListener(el->{
                if (el.isSuccess()) {
                    log.info("MyProvide===服务提供者启动成功, 等待消费者访问!");
                }
            });
            try {
                channelFuture.sync().channel().closeFuture().sync();
            } catch (Exception e) {
                log.error("MyProvide===发生异常, 消息:{}",e);
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }).start();
    }
}

3.0.2 ProvideChannelInitializer

public class ProvideChannelInitializer extends ChannelInitializer<SocketChannel> {

    // Assume this is the provider's registry of every exposed interface and its implementation
    private Map<String,Object> Beans;

    private static final UnorderedThreadPoolEventExecutor UNORDERED_THREAD_POOL_EVENT_EXECUTOR = new UnorderedThreadPoolEventExecutor(16);

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new StringEncoder());
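        // Frame the incoming ByteBuf into one complete JSON object per message
        pipeline.addLast(new JsonObjectDecoder());
        // Then decode it to a String
        pipeline.addLast(new StringDecoder());
        // Custom business handler, run on a separate executor so it does not block the I/O threads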
        pipeline.addLast(UNORDERED_THREAD_POOL_EVENT_EXECUTOR,new ProvideChannelHandler(Beans));
    }

    public ProvideChannelInitializer(Map<String, Object> beans) {
        Beans = beans;
    }
}

3.0.3 ProvideChannelHandler

@Slf4j
public class ProvideChannelHandler extends SimpleChannelInboundHandler<String> {

    // Assume this is the provider's registry of every exposed interface and its implementation
    private Map<String, Object> Beans;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        MessageProtocol messageProtocol = JSONUtil.toBean(msg, MessageProtocol.class);
        log.info("ProvideChannelHandler===, 开始处理客户端请求, 请求参数:{}",messageProtocol);
        if (Objects.isNull(messageProtocol)) {
            // Without a parsed request there is no requestId to answer, so log and drop it
            log.error("ProvideChannelHandler===, request could not be parsed:{}", msg);
            return;
        }
        Object obj = Beans.get(messageProtocol.getClassName());
        if (Objects.isNull(obj)) {
            messageProtocol.setInvokeResult(new InvokeResult(1, "没有找到方法!"));
        } else {
            try {
                // getMethod never returns null; it throws NoSuchMethodException instead
                Method method = obj.getClass().getMethod(messageProtocol.getMethodName(), this.getClasses(messageProtocol));
                Object invoke = method.invoke(obj, messageProtocol.getMethodParameter());
                messageProtocol.setInvokeResult(new InvokeResult(0, invoke));
            } catch (NoSuchMethodException e) {
                messageProtocol.setInvokeResult(new InvokeResult(1, "没有找到方法!"));
            } catch (InvocationTargetException e) {
                // The real failure is the cause; the wrapper's own message is null
                messageProtocol.setInvokeResult(new InvokeResult(1, String.valueOf(e.getCause())));
            } catch (Exception e) {
                messageProtocol.setInvokeResult(new InvokeResult(1, e.getMessage()));
            }
        }
        // Every path answers exactly once, echoing the requestId back to the caller
        ctx.writeAndFlush(Unpooled.wrappedBuffer(JSONUtil.toJsonStr(messageProtocol).getBytes(StandardCharsets.UTF_8)));
    }

    private Class[] getClasses(MessageProtocol messageProtocol) {
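        // Class objects do not survive the JSON round trip; they arrive as Class#toString() text
        // ("class java.lang.String", "interface java.util.List"), so strip only that leading keyword
        return Optional.ofNullable(messageProtocol.getMethodParameterTypes())
                .map(el -> Arrays.stream(el).filter(Objects::nonNull)
                        .map(obj -> {
                            try {
                                return Class.forName(obj.toString().replaceFirst("^(class|interface)\\s+", "").trim());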
                            } catch (ClassNotFoundException e) {
                                return null;
                            }
                        }).toArray(Class[]::new)).orElse(null);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("MyChannelHandler===发生异常, 信息:{}", cause);
        ctx.close();
    }

    public ProvideChannelHandler(Map<String, Object> beans) {
        Beans = beans;
    }
}
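
The provider-side dispatch boils down to three steps: look up the bean by name, rebuild the parameter types from their textual names, and invoke the method reflectively. The following standalone sketch illustrates just that, without Netty or JSON. Greeter, GreeterImpl and ReflectiveDispatcher are hypothetical stand-ins, not classes from the article.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the article's MyRPCTest / MyRPCTestImpl.
interface Greeter {
    String hiHi(String p1);
}

class GreeterImpl implements Greeter {
    @Override
    public String hiHi(String p1) {
        return "hello " + p1;
    }
}

// Hypothetical dispatcher: maps a bean name to an instance and invokes
// the requested method reflectively.
class ReflectiveDispatcher {

    private final Map<String, Object> beans = new HashMap<>();

    void register(String name, Object bean) {
        beans.put(name, bean);
    }

    Object dispatch(String beanName, String methodName, String[] typeNames, Object[] args)
            throws ReflectiveOperationException {
        Object bean = beans.get(beanName);
        if (bean == null) {
            throw new IllegalArgumentException("no bean named " + beanName);
        }
        Class<?>[] types = new Class<?>[typeNames.length];
        for (int i = 0; i < typeNames.length; i++) {
            // Accept both "java.lang.String" and the "class java.lang.String" form.
            String name = typeNames[i].replaceFirst("^(class|interface)\\s+", "");
            types[i] = Class.forName(name);
        }
        Method method = bean.getClass().getMethod(methodName, types);
        return method.invoke(bean, args);
    }
}

class ReflectiveDispatcherDemo {
    public static void main(String[] args) throws Exception {
        ReflectiveDispatcher dispatcher = new ReflectiveDispatcher();
        dispatcher.register("greeter", new GreeterImpl());
        Object result = dispatcher.dispatch(
                "greeter", "hiHi", new String[]{"class java.lang.String"}, new Object[]{"world"});
        System.out.println(result);
    }
}
```

Note that Class.forName does not resolve primitive names such as "int", so a fuller version would need an explicit mapping for primitives.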

Testing the service provider startup

Register the service, then start serving

public class ProvideStart {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("假设SpringBoot项目启动中, 加载各类Bean");
        System.out.println("===================================");
        System.out.println("假设SpringBoot项目启动完成");
        System.out.println("===================================");
        System.out.println("进行服务注册");
        RegistrationFuture registrationFuture = new RegistrationFuture();
        ServiceRegistration.ProjectStartCompletedTrigger(registrationFuture);
        // Block until the registration result is available
        System.out.println("=========="+registrationFuture.get()+"=============");
        System.out.println("进行服务提供");
        MyProvide.doProvide();
        System.out.println("============服务提供启动完成==============");
        LockSupport.park();
    }
}

Provider log output:

假设SpringBoot项目启动中, 加载各类Bean
===================================
假设SpringBoot项目启动完成
===================================
进行服务注册
19:01:54.638 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.rpc.serviceproduce.registration.ServiceRegistration - ServiceRegistration===已经成功连接!!!
19:01:54.705 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.rpc.serviceproduce.registration.handler.MyProduceRegistrationHandler - MyProduceRegistrationHandler===doRegistrationInfo, 服务信息注册:{"ip":"127.0.0.1","type":0,"serviceName":"myRPCProvider","port":7777}
19:01:54.777 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.rpc.serviceproduce.registration.handler.MyProduceRegistrationHandler - MyRegistrationHandler===读取到消息:{"result":"信息注册成功!"}
==========注册完成啦,大兄弟!=============
进行服务提供
============服务提供启动完成==============
19:01:54.795 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.rpc.serviceproduce.produce.MyProvide - MyProvide===服务提供者启动成功, 等待消费者访问!

Registry log

19:01:54.769 [defaultEventLoopGroup-4-1] INFO com.zhihao.netty.rpc.registrationcenter.handler.MyRegisteredChannelHandler - MyChannelHandler===注册成功一个服务提供者:RegistrationInfo(type=0, serviceName=myRPCProvider, ip=127.0.0.1, port=7777)

Client consumer

1.1 Define the interface

public interface MyRPCTest {

    String hiHi(String p1);
}

2.1 Write the dynamic proxy for the interface

2.1.1 ConsumerFactory

public class ConsumerFactory {

    /**
     * Returns a JDK dynamic proxy for the given interface.
     *
     * @param aclass the interface to proxy
     * @author: ZhiHao
     * @date: 2022/6/30
     */
    public static <T> T getProxy(Class<T> aclass) {
        if (null == aclass){
            return null;
        }
        if (!aclass.isInterface()){
            throw new RuntimeException(aclass.getName()+ "该类不是接口, 无法使用JDK动态代理!");
        }
        return (T) Proxy.newProxyInstance(aclass.getClassLoader(), new Class[]{aclass}, new NettyInvocationHandler());
    }
}

2.1.2 NettyInvocationHandler

public class NettyInvocationHandler implements InvocationHandler {

    private Map<String, NettyClient> nettyClientMap;

    public NettyInvocationHandler() {
        this.nettyClientMap = NettyConsumerContext.nettyClientMap;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Assume the service name is read from an annotation on the interface
        String serviceName = "myRPCProvider";
        // Reuse the cached client; build one only if this service has no client yet
        NettyClient nettyClient = nettyClientMap.get(serviceName);
        if (null == nettyClient) {
            RegistrationInfo registrationInfo = GetRegistrationInfo.getGetRegistrationInfo(serviceName);
            nettyClient = NettyClient.buildClient(registrationInfo);
            Optional.ofNullable(nettyClient).ifPresent(el-> nettyClientMap.put(serviceName,el));
        }
        if (null == nettyClient){
            throw new RuntimeException(serviceName+"客户端初始化有误!");
        }
        Object invoke = nettyClient.doRemotelyInvoke(method, args);
        return invoke;
    }
}
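
To see the proxy-plus-cache mechanism in isolation, here is a small sketch that needs neither Netty nor a registry. It assumes a fake client in place of NettyClient; EchoService, FakeClient and CachingProxyFactory are hypothetical names. It shows an interface call being intercepted by an InvocationHandler and one client being built per service name through computeIfAbsent.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical service interface, standing in for MyRPCTest.
interface EchoService {
    String echo(String text);
}

// Hypothetical stand-in for NettyClient; counts how many instances were built.
class FakeClient {
    static final AtomicInteger CREATED = new AtomicInteger();
    private final String serviceName;

    FakeClient(String serviceName) {
        this.serviceName = serviceName;
        CREATED.incrementAndGet();
    }

    // Pretends to perform the remote call and returns a description of it.
    Object call(String methodName, Object[] args) {
        return serviceName + "." + methodName + Arrays.toString(args);
    }
}

class CachingProxyFactory {
    private static final Map<String, FakeClient> CLIENTS = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    static <T> T proxyFor(Class<T> iface, String serviceName) {
        InvocationHandler handler = (proxy, method, args) -> {
            // computeIfAbsent builds the client once and returns the cached one afterwards.
            FakeClient client = CLIENTS.computeIfAbsent(serviceName, FakeClient::new);
            return client.call(method.getName(), args);
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }
}

class CachingProxyDemo {
    public static void main(String[] args) {
        EchoService echo = CachingProxyFactory.proxyFor(EchoService.class, "myRPCProvider");
        System.out.println(echo.echo("first"));
        System.out.println(echo.echo("second"));
        System.out.println("clients built: " + FakeClient.CREATED.get());
    }
}
```

A real handler would probably also answer methods inherited from Object (toString, hashCode, equals) locally instead of forwarding them to the remote side.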

3.1 Fetch provider information

3.1.1 GetRegistrationInfo

@Slf4j
public class GetRegistrationInfo {

    private static MyGetRegistrationHandler myGetRegistrationHandler;

    public static final RegistrationInfo getGetRegistrationInfo(String serviceName) {
        checkingClientIsSuccess(serviceName);
        Object apply = myGetRegistrationHandler.apply(serviceName);
        if (apply instanceof String) {
            throw new InvalidParameterException(serviceName + "注册中心没有该服务地址");
        }
        if (apply instanceof RegistrationInfo) {
            return (RegistrationInfo) apply;
        }
        return null;
    }

    private synchronized static void checkingClientIsSuccess(String serviceName) {
        if (null == myGetRegistrationHandler) {
            try {
                initRegistrationNettyClient();
                GetRegistrationInfo.class.wait(5000L);
            } catch (Exception e) {
                log.error("GetRegistrationInfo===初始化客户端发生异常, 信息:{}", e);
                throw new RuntimeException(e);
            }
            if (null == myGetRegistrationHandler) {
                throw new RuntimeException("initNettyClient===初始化客户端发生异常");
            }
            log.info("GetRegistrationInfo===初始化客户端完成");
        }
    }

    private static void initRegistrationNettyClient() {
        new Thread(() -> {
            log.info("GetRegistrationInfo===开始初始化!!!");
            MyGetRegistrationHandler registrationHandler = new MyGetRegistrationHandler();
            NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);
            Bootstrap bootstrap = new Bootstrap();
            bootstrap = bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringEncoder());
                            // Frame the incoming ByteBuf into one complete JSON object per message
                            pipeline.addLast(new JsonObjectDecoder());
                            // Then decode it to a String
                            pipeline.addLast(new StringDecoder());
                            // Custom business handler
                            pipeline.addLast(registrationHandler);
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8888));
            channelFuture.addListener(el -> {
                if (el.isSuccess()) {
                    myGetRegistrationHandler = registrationHandler;
                    log.info("GetRegistrationInfo===已经成功连接!!!");
                    synchronized (GetRegistrationInfo.class){
                        GetRegistrationInfo.class.notify();
                    }
                } else {
                    log.error("GetRegistrationInfo===连接失败!!!");
                }
            });
            try {
                channelFuture.sync().channel().closeFuture().sync();
            } catch (Exception e) {
                log.error("GetRegistrationInfo===发生异常, 信息:{}", e);
            } finally {
                workerGroup.shutdownGracefully();
            }
        }).start();
    }
}

3.1.2 MyGetRegistrationHandler

@Slf4j
public class MyGetRegistrationHandler extends SimpleChannelInboundHandler<String> implements Function<Object,Object> {
    
    private ChannelHandlerContext context;
    private Object result;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.context = ctx;
        super.channelActive(ctx);
    }

    @Override
    protected synchronized void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        log.info("MyGetRegistrationHandler===读取到信息:{}",msg);
        JSONObject jsonObject = JSONUtil.parseObj(msg);
        this.result = jsonObject.get("result");
        if (this.result == null) {
            this.result = JSONUtil.toBean(msg, RegistrationInfo.class);
        }
        notify();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("MyGetRegistrationHandler===发生异常:{}",cause);
        ctx.close();
        super.exceptionCaught(ctx, cause);
    }

    // Synchronized so that only one thread can issue a lookup at a time
    @Override
    public synchronized Object apply(Object o) {
        this.result = null;
        RegistrationInfo registrationInfo = new RegistrationInfo();
        registrationInfo.setServiceName(o.toString());
        registrationInfo.setType(1);
        try {
            this.context.writeAndFlush(Unpooled.wrappedBuffer(JSONUtil.toJsonStr(registrationInfo).getBytes(StandardCharsets.UTF_8)));
            // Wait in a loop: channelRead0 sets result before notify(), so this also survives spurious wakeups
            while (this.result == null) {
                wait();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return this.result;
    }
}

4.0 Initialize the remote-call client

4.1.1 NettyClient

@Slf4j
public class NettyClient {

    private String ip;
    private Integer port;
    private String serviceName;

    private Map<String, ChannelHandlerContext> channelHandlerContextMap;

    private Map<String, SynchronousQueue<?>> resultConcurrentHashMap;

    public NettyClient(RegistrationInfo info) {
        if (StrUtil.isBlank(info.getIp())) {
            throw new RuntimeException("提供者注册信息有误!!!");
        }
        this.ip = info.getIp();
        this.port = info.getPort();
        this.serviceName = info.getServiceName();
        this.channelHandlerContextMap = NettyConsumerContext.channelHandlerContextMap;
        this.resultConcurrentHashMap = NettyConsumerContext.resultConcurrentHashMap;
    }

    public static NettyClient buildClient(RegistrationInfo info) {
        NettyClient nettyClient = new NettyClient(info);
        nettyClient.initNettyClient();
        boolean flag = nettyClient.channelHandlerContextMap.containsKey(nettyClient.serviceName);
        log.info("initNettyClient===serviceName:{}, 初始化客户端结果:{}", nettyClient.serviceName, flag);
        return flag ? nettyClient : null;
    }

    public Object doRemotelyInvoke(Method method, Object[] args) throws TimeoutException {
        ChannelHandlerContext channelHandlerContext = channelHandlerContextMap.get(serviceName);
        // Check the looked-up context, not the map itself (the map is never null)
        if (null == channelHandlerContext) {
            throw new RuntimeException("服务提供者已不存在!!!");
        }
        MessageProtocol protocol = new MessageProtocol();
        String requestId = UUID.fastUUID().toString() + this.getCurrentTimeSecond();
        String simpleName = method.getDeclaringClass().getSimpleName();
        protocol.setClassName(StrUtil.lowerFirst(simpleName));
        protocol.setMethodName(method.getName());
        protocol.setMethodParameterTypes(method.getParameterTypes());
        protocol.setMethodParameter(args);
        protocol.setRequestId(requestId);
        // A SynchronousQueue blocks this thread until another thread hands over the result
        SynchronousQueue<?> synchronousQueue = new SynchronousQueue<>();
        // The UUID plus a timestamp serves as the unique key
        resultConcurrentHashMap.put(requestId, synchronousQueue);
        String jsonStr = JSONUtil.toJsonStr(protocol);
        channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(jsonStr.getBytes(StandardCharsets.UTF_8)));
        log.info("doRemotelyInvoke===已经发送, jsonStr:{}!!!!", jsonStr);
        Class<?> returnType = method.getReturnType();
        InvokeResult take;
        try {
            // The timeout could later be read from an annotation on the interface instead of the fixed 30 seconds
            take = (InvokeResult) synchronousQueue.poll(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            resultConcurrentHashMap.remove(requestId);
            throw new RuntimeException(e);
        }
        // poll returns null only when the timeout elapsed, which is more reliable
        // than comparing wall-clock seconds taken before and after the call
        if (take == null) {
            resultConcurrentHashMap.remove(requestId);
            throw new TimeoutException("调用超时!!!");
        }

        Integer resultCode = Optional.ofNullable(take).map(InvokeResult::getResultCode).orElse(-1);
        Object result = null;
        if (resultCode.equals(1)) {
            throw new RuntimeException(take.getFailMessage());
        } else {
            String resultStr = Optional.ofNullable(take).map(InvokeResult::getInvokeResult)
                    .map(Objects::toString).orElse("");
            if (JSONUtil.isJsonArray(resultStr)) {
                result = JSONUtil.toList(resultStr, returnType);
            } else if (JSONUtil.isJsonObj(resultStr)) {
                result = JSONUtil.toBean(resultStr, returnType);
            } else {
                // Anything that is not JSON is returned as plain text
                result = resultStr;
            }
        }
        return result;
    }

    private void initNettyClient() {
        // Lock on the interned (string-pool) service name
        synchronized (serviceName.intern()) {
            try {
                // Initialize only if no connection exists yet
                if (!channelHandlerContextMap.containsKey(serviceName)) {
                    new Thread(() -> {
                        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
                        Bootstrap bootstrap = new Bootstrap();
                        bootstrap = bootstrap.group(workerGroup)
                                .channel(NioSocketChannel.class)
                                .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                                .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                                .handler(new MyChannelInitializer(serviceName));
                        ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(ip, port));
                        channelFuture.addListener(el -> {
                            if (el.isSuccess()) {
                                log.info("initNettyClient===serviceName:{}, 连接成功!!!", serviceName);
                            } else {
                                log.error("initNettyClient===serviceName:{}, 连接失败!!!", serviceName);
                            }
                        });
                        try {
                            channelFuture.sync().channel().closeFuture().sync();
                        } catch (Throwable e) {
                            log.error("initNettyClient===serviceName:{}, 发生异常, 信息:{}", serviceName, e);
                        } finally {
                            workerGroup.shutdownGracefully();
                        }
                    }).start();
                }

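                // Wait only while the connection is still missing; the monitor is the interned service name,
                // which channelActive notifies once the context has been stored
                if (!channelHandlerContextMap.containsKey(serviceName)) {
                    log.info("initNettyClient===serviceName:{}, 开始等待初始化客户端完成", serviceName);
                    serviceName.intern().wait(5000L);
                }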
            } catch (Exception e) {
                log.error("initNettyClient===serviceName:{}, 初始化客户端发生异常, 信息:{}", serviceName, e);
            }
        }
    }

    private long getCurrentTimeSecond() {
        return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
    }
}
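
The core of the client is request/response correlation: the caller registers a slot under a request id, sends the request, and blocks until the handler thread fills that slot or a timeout expires. A minimal sketch of this pattern using only the JDK follows. It swaps the SynchronousQueue for a CompletableFuture, and PendingCalls with its methods is a hypothetical helper, not part of the article's code.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper that pairs each outgoing request with its response.
class PendingCalls {

    private final Map<String, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Caller side: register before sending, so a fast reply cannot be missed.
    String register() {
        String requestId = UUID.randomUUID().toString();
        pending.put(requestId, new CompletableFuture<>());
        return requestId;
    }

    // Caller side: block for the reply; the slot is removed whether or not it arrived.
    Object await(String requestId, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<Object> future = pending.get(requestId);
        if (future == null) {
            throw new IllegalStateException("unknown request " + requestId);
        }
        try {
            return future.get(timeout, unit);
        } finally {
            pending.remove(requestId);
        }
    }

    // Handler side: returns false when the caller has already given up.
    boolean complete(String requestId, Object result) {
        CompletableFuture<Object> future = pending.get(requestId);
        return future != null && future.complete(result);
    }

    int size() {
        return pending.size();
    }
}

class PendingCallsDemo {
    public static void main(String[] args) throws Exception {
        PendingCalls calls = new PendingCalls();
        String requestId = calls.register();
        // Simulates the handler thread receiving the provider's response.
        new Thread(() -> calls.complete(requestId, "pong")).start();
        System.out.println(calls.await(requestId, 5, TimeUnit.SECONDS));
    }
}
```

Unlike a SynchronousQueue, completing the future never blocks the handler thread, so a caller that has already timed out cannot leave an executor thread stuck.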

4.1.2 MyChannelInitializer

public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    private static final UnorderedThreadPoolEventExecutor UNORDERED_THREAD_POOL_EVENT_EXECUTOR = new UnorderedThreadPoolEventExecutor(16);

    private String serviceName;
    public MyChannelInitializer(String serviceName) {
        this.serviceName = serviceName;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new StringEncoder());
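        // Frame the incoming ByteBuf into one complete JSON object per message
        pipeline.addLast(new JsonObjectDecoder());
        // Then decode it to a String
        pipeline.addLast(new StringDecoder());
        // Custom business handler, run on a separate executor so it does not block the I/O threads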
        pipeline.addLast(UNORDERED_THREAD_POOL_EVENT_EXECUTOR,new MyConsumerHandler(this.serviceName));
    }
}

4.1.3 MyConsumerHandler

@Slf4j
public class MyConsumerHandler extends SimpleChannelInboundHandler<String> {

    private String serviceName;
    private Map<String, SynchronousQueue<?>> resultConcurrentHashMap;

    public MyConsumerHandler(String serviceName) {
        this.serviceName = serviceName;
        this.resultConcurrentHashMap = NettyConsumerContext.resultConcurrentHashMap;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        log.info("MyConsumerHandler===serviceName:{}, 服务端数据响应:{}", serviceName, msg);
        MessageProtocol messageProtocol = JSONUtil.toBean(msg, MessageProtocol.class);
        if (null == messageProtocol) {
            log.error("MyConsumerHandler===serviceName:{}, 发生异常, 服务端发生空数据响应", serviceName);
            return;
        }
        String requestId = messageProtocol.getRequestId();
        // Remove and fetch in one step so that a response is handed over at most once
        @SuppressWarnings("unchecked")
        SynchronousQueue<Object> synchronousQueue = (SynchronousQueue<Object>) resultConcurrentHashMap.remove(requestId);
        if (synchronousQueue == null){
            log.error("MyConsumerHandler===serviceName:{}, 发生异常, 服务端数据响应, 已超时!!!", serviceName);
            return;
        }
        synchronousQueue.put(messageProtocol.getInvokeResult());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        NettyConsumerContext.channelHandlerContextMap.put(serviceName, ctx);
        // Once stored, wake the thread waiting in initNettyClient; the monitor is the interned service name
        synchronized (serviceName.intern()) {
            serviceName.intern().notify();
        }
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // The map is keyed by service name, so remove by that key
        NettyConsumerContext.channelHandlerContextMap.remove(serviceName);
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // The map is keyed by service name, so remove by that key
        NettyConsumerContext.channelHandlerContextMap.remove(serviceName);
        log.error("MyConsumerHandler===serviceName:{}, 发生异常:{}", serviceName, cause);
        super.exceptionCaught(ctx, cause);
    }
}

4.1.4 NettyConsumerContext

@Slf4j
public class NettyConsumerContext {

    public static final Map<String, ChannelHandlerContext> channelHandlerContextMap = new ConcurrentHashMap<>();

    public static final Map<String, SynchronousQueue<?>> resultConcurrentHashMap = new ConcurrentHashMap<>();

    public static final Map<String, NettyClient> nettyClientMap = new ConcurrentHashMap<>();
}

Testing the consumer

public class ConsumerStart {

    public static void main(String[] args) {
        System.out.println("假设SpringBoot项目启动中, 加载各类Bean");
        System.out.println("===================================");
        System.out.println("假设SpringBoot项目启动完成");
        System.out.println("===================================");
        // Assume the proxy created here is placed into the IoC container
        MyRPCTest proxy = ConsumerFactory.getProxy(MyRPCTest.class);
        System.out.println("假设注入MyRPCTest bean 完成");
        System.out.println("进行服务调用");
        System.out.println(proxy.hiHi("测试一波开始开始开始"));
    }
}

Service provider log:

14:43:04.213 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.rpc.serviceproduce.registration.handler.MyProduceRegistrationHandler - MyProduceRegistrationHandler===doRegistrationInfo, 服务信息注册:{"ip":"127.0.0.1","type":0,"serviceName":"myRPCProvider","port":7777}
14:43:04.307 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.rpc.serviceproduce.registration.handler.MyProduceRegistrationHandler - MyRegistrationHandler===读取到消息:{"result":"信息注册成功!"}
==========注册完成啦,大兄弟!=============
进行服务提供
============服务提供启动完成==============
14:43:04.335 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.rpc.serviceproduce.produce.MyProvide - MyProvide===服务提供者启动成功, 等待消费者访问!
14:43:11.405 [unorderedThreadPoolEventExecutor-5-2] INFO com.zhihao.netty.rpc.serviceproduce.produce.handler.ProvideChannelHandler - ProvideChannelHandler===, 开始处理客户端请求, 请求参数:MessageProtocol(className=myRPCTest, methodName=hiHi, methodParameterTypes=[class java.lang.String], methodParameter=[测试一波开始开始开始], requestId=ad39b4be-11aa-481b-bf56-50a8e19df4801656744191, invokeResult=null)

Consumer client log:

假设SpringBoot项目启动中, 加载各类Bean
===================================
假设SpringBoot项目启动完成
===================================
假设注入MyRPCTest bean 完成
进行服务调用
14:43:10.565 [Thread-0] INFO com.zhihao.netty.rpc.clientconsumer.registrationinfo.GetRegistrationInfo - GetRegistrationInfo===开始初始化!!!
14:43:11.328 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.rpc.clientconsumer.registrationinfo.handler.MyGetRegistrationHandler - MyGetRegistrationHandler===读取到信息:{"ip":"127.0.0.1","type":0,"serviceName":"myRPCProvider","port":7777}
14:43:11.354 [main] INFO com.zhihao.netty.rpc.clientconsumer.consumer.NettyClient - initNettyClient===serviceName:myRPCProvider, 开始等待初始化客户端完成
14:43:11.366 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.rpc.clientconsumer.consumer.NettyClient - initNettyClient===serviceName:myRPCProvider, 连接成功!!!
14:43:11.366 [main] INFO com.zhihao.netty.rpc.clientconsumer.consumer.NettyClient - initNettyClient===serviceName:myRPCProvider, 初始化客户端结果:true
14:43:11.375 [main] INFO com.zhihao.netty.rpc.clientconsumer.consumer.NettyClient - doRemotelyInvoke===已经发送, jsonStr:{"methodName":"hiHi","methodParameter":["测试一波开始开始开始"],"className":"myRPCTest","requestId":"ad39b4be-11aa-481b-bf56-50a8e19df4801656744191","methodParameterTypes":["class java.lang.String"]}!!!!
14:43:11.425 [unorderedThreadPoolEventExecutor-4-2] INFO com.zhihao.netty.rpc.clientconsumer.consumer.handler.MyConsumerHandler - MyConsumerHandler===serviceName:{"methodName":"hiHi","methodParameter":["测试一波开始开始开始"],"className":"myRPCTest","requestId":"ad39b4be-11aa-481b-bf56-50a8e19df4801656744191","invokeResult":{"resultCode":0,"invokeResult":"成功进行了远程调用哟, 恭喜恭喜! 你的参数:测试一波开始开始开始===67e60a8d-14f0-43ea-8130-fef9ee48478e"},"methodParameterTypes":["class java.lang.String"]}, 服务端数据响应:{}
成功进行了远程调用哟, 恭喜恭喜! 你的参数:测试一波开始开始开始===67e60a8d-14f0-43ea-8130-fef9ee48478e

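That completes a simple RPC framework.

Summary:

What is described above is only the basic RPC flow, which falls far short of what industrial use requires.
In production, service providers are deployed as clusters, so there are multiple providers, and machines are added or removed dynamically as traffic changes, for example during large sales promotions.
Callers must also be able to learn from the registry when a provider goes offline.
A routing and grouping strategy is needed as well: the caller picks providers according to the routing information pushed to it, which enables grouped calls, canary releases, traffic isolation, and similar features.
A load-balancing strategy is needed too: after route filtering there are usually still several candidate providers, and load balancing spreads the traffic across them.
Retry on failure is also required. Networks are unreliable and an individual provider can misbehave, so retrying a failed call reduces the impact on the business.
Rate limiting and circuit breaking are needed as well. A provider does not know how many callers will connect or how much each one will call, so it has to estimate its own capacity and limit the rate to avoid being overwhelmed.
Circuit breaking keeps a failing downstream service from causing calls in your own service to time out, block, and pile up until it crashes; the effect is especially severe in long call chains.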
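
Two of the points above, load balancing and retry, are small enough to sketch. The following is a rough illustration under simple assumptions (a fixed list of provider addresses, immediate retries without backoff) and not a production design; RoundRobinSelector and RetryingCaller are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin load balancer over a list of provider addresses.
class RoundRobinSelector {

    private final AtomicInteger counter = new AtomicInteger();

    String select(List<String> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }
}

// Hypothetical retry helper: re-runs the action until it succeeds
// or the attempt budget is used up, then rethrows the last failure.
class RetryingCaller {

    static <T> T call(Callable<T> action, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
            }
        }
        throw last;
    }
}

class ResilienceDemo {
    public static void main(String[] args) throws Exception {
        RoundRobinSelector selector = new RoundRobinSelector();
        List<String> providers = Arrays.asList("127.0.0.1:7777", "127.0.0.1:7778");
        // Each attempt picks the next provider in turn.
        String chosen = RetryingCaller.call(() -> selector.select(providers), 3);
        System.out.println(chosen);
    }
}
```

Retrying is only safe for idempotent calls; a real framework would also add backoff between attempts and skip providers that have just failed.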

Getting asynchronous results with Netty's own classes

The SynchronousQueue used in the client can be replaced with Netty's DefaultPromise to obtain the result produced on another thread.

Note: DefaultPromise

Usage:

// Client side; resultConcurrentHashMap is a concurrent map whose values are now promises
DefaultPromise<Object> defaultPromise = new DefaultPromise<>(channelHandlerContext.executor());
resultConcurrentHashMap.put(requestId, defaultPromise);
take = (InvokeResult) defaultPromise.get(30L, TimeUnit.SECONDS); // throws TimeoutException by itself on timeout
// In the handler
DefaultPromise<Object> promise = resultConcurrentHashMap.remove(requestId);
promise.setSuccess(messageProtocol.getInvokeResult());
// promise.setFailure(new RuntimeException("3333333333333333")); // marks the promise as failed so get() throws

Source: https://www.cnblogs.com/zhihao-plus/p/16437733.html