Netty源码分析-ReplayingDecoder
作者:互联网
ReplayingDecoder可以重复解码的解码器,此类的核心原理是内部包含了一个ReplayingDecoderByteBuf,当读取字节不够时则抛出异常,ReplayingDecoder捕获异常还原读取readerIndex然后等待Netty下一次事件继续读取。
ReplayingDecoderByteBuf集成了Bytebuf,它代理了ByteBuf当中读取的方法。
final class ReplayingDecoderByteBuf extends ByteBuf
我们拿readInt来分析,在真正读取之前先检查可读取字节长度checkReadableBytes,如果不够读的则抛出异常。
@Override
public int readInt() {
checkReadableBytes(4);
return buffer.readInt();
}
private void checkReadableBytes(int readableBytes) {
if (buffer.readableBytes() < readableBytes) {
throw REPLAY;
}
}
callDecode方法会将Bytebuf包装成ReplayingDecoderByteBuf,子类解码时如果字节流长度不够则抛出异常,捕获异常后还原Bytebuf的readerIndex位置然后等待Netty下次事件回调。
@Override
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
//包装ByteBuf到内部
replayable.setCumulation(in);
try {
//如果in可读
while (in.isReadable()) {
//记录in的读取下标位置
int oldReaderIndex = checkpoint = in.readerIndex();
int outSize = out.size();
//如果存在已解码的对象则fire到下一个handler
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
S oldState = state;
//可读字节数量
int oldInputLength = in.readableBytes();
try {
//调用解码逻辑-由子类实现
//由于传入了replayable对象,在子类解码实现中读取字节不够则抛出Signal异常
decodeRemovalReentryProtection(ctx, replayable, out);
if (ctx.isRemoved()) {
break;
}
//outSize == out.size()说明子类解码没解析出数据
if (outSize == out.size()) {
//oldInputLength == in.readableBytes()说明字节流有变化并且oldState == state则抛出异常
if (oldInputLength == in.readableBytes() && oldState == state) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +
"data or change its state if it did not decode anything.");
} else {
// Previous data has been discarded or caused state transition.
// Probably it is reading on.
continue;
}
}
} catch (Signal replay) {
replay.expect(REPLAY);
if (ctx.isRemoved()) {
break;
}
//如果解码时出现异常,说明in的字节不够读取
int checkpoint = this.checkpoint;
if (checkpoint >= 0) {
//还原in的读取位置
in.readerIndex(checkpoint);
} else {
// Called by cleanup() - no need to maintain the readerIndex
// anymore because the buffer has been released already.
}
break;
}
if (oldReaderIndex == in.readerIndex() && oldState == state) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " +
"or change its state if it decoded something.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
ReplayingDecoder和ByteToMessageDecoder之间的最大区别在于ReplayingDecoder允许您实现decode()和decodeLast()方法,就像已经接收到所有必需字节一样,而不是检查所需字节的可用性。 例如,以下ByteToMessageDecoder实现:
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
//检查是否有4个字节,没有则返回等待下次netty事件
if (buf.readableBytes() < 4) {
return;
}
//标记读取索引
buf.markReaderIndex();
//读取4个字节返回Int数字代表包的长度
int length = buf.readInt();
//剩余字节如果不够length
if (buf.readableBytes() < length) {
//还原读取索引等待下次netty事件
buf.resetReaderIndex();
return;
}
//读取length个字节放入解码器
out.add(buf.readBytes(length));
}
}
简化实现
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf) throws Exception {
out.add(buf.readBytes(buf.readInt()));
}
}
错误的做法
public class MyDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<Integer>();
@Override
public void decode(.., ByteBuf buf, List<Object> out) throws Exception {
//错误的做法,因为第一次offer可能成功,第二次失败,然后buf被还原,netty下次事件
//导致消息又重新读了2次,那么队列中会有3个对象。
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
// This assertion will fail intermittently since values.offer()
// can be called more than two times!
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
正确的做法
public class MyDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<Integer>();
@Override
public void decode(.., ByteBuf buf, List<Object> out) throws Exception {
// Revert the state of the variable that might have been changed
// since the last partial decode.
//每次清空变量
values.clear();
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
// Now we know this assertion will never fail.
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
性能提升技巧,设置还原点,通过checkpoint方法,获取bytebuf的读取索引位置,当需要还原时只还原到checkpoint的位置,而不是还原到开始位置。
/**
* Stores the internal cumulative buffer's reader position.
*/
protected void checkpoint() {
checkpoint = internalBuffer().readerIndex();
}
/**
* Stores the internal cumulative buffer's reader position and updates
* the current decoder state.
*/
protected void checkpoint(S state) {
checkpoint();
state(state);
}
public enum MyDecoderState {
READ_LENGTH,
READ_CONTENT;
}
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<MyDecoderState> {
//包的长度
private int length;
//设置初始状态
public IntegerHeaderFrameDecoder() {
// Set the initial state.
super(MyDecoderState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
switch (state()) {
//读取包长状态
case READ_LENGTH:
//读取包长-失败出异常等待Netty下次事件重新读
length = buf.readInt();
//读取成功设置还原点并更新状态
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
//读取length长度的包,如果字节不够抛出异常,还原到上面设置的还原点,避免每次都读取//buf.readInt();
//读取成功设置还原点,更新状态
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
out.add(frame);
break;
default:
throw new Error("Shouldn't reach here.");
}
}
}
上面的例子,length = buf.readInt(); checkpoint(MyDecoderState.READ_CONTENT);
读取readInt成功后,就设置还原点,接着继续读取buf.readBytes(length);,如果这时字节长度不够,就会抛出异,系统会还原ByteBuf到上次的还原点,等待下次Netty事件,避免了系统反复调用length = buf.readInt();
标签:Netty,读取,readInt,state,源码,values,ReplayingDecoder,buf,out 来源: https://blog.51cto.com/u_11868971/2997563