其他分享
首页 > 其他分享> > netty 学习笔记三:大跃进,使用 netty 实现 IM 即时通讯系统

netty 学习笔记三:大跃进,使用 netty 实现 IM 即时通讯系统

作者:互联网

本节在《netty 学习笔记二》之上进行了一段大跃进,因此在本节你将会一股脑看到 netty自定义协议设计、数据载体 ByteBuf API、通信协议编解码、pipeline 结构、ChannelHandler 生命周期 和 热插拔效果、单聊和群聊实现、心跳与空闲检测、netty IM 系统的性能优化等等内容。由于 netty 已经造了很多好用的轮子,如粘包拆包处理器、空闲检测处理器、通用编解码器等,我们只需要配置一些构造参数,基本上就可以足够使用,不用再重复造轮子了。

代码已经上传到我的 github:https://github.com/christmad/code-share/tree/master/netty-group-chat

 

群聊最终效果——>>>

服务端:

客户端-老王:

客户端-隔壁老王:

客户端-盘古:

 

 自定义协议设计:

 

一种通用的通信协议设计:
  1. 魔数(magic number)
    魔数作为第一个字段,通常情况下为固定的几个字节,我们可以规定为 4个字节。值一般设定为不容易被猜到的。
    魔数可以认为是一种显示的起始标志,在 java 的二进制文件中以魔数 0xcafebabe 作为开头,有异曲同工之妙。
    在源源不断的网络包中,起始标志可以减少错误率,迅速找出正确的包。
    在编程中,magic number 也用来描述不使用变量名而直接使用数字的编程习惯,直接使用数字通常会引起歧义。
  2. 版本号
    通常是预留字段, IP 协议中也有一个 version 字段用来标识 IP 协议的版本是 IPv4 或 IPv6。
  3. 序列化算法
    序列化算法,是指如何把对象转为二进制数据,以及把二进制数据转为对象,此处是 java 对象。
    比如 java 自带的序列化算法,json,hessian 等序列化方式。
    规定一个字节,可以表示 256 种算法,足够用了。
  4. 指令
    比如 IM即时通信系统中客户端登录、聊天等指令。
    对于 IM系统 ,可以规定 1个字节,可以表示 256 种指令,完全够用。
  5. 数据长度
    规定4个字节。
  6. 数据内容
    变长 N 字节,具体内容序列化后可以占不同的长度。

目前除了版本号外,这里设计的每一个字段在 ChannelHandler 里都提现出来了。魔数对应 IMProtocolSplitter(同时完成了服务端拆包工作);序列化算法对应 UltimatePacketCodecHandler;指令对应 IMHandler;数据对应了具体类型的 Packet。序列化算法采用了 alibaba 的 FASTJSON,JSON 也是前端大量在使用的一种序列化方式。

有了自定义协议设定,编码时只要逐字段按照协议拼装字节即可,通常我们的 java 对象使用 FASTJSON 序列化后会塞到“数据”字段里。解码时比较关键的两个字段是“指令”和“数据”,根据“指令”类型获取对应的 Class 类型,然后获取序列化过的 byte[],最后 FASTJSON API 使用这两个参数进行反序列化。 

 

数据载体 ByteBuf API:

Bytebuf 数据结构如下图:

API 这块最终每个人都有自己不同的熟悉程度,不好谈。只讲两点:

1. get/set 方法不会改变 读写指针,而 read/write 方法会改变读写指针。

2. 如果遇到内存紧张的问题,一定是没有释放内存。netty 某些 decoder 会自动释放内存,但如果假设我这个项目中用的 MessageToMessageCodec 底层没有帮我们管理内存而导致内存泄漏,我们就应该自己在程序中手动释放内存。对应的方法是 ByteBuf#release(),它将会把 ByteBuf 引用计数减 1,减到 0 时表示能被回收。默认申请完一块 ByteBuf 默认计数为 1。对应的增加计数的方法为 retain(),在 slice()、duplicate() 场景下会用到。

 

ConsoleCommand 与 UI:

由于本次实现的 netty IM系统 server端 和 client端都由 java 实现,而我们的 client端使用了控制台来实现。因此代码中的每一种 ConsoleCommand 就对应了实际项目中的 UI 控件按钮,如 createGroup、listGroupMembers 等“一级指令”就对应一个个 UI 上的按钮。构造了一个 ConsoleCommandManager 方便聚合所有的“二级指令”,这个功用和 IMHandler 有点相似,都可以简化多层 if else 代码。

 

