其他分享
首页 > 其他分享> > 仿淘宝开放平台之消息服务——客户端处理链条设计与实现

仿淘宝开放平台之消息服务——客户端处理链条设计与实现

作者:互联网

使用netty框架实现websocket客户端。

启动框架

相比服务端标准模式而言,客户端启动类有比较多需要注意的地方,关键的逻辑有两个:

一是客户端需要处理自动重连,这里实际是两种情况,一种是客户端刚启动的时候,尝试去连接服务端,如不成功,则休眠5秒后再次重试;另外一种是出现异常时,包括原先建立连接、正常通信的情况下因为各种原因导致通道失效、心跳异常、服务端退出等,都会自动尝试重连,这样可以确保出现问题时无需系统管理员手工干预,自动重连来恢复运行。
二是连接成功后,要发起WebSocket的握手操作,将http协议升级为websocket协议,关键在于自实现的WebSocketClientHandshakerHandler处理器。

@Slf4j
@Component
public class MessageClient {


    @Autowired
    private MessageClientGlobalHolder config;

    @Autowired
    private MessageClientChannelInitializer messageClientChannelInitializer;



    /**
     * 启动客户端方法
     */
    public void start() {

        EventLoopGroup workerGroup = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(messageClientChannelInitializer);


            //客户端与服务端连接的通道,final修饰表示只会有一个
            ChannelFuture channelFuture = bootstrap.connect(config.getHost(), config.getPort());
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        //未成功
                        log.error("连接失败", future.cause());
                        //执行重连
                        reconnect();
                    } else {
                        log.info("连接成功");
                        Channel channel = future.channel();
                        //将channel保存到全局变量
                        config.setChannel(channel);
                        //发起握手
                        WebSocketClientHandshakerHandler handler = (WebSocketClientHandshakerHandler) channel.pipeline().get("hookedHandler");
                        handler.handshake(config.getChannel());
                    }
                }
            });
            //等待服务器端关闭
            channelFuture.channel().closeFuture().sync();


        } catch (Exception e) {
            log.error("消息客户端启动失败:{}" + e.getMessage(), e);
            //执行重连
            reconnect();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * 重连
     */
    public void reconnect() {
        try {
            Thread.sleep(5000);
            //执行重连
            log.info("消息客户端进行重连");
            start();

        } catch (InterruptedException e) {
            log.error("消息客户端重连过程中线程休眠失败", e);
        }

    }

   
}

处理器配置

消息处理器的装配与实现关键实现


/**
 * 初始化通道
 *
 * @author wqliu
 * @date 2021-2-5 15:12
 */
@Slf4j
@Component
public class MessageClientChannelInitializer extends ChannelInitializer<SocketChannel> {


    @Autowired
    private MessageClientGlobalHolder config;

    @Autowired
    private Environment environment;

    /**
     * 生产运行模式
     */
    private final String PRD_MODE="prd";

