其他分享
首页 > 其他分享> > netty解码器分析和记录

netty解码器分析和记录

作者:互联网

目录

工作中使用netty,主要是扩展ChannelHandler,其中避免不了要对约定的协议进行编解码,因此记录下。

netty半包处理器分析

解码就是把二进制流转换为业务数据的过程,即把byte[]转换为我们看得懂的业务数据。netty提供的了ByteToMessageDecoder这个半包处理器,我们需要重写

decode即可,但是由于tcp粘包和拆包问题,首先需要把接收到的数据拆为一个完整的包,再进行解码。

在应用层将 byte[] 转为 message 的难度在于如何确定当前的包是一个完整的数据包,有两种方案可以实现:

1.监听当前 socket 的线程一直等待,直到收到的 byte 可以完成的构成一个包为止。这种方式的弊端就在于要浪费一个线程去等。

2.第二种方案是为每个监听的 socket 都构建一个本地缓存,当前监听线程如果遇到字节数不够的情况就先将获取到的数据存入缓存,继而处理别的请求,等到这里有数据的时候再来将新数据继续写入缓存直到数据构成一个完整的包取出。

ByteToMessageDecoder 采用的是第二种方案。在 ByteToMessageDecoder 中有一个对象 ByteBuf,该对象用于存储当前 Decoder接收到的 byte 数据。