心跳与空闲检测:

网络应用程序普遍会遇到的一个问题:连接假死

连接假死的现象是:在某一端(服务端或者客户端)看来,底层的 TCP 连接已经断开了,但是应用程序并没有捕获到,因此会认为这条连接仍然是存在的,从 TCP 层面来说,只有收到四次握手数据包或者一个 RST 数据包,连接的状态才表示已断开。

连接假死会带来以下两大问题

1. 对于服务端来说,因为每条连接都会耗费 cpu 和内存资源,大量假死的连接会逐渐耗光服务器的资源,最终导致性能逐渐下降,程序崩溃。
2. 对于客户端来说,连接假死会造成发送数据超时,影响用户体验。

通常,连接假死由以下几个原因造成的:

a. 应用程序出现线程堵塞,无法进行数据的读写
b. 客户端或者服务端网络相关的设备出现故障,比如网卡,机房故障
c. 公网丢包。公网环境相对内网而言,非常容易出现丢包,网络抖动等现象,如果在一段时间内用户接入的网络连续出现丢包现象,那么对客户端来说数据一直发送不出去,而服务端也是一直收不到客户端来的数据,连接就一直耗着

连接假死的应对策略就是空闲检测

空闲检测指的是每隔一段时间,检测这段时间内是否有数据读写,简化一下,我们的服务端只需要检测一段时间内,是否收到过客户端发来的数据即可,Netty 自带的 IdleStateHandler 就可以实现这个功能
PS:这个问题上服务端和客户端的策略是一样的

服务端在一段时间内没有收到客户端的数据,这个现象产生的原因可以分为以下两种:(从客户端角度看也是类似的)

1. 连接假死。
2. 非假死状态下确实没有发送数据
只需要排查第二种情况。使用 Netty 自带的 IdleStateHandler 就可以实现这个功能,见代码 IMIdleStateHandler.java:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/handler/IMIdleStateHandler.java

Netty IM 即时通讯系统优化:

优化通常指在服务端优化,服务端单机可能会面对十几万甚至几十万连接,需要进行一些对象碎片管理、优化(缩短)调用链(netty 中叫做 缩短事件传播路径)、阻塞方法优化等。

1. 共享 handler

在 ServerBootstrap 的 childHandler() 方法中,ChannelInitializer 类的 initChannel 逻辑是:每次有新连接到来的时候,都会调用 ChannelInitializer 的 initChannel() 方法,然后把我们添加的 ChannelHandler 都 new 一次,插入到 channel pipeline 中。

仔细观察这些 handler ,它们方法中是没有成员变量的,也就是无状态的,因此可以用单例模式来优化这些实例。在单机十几万甚至几十万连接的情况下,单例使得性能得到一定程度提升,创建的小对象也大大减少了。

然后重要的一点是,在 netty 中声明一个 ChannelHandler 是共享的,需要使用注解 @ChannelHandler.Sharable 来告诉 netty 这个 handler 是可以被多个 channel 共享的。

在没有单例优化前,你的 ChannelInitializer # initChannel() 方法可能是这样的:

 1 serverBootstrap
 2                 .childHandler(new ChannelInitializer<NioSocketChannel>() {
 3                     protected void initChannel(NioSocketChannel ch) {
 4                         ch.pipeline().addLast(new Spliter());
 5                         ch.pipeline().addLast(new PacketDecoder());
 6                         ch.pipeline().addLast(new LoginRequestHandler());
 7                         ch.pipeline().addLast(new AuthHandler());
 8                         ch.pipeline().addLast(new MessageRequestHandler());
 9                         ch.pipeline().addLast(new CreateGroupRequestHandler());
10                         ch.pipeline().addLast(new JoinGroupRequestHandler());
11                         ch.pipeline().addLast(new QuitGroupRequestHandler());
12                         ch.pipeline().addLast(new ListGroupMembersRequestHandler());
13                         ch.pipeline().addLast(new GroupMessageRequestHandler());
14                         ch.pipeline().addLast(new LogoutRequestHandler());
15                         ch.pipeline().addLast(new PacketEncoder());
16                     }
17                 });
initChannel

使用单例改造后,ChannelInitializer # initChannel() 方法是这样的:

