其他分享
首页 > 其他分享> > 从零开始学Netty (四)-- 实战实现前后端分离的IM框架

从零开始学Netty (四)-- 实战实现前后端分离的IM框架

作者:互联网

网上有很多Netty的教程DEMO,但是前后端分离的例子可能比较少。这次我将会使用SpringBoot + Netty + WebSocket的技术栈,实现前后端分离的即时通讯(IM)的DEMO。

主要功能

服务端接收的消息主要分以下几类:

1、CONNECT:当websocket 第一次open的时候,初始化channel,把用的channel 和 userid 关联起来。

2、CHAT:聊天类型的消息,把聊天记录保存到数据库,同时标记消息的签收状态[未签收]。如果接收者在线,则转发给相应的接收者。

3、SIGNED:签收消息类型,针对具体的消息进行签收,修改数据库中对应消息的签收状态[已签收]。

4、KEEPALIVE:心跳类型的消息,如果心跳超时未连接,则服务端断开连接。

消息实体类

public class DataContent implements Serializable {

    private Integer action;

    private String userId;

    private String receiverId;

    private String msg;

    ...

}

定义消息类型action,发送用户userId,接收用户receiverId,消息实体msg。

action类型定义:

    public static final int CONNECT = 1;

    public static final int CHAT = 2;

    public static final int SIGNED = 3;

    public static final int KEEPALIVE = 4;

connect:请求类型;

chat:聊天发送;

signed:聊天接收;

keepalive:保持心跳连接;

ChatHandler处理核心业务

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    //用于记录和管理所有客户端的channel
    public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //获取客户端所传输的消息
        String content = msg.text();
        System.out.println(content);
        //1.获取客户端发来的消息
        DataContent dataContent = JSON.parseObject(content, DataContent.class);
        Integer action = dataContent.getAction();
        Channel channel =  ctx.channel();
        //2.判断消息类型,根据不同的类型来处理不同的业务
        if(action == MessageActionConstant.CONNECT){
            //2.1 当websocket 第一次open的时候,初始化channel,把用的channel 和 userid 关联起来
            UserChanelRel.put(dataContent.getUserId(), channel);
        } else if(action == MessageActionConstant.CHAT){
            //2.2 聊天类型的消息,把聊天记录保存到数据库,同时标记消息的签收状态[未签收]
            // 发送消息
            Channel receiverChannel = UserChanelRel.get(dataContent.getReceiverId());
            if(receiverChannel ==null){
                //离线用户
            }else{
                //当receiverChannel 不为空的时候,从ChannelGroup 去查找对应的channel 是否存在
                Channel findChanel = users.find(receiverChannel.id());
                if(findChanel!=null){
                    // 用户在线
                    receiverChannel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(dataContent)));
                }else{
                    //离线用户
                }
            }
        } else if(action == MessageActionConstant.SIGNED){
//            //2.3 签收消息类型,针对具体的消息进行签收,修改数据库中对应消息的签收状态[已签收]
        } else if(action == MessageActionConstant.KEEPALIVE){
            //2.4 心跳类型的消息
            System.out.println("收到来自channel 为【"+channel+"】的心跳包");
        }

    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        users.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        String chanelId = ctx.channel().id().asShortText();
        System.out.println("客户端被移除:channel id 为:"+chanelId);
        users.remove(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        //发生了异常后关闭连接,同时从channelgroup移除
        ctx.channel().close();
        users.remove(ctx.channel());
    }
}

这里根据不同的action,进行业务处理。聊天信息的持久化没做,有需要的可以自行补足。

初始化ChannelPipeline的处理器

public class WSServerInitialzer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(1024*64));
        pipeline.addLast(new IdleStateHandler(8,10,12));

        // Check Heart Beat
        pipeline.addLast(new HeartBeatHandler());
        // Web Socket Server
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        // Chat Service Handler
        pipeline.addLast(new ChatHandler());
    }
}

HttpServerCodec:websocket 基于http协议,所需要的http编解码器。
ChunkedWriteHandler:对下数据流写提供支持。
HttpObjectAggregator:对httpMessage进行聚合处理,聚合成request或response。
IdleStateHandler:针对客户端,如果在1分钟时间内没有向服务端发送读写心跳(ALL),则主动断开连接。如果有读空闲和写空闲,则不做任何处理。
HeartBeatHandler:自定义的空闲状态检测的handler。
WebSocketServerProtocolHandler:处理与WebSocket协议相关的动作,包括握手、心跳等。
ChatHandler:处理核心业务。

启动Netty服务器

@Component
public class WebSocketServer {

    @Value("${chat.server.host}")
    private String host;

    @Value("${chat.server.port}")
    private int port;

    private static class SingletionWSServer {
        static final WebSocketServer instance = new WebSocketServer();
    }

    @Bean
    public static WebSocketServer getInstance() {
        return SingletionWSServer.instance;
    }

    private EventLoopGroup mainGroup;
    private EventLoopGroup subGroup;
    private ServerBootstrap server;
    private ChannelFuture future;

    public WebSocketServer() {
        mainGroup = new NioEventLoopGroup();
        subGroup = new NioEventLoopGroup();
        server = new ServerBootstrap();
        server.group(mainGroup, subGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WSServerInitialzer());
    }

    public void start() {
        this.future = server.bind(host,port);
        if (future.isSuccess()) {
            System.out.println("Web Socket Server Started!");
        }
    }
}

与SpringBoot集成启动

@Component
public class NettyServerBooter implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        WebSocketServer.getInstance().start();
    }
}

 

github源码地址:https://github.com/lwtxzwt/easy-chat

欢迎点赞!

标签:Netty,--,ctx,签收,IM,private,new,public,channel
来源: https://blog.csdn.net/zwt122755527/article/details/110049431