编程语言
首页 > 编程语言> > netty源码分析之AbstractNioByteChannel.NioByteUnsafe.read()

netty源码分析之AbstractNioByteChannel.NioByteUnsafe.read()

作者:互联网

 1      @Override
 2         public final void read() {
 3             final ChannelConfig config = config();
 4             if (shouldBreakReadReady(config)) {
 5                 clearReadPending();
 6                 return;
 7             }
 8             final ChannelPipeline pipeline = pipeline();
 9             final ByteBufAllocator allocator = config.getAllocator();
10             //获取自适应缓冲区分配器对象(第一次进来才会创建)
11             final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
12             //重置分配器对象    
13             allocHandle.reset(config);
14 
15             ByteBuf byteBuf = null;
16             boolean close = false;
17             try {
18                 do {
19                     //通过分配器分配默认大小为1024的ByteBuf(其大小会自适应调整,具体变化规则下面解析)
20                     byteBuf = allocHandle.allocate(allocator);
21                     //1.doReadBytes(byteBuf):先根据byteBuf大小来设置attemptedBytesRead属性(在while判断中会用到此属性),然后byteBuf尝试读取最大为attemptedBytesRead的数据
22                     //2.allocHandle.lastBytesRead():记录lastBytesRead和totalBytesRead,这里可能会触发一次自适应调整
23                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
24                     if (allocHandle.lastBytesRead() <= 0) {
25                         // nothing was read. release the buffer.
26                         byteBuf.release();
27                         byteBuf = null;
28                         close = allocHandle.lastBytesRead() < 0;
29                         if (close) {
30                             // There is nothing left to read as we received an EOF.
31                             readPending = false;
32                         }
33                         break;
34                     }
35 
36                     //增加已经读取消息的次数
37                     allocHandle.incMessagesRead(1);
38                     readPending = false;
39                     //将已经读取到的数据抛给处理器链的channelRead处理(正常业务逻辑在这里处理)
40                     pipeline.fireChannelRead(byteBuf);
41                     byteBuf = null;
42                 } while (allocHandle.continueReading()); //判断是否需要继续读取数据
43 
44                 //触发一次自适应调整
45                 allocHandle.readComplete();
46                 //读取完成后触发处理器链的channelReadComplete
47                 pipeline.fireChannelReadComplete();
48 
49                 if (close) {
50                     closeOnRead(pipeline);
51                 }
52             } catch (Throwable t) {
53                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
54             } finally {
55                 // Check if there is a readPending which was not processed yet.
56                 // This could be for two reasons:
57                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
58                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
59                 //
60                 // See https://github.com/netty/netty/issues/2254
61                 if (!readPending && !config.isAutoRead()) {
62                     removeReadOp();
63                 }
64             }
65         }    

 

public RecvByteBufAllocator.Handle recvBufAllocHandle() {
            //为空创建
            if (recvHandle == null) {
                //这里创建的是AdaptiveRecvByteBufAllocator.HandleImpl实例
                recvHandle = config().getRecvByteBufAllocator().newHandle();
            }
            return recvHandle;
        }

 

public void reset(ChannelConfig config) {
            this.config = config;
            //默认16
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        }

 

public ByteBuf allocate(ByteBufAllocator alloc) {
            //创建一个由AdaptiveRecvByteBufAllocator.HandleImpl推测的容量大小的ByteBuf
            return alloc.ioBuffer(guess());
        }

public int guess() {
            //这里返回的是AdaptiveRecvByteBufAllocator.HandleImpl里推测容量,自适应调整变化的就是此值的大小
            return nextReceiveBufferSize;
        }

public ByteBuf ioBuffer(int initialCapacity) {
        if (PlatformDependent.hasUnsafe()) {
            //分配直接内存(堆外内存)
            return directBuffer(initialCapacity);
        }
        return heapBuffer(initialCapacity);
    }

 

 1     protected int doReadBytes(ByteBuf byteBuf) throws Exception {
 2         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
 3         //设置attemptedBytesRead属性,大小为byteBuf的可写大小
 4         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
 5         //尝试读取最大为attemptedBytesRead的数据
 6         return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
 7     }
 8 
 9       AdaptiveRecvByteBufAllocator.HandleImpl