serverBootstrap
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            protected void initChannel(NioSocketChannel ch) {
                ch.pipeline().addLast(new Spliter());
                ch.pipeline().addLast(new PacketDecoder());
                ch.pipeline().addLast(LoginRequestHandler.INSTANCE);
                ch.pipeline().addLast(AuthHandler.INSTANCE);
                ch.pipeline().addLast(MessageRequestHandler.INSTANCE);
                ch.pipeline().addLast(CreateGroupRequestHandler.INSTANCE);
                ch.pipeline().addLast(JoinGroupRequestHandler.INSTANCE);
                ch.pipeline().addLast(QuitGroupRequestHandler.INSTANCE);
                ch.pipeline().addLast(ListGroupMembersRequestHandler.INSTANCE);
                ch.pipeline().addLast(GroupMessageRequestHandler.INSTANCE);
                ch.pipeline().addLast(LogoutRequestHandler.INSTANCE);
                ch.pipeline().addLast(new PacketEncoder());
            }
        });
initChannel 单例改造

另外,需要注意的是,Splitter 不能被共享。虽然看起来我们的 Splitter 方法内也没有引用任何成员变量,但也许是因为每个连接都要维护自己的 ByteBuf,因此 Splitter 继承了 拆包器-LengthFieldBasedFrameDecoder 之后由于父类的有状态而导致 Splitter 也有状态了。如果你不信,可以强行试试把 Splitter 改造成单例。最后你会发现,控制台会输出一个错误。debug 后你会看到在 Splitter 某个父类中的构造器是这样的:

    protected ByteToMessageDecoder() {
        ensureNotSharable();
    }

这已经在告诉你不能把 ByteToMessageDecoder 和 它的派生子类设为共享 handler。我的 netty 版本用的是 4.1.24.final,而在这之前的一些版本中 ensureNotSharable() 方法还并不是在 ChannelHandler 继承体系中的一个方法,是用了某种 Util 工具来存放这个方法。不过重点是,我们知道运行起来效果是一样的。

2. 压缩 handler

 2.1 合并编解码器

Netty 内部提供了一个类,叫做 MessageToMessageCodec,使用它可以让我们的编解码操作放到一个类里面去实现。并且这个 codec 也是可以共享的。详情见代码 UltimatePacketCodecHandler.java:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/handler/UltimatePacketCodecHandler.java

 1 @ChannelHandler.Sharable
 2 public class UltimatePacketCodecHandler extends MessageToMessageCodec<ByteBuf, Packet> {
 3     public static final UltimatePacketCodecHandler INSTANCE = new UltimatePacketCodecHandler();
 4 
 5     private UltimatePacketCodecHandler() {}
 6 
 7     @Override
 8     protected void encode(ChannelHandlerContext ctx, Packet packet, List<Object> out) {
 9         // 使用 channel 上的 ByteBuf alloc,方便 netty 管理内存
10         ByteBuf byteBuf = ctx.channel().alloc().ioBuffer();
11         out.add(PacketCodec.INSTANCE.encode(byteBuf, packet));
12     }
13 
14     @Override
15     protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
16         out.add(PacketCodec.INSTANCE.decode(buf));
17     }
18 }

2.2 合并平行 handler——缩短事件传播路径

对我们的 IM 应用来说,每次从控制台(对应一个UI按钮)只会传一个指令到服务器,并且这个指令只会被某一个 handler 处理,因此这些指令 handler 有一个“平行”的概念。我们可以将这些平行的 handler 压缩为一个 handler,如 IMRequestHandler 所示:

 1 @ChannelHandler.Sharable
 2 public class IMRequestHandler extends SimpleChannelInboundHandler<Packet> {
 3 
 4     public static final IMRequestHandler INSTANCE = new IMRequestHandler();
 5 
 6     private Map<Byte, SimpleChannelInboundHandler<? extends Packet>> channelMap;
 7 
 8     private IMRequestHandler() {
 9         channelMap = new HashMap<>();
10         // 将指令类型 和 request handler 做映射
11         channelMap.put(Command.MESSAGE_REQUEST, MessageRequestHandler.INSTANCE);
12         channelMap.put(Command.LOGIN_REQUEST, LoginRequestHandler.INSTANCE);
13         channelMap.put(Command.LOGOUT_REQUEST, LogoutRequestHandler.INSTANCE);
14         channelMap.put(Command.CREATE_GROUP_REQUEST, CreateGroupRequestHandler.INSTANCE);
15         channelMap.put(Command.JOIN_GROUP_REQUEST, JoinGroupRequestHandler.INSTANCE);
16         channelMap.put(Command.LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestHandler.INSTANCE);
17         channelMap.put(Command.QUIT_GROUP_REQUEST, QuitGroupRequestHandler.INSTANCE);
18         channelMap.put(Command.GROUP_MESSAGE_REQUEST, GroupMessageRequestHandler.INSTANCE);
19     }
20 
21     @Override
22     protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {
23         SimpleChannelInboundHandler<? extends Packet> simpleChannelInboundHandler = channelMap.get(msg.getCommand());
24         if (simpleChannelInboundHandler != null) {
25             // 只关心能处理的类型
26             simpleChannelInboundHandler.channelRead(ctx, msg);
27         }
28     }
29 }

 