    /**
     * 初始化channel
     */
    @Override
    public void initChannel(SocketChannel socketChannel) throws Exception {



        //获取通道链路
        ChannelPipeline pipeline = socketChannel.pipeline();

        //仅在生产模式下加载ssl过滤器
        String mode=environment.getProperty("spring.profiles.active");
        if(PRD_MODE.equals(mode)){
            //ssl
            SSLContext sslContext = createSslContext();
            SSLEngine engine = sslContext.createSSLEngine();
            engine.setNeedClientAuth(false);
            engine.setUseClientMode(false);
            pipeline.addLast(new SslHandler(engine));
        }


        //HTTP 编解码
        pipeline.addLast(new HttpClientCodec());

        // 聚合为单个 FullHttpRequest 或者 FullHttpResponse
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));

        /**
         * 注意,因WebSocketClientHandshakerHandler继承自SimpleChannelInboundHandler,会自动释放消息
         * 对于收到服务端的pong消息,默认情况下不会往通道后续的处理器传递,所以若放到WebSocketClientHandshakerHandler之后,
         * 则会产生读空闲,导致心跳超时失效。
         */
        // 添加读写通道空闲处理器,当空闲满足设置时,会触发userEventTrigger,由下个处理器获取到
        pipeline.addLast(new IdleStateHandler(config.getReadIdleTimeOut(), 0,
                0, TimeUnit.SECONDS));

        //心跳超时处理
        pipeline.addLast(new HeartbeatTimeoutHandler());


        //处理web socket协议与握手
        pipeline.addLast("hookedHandler", new WebSocketClientHandshakerHandler());


        //心跳发送
        pipeline.addLast(new HeartbeatRequestHandler(config.getHeartbeatRate()));

        //将文本按消息类型转换为请求消息或响应消息
        pipeline.addLast(new MessageTypeDecodeHandler());

        //请求消息业务逻辑处理器
        pipeline.addLast(new RequestMessageBusinessHandler());

        //响应消息业务逻辑处理器
        pipeline.addLast(new ResponseMessageBusinessHandler());

        //编码为TextWebSocketFrame
        pipeline.addLast(new TextWebSocketFrameEncodeHandler());

        //json序列化
        pipeline.addLast(new JsonEncodeHandler());


    }

    /**
     * 创建ssl上下文对象
     * @param type
     * @param path
     * @param password
     * @return
     * @throws Exception
     */
    public SSLContext createSslContext() throws Exception {

        //读取配置信息
        String path=environment.getProperty("server.ssl.key-store");
        log.info("证书地址:{}",path);
        String password=environment.getProperty("server.ssl.key-store-password");
        String type=environment.getProperty("server.ssl.key-store-type");

        //构建证书上下文对象
        KeyStore ks = KeyStore.getInstance(type);
        path=path.replace("classpath:","");
        log.info("处理后的证书地址:{}",path);
        ClassPathResource resource = new ClassPathResource(path);
        InputStream ksInputStream = resource.getInputStream();
        ks.load(ksInputStream, password.toCharArray());
        //KeyManagerFactory充当基于密钥内容源的密钥管理器的工厂。
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());
        //SSLContext的实例表示安全套接字协议的实现,它充当用于安全套接字工厂或 SSLEngine 的工厂。
        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(kmf.getKeyManagers(), null, null);
        return sslContext;
    }


}

处理器清单

一共涉及到12个消息处理器,其中4个是netty内置的,只是进行了参数配置,其他8个是自己实现的,用于处理逻辑和数据的,依次如下:

序号处理器类型职责实现说明
1SslHandler处理可靠安全连接内置仅在生产环境,需要进行ssl加解密
2HttpClientCodecHTTP 编解码内置对Http请求进行解码与编码
3HttpObjectAggregator聚合HTTP 请求或响应内置将http请求或响应聚合为一个完整对象
4IdleStateHandler空闲监测内置监测空闲状态,触发后续超时处理
5HeartbeatTimeoutHandler心跳超时处理自定义心跳超时执行关闭连接,触发重连
6WebSocketClientHandshakerHandlerWebSocket专用处理自定义处理WebSocket的握手以及Ping、Pong、Close消息
7HeartbeatRequestHandler发送心跳请求自定义客户端向服务端定时发送心跳
8MessageTypeDecodeHandler文本反序列化成消息对象自定义将文本按消息类型转换为请求消息或响应消息
9RequestMessageBusinessHandler处理请求消息自定义请求消息业务逻辑处理器
10ResponseMessageBusinessHandler处理响应消息自定义响应消息业务逻辑处理器
11TextWebSocketFrameEncodeHandlerJSON格式转文本帧自定义将json格式字符串编码为TextWebSocketFrame
12JsonEncodeHandler对象序列化为JSON字符串自定义将对象序列化为json格式字符串

自定义的8个处理器中,最后两个11和12是出站处理器,注意实际执行顺序是先12后11,也就是,业务逻辑处理器9或10的处理结果是一个对象,先由出站处理器12将其序列化为json字符串,然后再由出站处理器11将其包装为一个WebSocket协议约定的文本帧TextWebSocketFrame。