10       public void lastBytesRead(int bytes) {
11             // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
12             // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
13             // the selector to check for more data. Going back to the selector can add significant latency for large
14             // data transfers.
15             //判断当前读取到的数据与推测大小是否一致,一致的话进行一次扩容处理
16             if (bytes == attemptedBytesRead()) {
17                 record(bytes);
18             }
19             //调用父类的lastBytesRead方法
20             super.lastBytesRead(bytes);
21         }
22 
23         private void record(int actualReadBytes) {
24             //SIZE_TABLE里保存着有序的递增的16-1073741824(到512前,每次递增16;512后每次*2)
25             //INDEX_DECREMENT 默认 1, INDEX_INCREMENT 默认 4
26             //真实读取到数据的大小小于等于SIZE_TABLE索引前两位的值大小时,第一次触发不会缩容,第二次触发会缩容为SIZE_TABLE索引前一位的值(若index - 1 < minIndex,index值为minIndex)
27             if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
28                 if (decreaseNow) {
29                     index = max(index - INDEX_DECREMENT, minIndex);
30                     nextReceiveBufferSize = SIZE_TABLE[index];
31                     decreaseNow = false;
32                 } else {
33                     //第一次触发不会缩容
34                     decreaseNow = true;
35                 }
36             } else if (actualReadBytes >= nextReceiveBufferSize) { //真实读取到数据大于等于推测大小时,直接扩容为SIZE_TABLE索引后四位的值(若index + 4 > maxIndex,index值为maxIndex)
37                 index = min(index + INDEX_INCREMENT, maxIndex);
38                 nextReceiveBufferSize = SIZE_TABLE[index];
39                 decreaseNow = false;
40             }
41         }
42 
43        DefaultMaxMessagesRecvByteBufAllocator
44        public void lastBytesRead(int bytes) {
45             //将读取到的数据赋值给lastBytesRead
46             lastBytesRead = bytes;
47             if (bytes > 0) {
48                 //将读取到的数据叠加到totalBytesRead 
49                 totalBytesRead += bytes;
50             }
51         }

 

        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            return config.isAutoRead() &&
                   (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                   totalMessages < maxMessagePerRead &&
                   totalBytesRead > 0;
        }

    DefaultChannelConfig
    public boolean isAutoRead() {
        //autoRead默认为1,因此该判断为true
        return autoRead == 1;
    }

            DefaultMaxMessagesRecvByteBufAllocator
            public boolean get() {
                //当本次读操作读取到的字节数与AdaptiveRecvByteBufAllocator推测出的ByteBuf容量大小不一样时,就会返回false;否则返回true。如果本次读操作可读取的字节大于了attemptedBytesRead的话,一次读操作也只会先读取attemptedBytesRead的字节数
                return attemptedBytesRead == lastBytesRead;
            }

    //totalMessages < maxMessagePerRead:根据上面的流程我们可以知道,maxMessagePerRead为16,totalMessages为读循环已经执行的读操作次数(即,循环次数)。

    //totalBytesRead > 0:当本次读操作有读取到字节数时,或者以读取到的字节数小于Integer.MAX_VALUE,那么该判断都会大于0,即,为true;否则为false。

 

        public void readComplete() {
            //读结束后,触发一次自适应调整
            record(totalBytesRead());
        }

        protected final int totalBytesRead() {
            //本次已读取数据总和
            return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
        }

 

扩展:ByteToMessageDecoder.channelRead()

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                //判断累加区是否为空
                first = cumulation == null;
                if (first) {
                    //为空,直接将字节容器的指针指向新读取的数据
                    cumulation = data;
                } else {
                    //不为空,调用累加器累加数据。可能会触发一次扩容
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                //数据传递给业务
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                //没有累加区中有数据可读时,清空
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                //如果连续16次(discardAfterReads的默认值),累加区中仍然有未被业务拆包器读取的数据,那就做一次压缩,有效数据段整体移到容器首部(粘包后拆包时会存在此情况)
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    //压缩,即丢弃已读数据
                    discardSomeReadBytes();
                }

                //传递业务数据包给业务解码器处理(拆包时,在callDecode()里就会交给业务解码器处理了,这边通常是处理的是最后一次拆包,即数据只有一个完整包时,直接走这边)
                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

 

private Cumulator cumulator = MERGE_CUMULATOR;

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            try {
                final ByteBuf buffer;
                //容器不够本次追加
                if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
                    // Expand cumulation (by replace it) when either there is not more room in the buffer
                    // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                    // duplicate().retain() or if its read-only.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/2327
                    // - https://github.com/netty/netty/issues/1764
                    //追加扩容
                    buffer = expandCumulation(alloc, cumulation, in.readableBytes());
                } else {
                    buffer = cumulation;
                }
                //将新数据累加到字节容器中
                buffer.writeBytes(in);
                return buffer;
            } finally {
                // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
                // for whatever release (for example because of OutOfMemoryError)
                in.release();
            }
        }
    };

static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
        ByteBuf oldCumulation = cumulation;
        //扩容也是一个内存拷贝操作,新增的大小即是新读取数据的大小
        cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
        cumulation.writeBytes(oldCumulation);
        oldCumulation.release();
        return cumulation;
    }

 

/**
     * Called once data should be decoded from the given {@link ByteBuf}. This method will call
     * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
     *
     * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param in            the {@link ByteBuf} from which to read data
     * @param out           the {@link List} to which decoded messages should be added
     */
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();

                //若有已经拆出来的数据包,交给后面的处理器链处理
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                //记录字节容器中有多少字节
                int oldInputLength = in.readableBytes();
                decodeRemovalReentryProtection(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) {
                    //拆包器未读取任何数据
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                //拆包器未读取任何数据,已经解到了数据包
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            //待实现的业务处理逻辑
            decode(ctx, in, out);
        } finally {
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {
                handlerRemoved(ctx);
            }
        }
    }

 

标签:netty,读取,read,ctx,源码,ByteBuf,cumulation,public,out
来源: https://www.cnblogs.com/gumanlou/p/16603043.html