先看下ByteToMessageDecoder 代码,核心就在这里,如何拆解出一个完整包

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    ByteBuf cumulation;
    private Cumulator cumulator = MERGE_CUMULATOR;
    private boolean singleDecode;
    private boolean decodeWasNull;
    private boolean first;
    private int discardAfterReads = 16;
    private int numReads;
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//入参msg是ByteBuf,channel读取到的字节流保存到缓冲区msg
        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();//认为就是个list,保存读取到的多个完整的二进制流数据
            try {
                ByteBuf data = (ByteBuf) msg;//二进制流
                //判断cumulation == null;并将结果赋值给first。因此如果first为true,则表示第一次接受到数据
                first = cumulation == null;
                if (first) {
                    cumulation = data;//如果是第一次接受到数据,直接将接受到的数据赋值给缓存对象cumulation
                } else {
                    // 非第一次解码,就将 data 向 cumulation 追加,并释放 data
         			 //如果cumulation中的剩余空间,不足以存储接收到的data,将cumulation扩容
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // 得到追加后的 cumulation 后,调用 decode 方法进行解码
				// 解码过程中,调用 fireChannelRead 方法,主要目的是将累积区的内容 decode 到 数组out中
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                //如果cumulation没有数据可读了,说明所有的二进制数据都被解析过了
                //此时对cumulation进行释放,以节省内存空间。
                //反之cumulation还有数据可读,那么if中的语句不会运行,因为不对cumulation进行释放
                //因此也就缓存了用户尚未解析的二进制数据。
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;// 将次数归零
                    cumulation.release();// 释放累计区
                    cumulation = null;// 加速 ygc
                } else if (++ numReads >= discardAfterReads) {
                    // 如果超过了 16 次,就压缩累计区,主要是将已经读过的数据丢弃,将 readIndex 归零。
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                // 如果没有向数组插入过任何数据
                decodeWasNull = !out.insertSinceRecycled();
                // 循环数组,向后面的 handler 发送数据,如果数组是空,那不会调用
                fireChannelRead(ctx, out, size);
                // 将数组中的内容清空,将数组的数组的下标恢复至原来
                out.recycle();
            }
        } else {
            //如果msg类型是不是ByteBuf,直接调用下一个handler进行处理
            ctx.fireChannelRead(msg);
        }
    }
    
    //callDecode方法主要用于解析cumulation 中的数据,并将解析的结果放入List<Object> out中。
  //由于cumulation中缓存的二进制数据,可能包含了出多条有效信息,因此在callDecode方法中,默认会调用多次decode方法
  //我们在覆写decode方法时,每次只解析一个消息,添加到out中,callDecode通过多次回调decode
  //每次传递进来都是相同的List<Object> out实例,因此每一次解析出来的消息,都存储在同一个out实例中。
  //当cumulation没有数据可以继续读,或者某次调用decode方法后,List<Object> out中元素个数没有变化,则停止回调decode方法。
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            //如果cumulation中有数据可读的话,一直循环调用decode
            while (in.isReadable()) {
                //获取上一次decode方法调用后,out中元素数量,如果是第一次调用,则为0。
                int outSize = out.size();

                if (outSize > 0) {
                     //用后面的业务 handler 的 ChannelRead 方法读取解析的数据
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
                 //回调decode方法,由开发者覆写,用于解析in中包含的二进制数据,并将解析结果放到out中。
                decode(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }
                
				//outSize是上一次decode方法调用时out的大小,out.size()是当前out大小
        		//如果二者相等,则说明当前decode方法调用没有解析出有效信息。
                if (outSize == out.size()) {
                    //此时,如果发现上次decode方法和本次decode方法调用候,in中的剩余可读字节数相同
          			//则说明本次decode方法没有读取任何数据解析
          			//(可能是遇到半包等问题,即剩余的二进制数据不足以构成一条消息),跳出while循环。
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                //处理人为失误 。如果走到这段代码,则说明outSize != out.size()。
                //也就是本次decode方法实际上是解析出来了有效信息放到out中。
                //但是oldInputLength == in.readableBytes(),说明本次decode方法调用并没有读取任何数据
                //但是out中元素却添加了。
                //这可能是因为开发者错误的编写了代码,例如mock了一个消息放到List中。
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }
}

我们以FixedLengthFrameDecoder为例查看如何解析处一个完整的报文

image-20220201233338651

这里 channelRead()的主要逻辑是:

1.从对象池中取出一个空的数组;
2.判断成员变量是否是第一次使用,要注意的是,这里既然使用了成员变量,所以这个 handler 不能是 @Shareble 状态的 handler,不然你就分不清成员变量是哪个 channel 的。将 unsafe 中传递来的数据写入到这个 cumulation 累积区中
3.写到累积区后,调用子类的 decode 方法,尝试将累积区的内容解码,每成功解码一个,就调用后面节点的 channelRead 方法。若没有解码成功,什么都不做;
4.如果累积区没有未读数据了,就释放累积区;
5.如果还有未读数据,且解码超过了 16 次(默认),就对累积区进行压缩。将读取过的数据清空,也就是将 readIndex 设置为0;
6.设置 decodeWasNull 的值,如果上一次没有插入任何数据,这个值就是 ture。该值在 调用 channelReadComplete 方法的时候,会触发 read 方法(不是自动读取的话),尝试从 JDK 的通道中读取数据,并将之前的逻辑重来。主要应该是怕如果什么数据都没有插入,就执行 channelReadComplete 会遗漏数据;
7.调用 fireChannelRead 方法,将数组中的元素发送到后面的 handler 中;
8.将数组清空。并还给对象池。

当数据添加到累积区之后,需要调用 decode 方法进行解码,代码见上面的 callDecode()方法。在 callDecode()中最关键的代码就是将解析完的数据拿取调用decode(ctx, in, out)方法。所以如果继承 ByteToMessageDecoder 类实现自己的字节流转对象的逻辑我们就要覆写该方法。

netty验证demo如下

服务端

image-20220202001959839

验证粘包和拆包

客户端

image-20220202002158975

通过验证,发现针对粘包和拆包,netty会把读取到的数据缓存到Channel的缓冲区,即io.netty.handler.codec.ByteToMessageDecoder#cumulation

比如第一次发送:12345678901234567890abc 总计23字节,netty服务端读取到23字节,但是只截取20字节(FixedLengthFrameDecoder截取)作为一个完整包,触发pipeline的channelRead。

接着第二次发送:11111111111111111 总计17字节,netty服务端读取到17字节,累积到缓冲区cumulation,加上上次的3字节,总计20字节,是一个完整包,接着又触发pipeline的channelRead。

从FixedLengthFrameDecoder可以看出我们针对ByteToMessageDecoder扩展的通用方法,是从输入缓冲区读取二进制流,追加到Channel缓冲区cumulation,然后对缓冲区cumulation进行截取,截取出一个完整的包,然后把包解码为业务数据。接着继续循环对累计缓冲区cumulation进行截取,如果不足一个完整包,pipeline就不向后传递。对我们来说,从累计缓冲区cumulation截取出的一个完整包,要保存到入参List<Object> out

验证客户端一个字节一个字节写

以前tomcat只有bio的时候,是一个accept线程接入,然后把socket提交到业务线程池,由业务线程进行读取输入流、解码、业务处理、编码、输出。如果客户端写一个字节后,休眠5s,然后接着再写一个字节,那么一个客户端开200的这样线程,就可以把服务端线程全部刷满,使用netty后,如果解决这个问题的呢?还是累积缓冲区io.netty.handler.codec.ByteToMessageDecoder#cumulation,写一个字节,触发read事件,接着这单个字节被追加到缓冲区cumulation,由于不能截取为一个完整的报文(比如FixedLengthFrameDecoder),因此pipeline不触发向后传播channelRead,直到客户端这样耍赖,一直写完一个完整包为止。这样对服务端有什么影响吗?没有,因为是selector IO读取是非阻塞,不会导致IO线程阻塞,因此没有影响。

测试的客户的例子如下

image-20220202004348724

demo代码在 github branch-netty分支。

netty解码总结

先确定获取都一个完整包(采用netty现有的或者继承ByteToMessageDecoder重写),然后触发pipeline进行解码,解码动作我们自己实现(只继承inbound适配器即可,不需要继承ByteToMessageDecoder )

netty Channel累积缓冲区cumulation

每个Channle都有个与之对应的Pipeline,如果pipeline有继承了ByteToMessageDecoder的解码器,那么此Channle就有个累积缓冲区cumulation,累积缓冲区是什么时候清空的呢? 最大允许多大呢?

累积缓冲区cumulation在不可读的情况下,会被清空,不可读,说明报文是(多个)完整的且已经被解码,即报文都被解码后,累积缓冲区被清空。

累加缓冲区cumulation最大可以多大呢?从ByteToMessageDecoder内看不出来。

为什么netty不适合传输文件

netty处理read事件: 由IO线程自旋,处理注册到Selector上的通道的事件。NioEventLoop#run -> NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel) -> io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read

@Override
public final void read() {
    //省略
    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try {
        int totalReadAmount = 0;
        boolean readPendingReset = false;
        do {
            byteBuf = allocHandle.allocate(allocator);//创建缓冲区
            int writable = byteBuf.writableBytes();
            int localReadAmount = doReadBytes(byteBuf);//把socket数据写入到缓冲区
            //省略
            pipeline.fireChannelRead(byteBuf);//触发pipeline的channelRead
            byteBuf = null;

            //省略
        } while (++ messages < maxMessagesPerRead);//最大连续读取16次

        pipeline.fireChannelReadComplete();//触发pipeline的channelReadComplete
        //省略
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
        //省略
    }
}

原因有如下两处:

1.由于EventLoop是串行执行,如果文件过大,那么在读取输入流的时候,在这里循环16次才结束,如果大多数channel都这样,严重会导致其他channel的read事件处理阻塞。

2.每次读取后,触发pipeline的channelRead,把读取到的数据保存到累加缓冲区io.netty.handler.codec.ByteToMessageDecoder#cumulation,这样会导致累加缓冲区使用内存过大。

为什么dubbo不适合传输问题件

dubbo默认报文最大是8m,为什么这样设置,官方文档为什么不建议传输文件或者大报文呢?

原因如下:

1.dubbo采用的序列化,需要一次性把对象序列化,如果对象很大(包含文件),那么一次性序列化对象并加载到内存,会导致oom。

2.dubbo默认使用的netty,且是单连接模型,一个客户端对一个服务端只有一个tcp连接,netty消息发送也是采用的串行无锁化,如果一个对象比较大,会导致这个channel一直在发送(write&flush),从而导致其他channel的发送排队阻塞(在taskQueue内排队无法及时处理),迟迟无法发送出去,最终会导致客户端其它的consumer等待超时。

3.大对象也会导致dubbo服务端读取,从而导致阻塞其它channel的数据读取。 和2的原因一样,netty使用的串行无锁化,如果一个任务执行过程,就会到其它任务阻塞。

既然单连接模型有这个缺点,为什么dubbo还要采用呢?因为省资源,tcp资源是宝贵的,且单连接可以满足rpc的场景;rpc的场景通常是服务端少,客户端很多,如果为每个客户端都创建tcp连接,会导致tcp连接数爆满,因为不知道具体有多少个客户端会调用服务端。

dubbo也是可以使用多连接模型的,在客户端发送时,采用轮询机制从tcp连接池内选择一个tcp连接进行发送。但是通常不建议这样。

因此:dubbo仅适合业务小报文传输(默认小于8m),不适合大报文和文件传输。

为什么http适合传输文件

http适合传输文件需要从客户端和服务端来说

客户端

使用httpclient等工具传输文件,比如要传输的文件在本地,每次只读取文件的一个 Buffer 大小,然后将这个 Buffer 的数据使用 Socket 发送即可;在这种方式下,同时存在于内存中的数据,只会有一个 Buffer 大小,不会有 Dubbo 那样将全部数据读取至内存的问题。

服务端

针对web服务容器是tomcat来说,Tomcat 在读取文件报文(form-data)时,会先将报文暂存至磁盘,然后通过 FileItem 读取磁盘中的报文内容。所以在对于 Server 端来说,不会一次性将完整的报文数据读取至内存中,也就不会有内存占用过高的问题。

因此平时的上传文件,都是使用的客户端直连oss上传,不经过中间系统,这样也不会导致我们系统的连接阻塞、带宽占用等问题。

标签:netty,读取,记录,decode,解码器,缓冲区,cumulation,out
来源: https://www.cnblogs.com/zhangyjblogs/p/15860653.html