自定义处理器

心跳超时处理

这个处理器实际是配合netty内置的空闲检测处理器IdleStateHandler使用的,只有满足IdleStateHandler中设置的触发条件,才会触发本处理器中的userEventTriggered方法,执行自定义的逻辑操作,这里是主动关闭连接。

客户端每隔固定时间频率向服务器端发送心跳,WebSocket协议约定的PingWebSocketFrame,服务端收到后马上会回复PongWebSocketFrame,如通道失效或服务端无响应情况下,就会触发客户端读空闲。


/**
 * 心跳超时处理器
 * @author wqliu
 * @date 2021-10-2 14:25
 **/
@Slf4j
public class HeartbeatTimeoutHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state().equals(IdleState.READER_IDLE)) {
                log.info("读空闲");
                //关闭连接
                ctx.channel().close();
            }
        } else {
            //非空闲事件,传递到下个处理器
            super.userEventTriggered(ctx, evt);
        }

    }


}

WebSocket专用处理

这是很关键的一个处理器,自身也比较复杂。

主要实现是借助netty提供的一个WebSocketClientHandshaker类,在初始化时设置websocket服务端连接信息,然后在客户端启动时,调用该类的发起握手方法handshake,服务器端收到该握手请求后,会进行后续处理,响应一个协议升级,将http协议升级为WebSocket协议。

同时需要注意的是,这里还有一个我们自定义的操作,即在握手成功,协议升级后,客户端发出一个登录服务端的请求消息。

/**
 * 处理web socket握手
 *
 * @author wqliu
 * @date 2021-9-28 16:33
 **/
@Slf4j
@Data
public class WebSocketClientHandshakerHandler extends SimpleChannelInboundHandler<Object> {
    /**
     * 握手
     */
    private WebSocketClientHandshaker handshaker;
    /**
     * 握手 异步处理
     */
    private ChannelPromise handshakeFuture;

    public WebSocketClientHandshakerHandler() {
        //初始化握手处理者
        MessageClientGlobalHolder config = SpringUtil.getBean(MessageClientGlobalHolder.class);
        URI webSocketUri = null;
        try {
            webSocketUri = new URI(config.getWebSocketUrl());
        } catch (URISyntaxException e) {
            log.error("解析远程服务器地址出错", e);
        }
        WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(webSocketUri, WebSocketVersion.V13, (String) null, true, new DefaultHttpHeaders());
        this.setHandshaker(handshaker);


    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // log.info("收到消息:{}",msg.toString());
        Channel ch = ctx.channel();
        FullHttpResponse response;
        //进行握手操作
        if (!this.handshaker.isHandshakeComplete()) {
            try {
                response = (FullHttpResponse) msg;
                //握手协议返回,设置结束握手
                this.handshaker.finishHandshake(ch, response);
                //设置成功
                this.handshakeFuture.setSuccess();

            } catch (WebSocketHandshakeException var7) {
                 //已握手成功并将http协议升级为了WebSocket协议,不应再收到Http消息,发生这种情况则抛出异常
                FullHttpResponse res = (FullHttpResponse) msg;
                String errorMsg = String.format("握手失败,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));
                this.handshakeFuture.setFailure(new Exception(errorMsg));
            }
        } else if (msg instanceof FullHttpResponse) {
            response = (FullHttpResponse) msg;
            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        } else if (msg instanceof CloseWebSocketFrame) {
            log.info("收到关闭信息");

        } else if (msg instanceof TextWebSocketFrame) {
            log.info("收到文本帧,往下传递");
            ReferenceCountUtil.retain(msg);
            ctx.fireChannelRead(msg);
        }

    }


    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {

        //设置握手成功后,发起登录请求
        this.handshakeFuture = ctx.newPromise();
        ChannelFuture handshakeFuture = this.handshakeFuture;
        handshakeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    //发送登录请求
                    log.info("握手成功");
                    Login login = new Login();
                    login.sendMessage();

                } else {
                    //握手失败
                    log.error("握手失败", future.cause());
                }
            }
        });


    }

    /**
     * 发起握手
     */
    public void handshake(Channel channel) {
        this.getHandshaker().handshake(channel);
    }


}

