编程语言
首页 > 编程语言> > Netty源码分析-ReplayingDecoder

Netty源码分析-ReplayingDecoder

作者:互联网

ReplayingDecoder可以重复解码的解码器,此类的核心原理是内部包含了一个ReplayingDecoderByteBuf,当读取字节不够时则抛出异常,ReplayingDecoder捕获异常还原读取readerIndex然后等待Netty下一次事件继续读取。

 

ReplayingDecoderByteBuf集成了Bytebuf,它代理了ByteBuf当中读取的方法。

final class ReplayingDecoderByteBuf extends ByteBuf

我们拿readInt来分析,在真正读取之前先检查可读取字节长度checkReadableBytes,如果不够读的则抛出异常。

@Override
public int readInt() {
    checkReadableBytes(4);
    return buffer.readInt();
}

private void checkReadableBytes(int readableBytes) {
    if (buffer.readableBytes() < readableBytes) {
        throw REPLAY;
    }
}

callDecode方法会将Bytebuf包装成ReplayingDecoderByteBuf,子类解码时如果字节流长度不够则抛出异常,捕获异常后还原Bytebuf的readerIndex位置然后等待Netty下次事件回调。

@Override
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
		//包装ByteBuf到内部
        replayable.setCumulation(in);
        try {
			//如果in可读
            while (in.isReadable()) {
				//记录in的读取下标位置
                int oldReaderIndex = checkpoint = in.readerIndex();
                int outSize = out.size();
				
				//如果存在已解码的对象则fire到下一个handler
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }
				
                S oldState = state;
				//可读字节数量
                int oldInputLength = in.readableBytes();
                try {
					//调用解码逻辑-由子类实现
					//由于传入了replayable对象,在子类解码实现中读取字节不够则抛出Signal异常
                    decodeRemovalReentryProtection(ctx, replayable, out);

                    if (ctx.isRemoved()) {
                        break;
                    }
					
					//outSize == out.size()说明子类解码没解析出数据
                    if (outSize == out.size()) {
						//oldInputLength == in.readableBytes()说明字节流有变化并且oldState == state则抛出异常
                        if (oldInputLength == in.readableBytes() && oldState == state) {
                            throw new DecoderException(
                                    StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +
                                    "data or change its state if it did not decode anything.");
                        } else {
                            // Previous data has been discarded or caused state transition.
                            // Probably it is reading on.
                            continue;
                        }
                    }
                } catch (Signal replay) {
                    replay.expect(REPLAY);
                    if (ctx.isRemoved()) {
                        break;
                    }

					//如果解码时出现异常,说明in的字节不够读取
                    int checkpoint = this.checkpoint;
                    if (checkpoint >= 0) {
						//还原in的读取位置
                        in.readerIndex(checkpoint);
                    } else {
                        // Called by cleanup() - no need to maintain the readerIndex
                        // anymore because the buffer has been released already.
                    }
                    break;
                }

                if (oldReaderIndex == in.readerIndex() && oldState == state) {
                    throw new DecoderException(
                           StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " +
                           "or change its state if it decoded something.");
                }
                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

 

ReplayingDecoder和ByteToMessageDecoder之间的最大区别在于ReplayingDecoder允许您实现decode()和decodeLast()方法,就像已经接收到所有必需字节一样,而不是检查所需字节的可用性。 例如,以下ByteToMessageDecoder实现:

public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {

    @Override
   protected void decode(ChannelHandlerContext ctx,
                           ByteBuf buf, List<Object> out) throws Exception {
    
    //检查是否有4个字节,没有则返回等待下次netty事件
     if (buf.readableBytes() < 4) {
        return;
     }
    //标记读取索引
     buf.markReaderIndex();
    //读取4个字节返回Int数字代表包的长度
     int length = buf.readInt();
    
    //剩余字节如果不够length
     if (buf.readableBytes() < length) {
    //还原读取索引等待下次netty事件
        buf.resetReaderIndex();
        return;
     }
    
    //读取length个字节放入解码器
     out.add(buf.readBytes(length));
   }
 }

简化实现

public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {
   protected void decode(ChannelHandlerContext ctx,
                           ByteBuf buf) throws Exception {
     out.add(buf.readBytes(buf.readInt()));
   }
}

 

错误的做法

public class MyDecoder extends ReplayingDecoder<Void> {

   private final Queue<Integer> values = new LinkedList<Integer>();

    @Override
   public void decode(.., ByteBuf buf, List<Object> out) throws Exception {
    //错误的做法,因为第一次offer可能成功,第二次失败,然后buf被还原,netty下次事件
    //导致消息又重新读了2次,那么队列中会有3个对象。
     // A message contains 2 integers.
     values.offer(buf.readInt());
     values.offer(buf.readInt());

     // This assertion will fail intermittently since values.offer()
     // can be called more than two times!
     assert values.size() == 2;
     out.add(values.poll() + values.poll());
   }
 }

 

正确的做法

public class MyDecoder extends ReplayingDecoder<Void> {

   private final Queue<Integer> values = new LinkedList<Integer>();

    @Override
   public void decode(.., ByteBuf buf, List<Object> out) throws Exception {

     // Revert the state of the variable that might have been changed
     // since the last partial decode.
    //每次清空变量
     values.clear();

     // A message contains 2 integers.
     values.offer(buf.readInt());
     values.offer(buf.readInt());

     // Now we know this assertion will never fail.
     assert values.size() == 2;
     out.add(values.poll() + values.poll());
   }
 }

 

 

性能提升技巧,设置还原点,通过checkpoint方法,获取bytebuf的读取索引位置,当需要还原时只还原到checkpoint的位置,而不是还原到开始位置。

/**
 * Stores the internal cumulative buffer's reader position.
 */
protected void checkpoint() {
    checkpoint = internalBuffer().readerIndex();
}

/**
 * Stores the internal cumulative buffer's reader position and updates
 * the current decoder state.
 */
protected void checkpoint(S state) {
    checkpoint();
    state(state);
}
 public enum MyDecoderState {
   READ_LENGTH,
   READ_CONTENT;
 }

public class IntegerHeaderFrameDecoder
      extends ReplayingDecoder<MyDecoderState> {
    
//包的长度
   private int length;
    
    //设置初始状态
   public IntegerHeaderFrameDecoder() {
     // Set the initial state.
     super(MyDecoderState.READ_LENGTH);
   }
    

    @Override
   protected void decode(ChannelHandlerContext ctx,
                           ByteBuf buf, List<Object> out) throws Exception {

     switch (state()) {
    //读取包长状态
     case READ_LENGTH:
    //读取包长-失败出异常等待Netty下次事件重新读
       length = buf.readInt();
    //读取成功设置还原点并更新状态
       checkpoint(MyDecoderState.READ_CONTENT);
     case READ_CONTENT:
       //读取length长度的包,如果字节不够抛出异常,还原到上面设置的还原点,避免每次都读取//buf.readInt();
        //读取成功设置还原点,更新状态
       ByteBuf frame = buf.readBytes(length);
       checkpoint(MyDecoderState.READ_LENGTH);
       out.add(frame);
       break;
     default:
       throw new Error("Shouldn't reach here.");
     }
   }
 }

上面的例子,length = buf.readInt(); checkpoint(MyDecoderState.READ_CONTENT);

读取readInt成功后,就设置还原点,接着继续读取buf.readBytes(length);,如果这时字节长度不够,就会抛出异,系统会还原ByteBuf到上次的还原点,等待下次Netty事件,避免了系统反复调用length = buf.readInt();

 

 

 

标签:Netty,读取,readInt,state,源码,values,ReplayingDecoder,buf,out
来源: https://blog.51cto.com/u_11868971/2997563