再看看代码中的 IMResponseHandler:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/client/handler/IMResponseHandler.java

其实客户端没有必要进行这种程度的优化,不过可以再次感受一下 netty 给我们带来的编码上的方便。

3. 减少阻塞主线程的操作

通常我们的应用会涉及数据库或网络操作,比如在 LoginRequestHandler 中,实际上在 valid() 或 checkUser() 方法中做的事情是把用户名和密码拿到数据库或某个网络中间件里面去进行比较,而例子中我只是粗暴验证直接返回 true 并简单的生成一个 userId 返回了。实际场景如下:

1 protected void channelRead0(ChannelHandlerContext ctx, T packet) {
2     // 1. balabala 一些逻辑
3     // 2. 数据库或者网络等一些耗时的操作
4     // 3. writeAndFlush()
5     // 4. balabala 其他的逻辑
6 }

对于第2个过程中的耗时操作,通常不会直接这样写。为什么?先来看看 netty 一条 NIO 线程的处理逻辑抽象:

1 List<Channel> channelList = 已有数据可读的 channel
2 for (Channel channel in channelist) {
3    for (ChannelHandler handler in channel.pipeline()) {
4        handler.channelRead0(ctx, msg);
5    } 
6 }

当我们执行 NioEventLoopGroup worker = new NioEventLoopGroup(); 这行代码时,netty 默认会启动 2倍 CPU 核数的 NIO 线程,在单机大量连接(几万甚至十几万以上)情况下, 一条 NIO 线程管理着几千条甚至上万条连接。如果在某个连接上执行 channelRead0() 时发生阻塞,最终都会拖慢绑定在该 NIO 线程上的其他 channel 的执行速度。

这时我们应该把耗时操作扔到业务线程池中去处理,处理逻辑如 LoginRequestHandler.java 中代码所示:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/server/handler/LoginRequestHandler.java,伪代码如下:

 1 ThreadPool threadPool = xxx;
 2 
 3 protected void channelRead0(ChannelHandlerContext ctx, T packet) {
 4     threadPool.submit(new Runnable() {
 5         // 1. balabala 一些逻辑
 6         // 2. 数据库或者网络等一些耗时的操作
 7         // 3. writeAndFlush()
 8         // 4. balabala 其他的逻辑
 9     });
10 }

 

最后,其他小细节就不在本篇里长篇大论了,以后应该会收集一个系列来专门记录编程里的小技巧。很多功能也没有在这个版本里一并实现,比如消息的存储,需要加上数据库。以及“模拟打开聊天窗口”时查看最近的一些消息等。参考 QQ 最近这几年的变化,打开聊天窗口加载的消息数量变少了,如果有关注的话应该会对这个变化有印象,之前的一些版本中打开窗口就能看到之前聊过的十几行消息,后面慢慢变成几行,目前(2019-11-07)打开窗口只能看三行了。只要有时间,这些功能都是可以添加的。比如消息存储和历史消息这块,先有消息存储后,后续就可以做一个 7天内、3天内的、当天的N条 等不同级别的历史消息缓存级别。后面有空我也会继续持续完善这个 IM 系统的功能,毕竟这是我兴趣的项目之一。

有缘下篇博客见,See you~~

 

标签:netty,pipeline,ch,addLast,即时通讯,INSTANCE,IM,new
来源: https://www.cnblogs.com/christmad/p/11804860.html