发送心跳请求

心跳机制是客户端每隔固定时间频率向服务器端发送心跳,WebSocket协议约定的PingWebSocketFrame

/**
 * 心跳请求处理器
 * @author wqliu
 * @date 2021-10-2 13:24
 **/
@Slf4j
public class HeartbeatRequestHandler extends ChannelInboundHandlerAdapter {

    /**
     * 心跳发送间隔,单位秒
     */
    private int heartbeatInterval=5;

    public HeartbeatRequestHandler(int heartbeatInterval){
        this.heartbeatInterval=heartbeatInterval;
    }


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.scheduleWithFixedDelay(new Runnable() {
            private Channel channel;
            @Override
            public void run() {
                // log.info("发送心跳");
                PingWebSocketFrame frame=new PingWebSocketFrame();
                ChannelFuture channelFuture = channel.writeAndFlush(frame);
                channelFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        // log.error(future.isSuccess()+"",future.cause());
                    }
                });
            }

            public Runnable setChannel(Channel channel){
                this.channel=channel;
                return this;
            }

        }.setChannel(ctx.channel()),15,heartbeatInterval, TimeUnit.SECONDS);

        //不调用父类方法,则其他处理器的channelActive事件不再触发
        super.channelActive(ctx);
    }
}

文本反序列化成消息对象

我将消息设计为两类,请求消息和响应消息,这里通过自己实现的一个处理器,将客户端传来的文本帧,通过消息类型属性反序列化成请求消息对象或响应消息对象,这里调用的是公用的处理器,即服务端也使用相同的处理器。


/**
 * 消息类型解码
 * @author wqliu
 * @date 2021-10-6 11:23
 **/
public class MessageTypeDecodeHandler extends MessageToMessageDecoder<TextWebSocketFrame> {
    @Override
    protected void decode(ChannelHandlerContext ctx, TextWebSocketFrame msg, List<Object> out) throws Exception {
        String message=msg.text();
        //消息解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String messageType = jsonObject.getString("messageType");
        if (messageType.equals(MessageType.REQUEST.name())) {
            RequestMessage requestMessage = JSON.parseObject(message, RequestMessage.class);
            out.add(requestMessage);

        }else if (messageType.equals(MessageType.RESPONSE.name())) {

            ResponseMessage responseMessage = JSON.parseObject(message, ResponseMessage.class);
            out.add(responseMessage);
        }
    }

}

请求/响应消息处理器

上一步把消息内容通过解码形成了请求消息或响应消息,而这两个处理器只需加入到链条中即可,根据传入的消息类型,也就是泛型参数类型,会自动识别处理或者往下传递。



JSON格式转文本帧

这个其实也没什么好说的,就是把JSON格式字符串放到文本帧中,这里调用的是公用的处理器,即服务端也使用相同的处理器。

/**
 * 将json格式字符串编码为TextWebSocketFrame
 * @author wqliu
 * @date 2021-10-6 11:23
 **/

public class TextWebSocketFrameEncodeHandler extends MessageToMessageEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
        TextWebSocketFrame frame=new TextWebSocketFrame(msg);
        out.add(frame);

    }


}

对象序列化为JSON字符串

这个其实也没什么好说的,就是把对象转换为JSON格式字符串,这里调用的是公用的处理器,即服务端也使用相同的处理器。

/**
 * 将对象序列化为json格式字符串
 * @author wqliu
 * @date 2021-10-6 11:23
 **/
public class JsonEncodeHandler extends MessageToMessageEncoder<BaseMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, BaseMessage msg, List<Object> out) throws Exception {

        if(msg instanceof BaseMessage) {
            out.add(JSONObject.toJSONString(msg));
        }else{
            out.add(msg);
        }
    }


}

标签:开放平台,消息,处理器,new,msg,链条,public,channel,客户端
来源: https://blog.csdn.net/seawaving/article/details/122498327