Netty编解码器源码分析(上)(详细分析在注释中说明)
作者:互联网
文章目录
解码定义:解码是指将二进制数据流转换成一个个bytebuf.
首先分析解码器顶层抽象类
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 可以看到是基于ByteBuf进行解码的
if (msg instanceof ByteBuf) {
// 把callldecode的解析的对象都放到out当中
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
// 说明是第一次从io流里面读取数据
first = cumulation == null;
if (first) {
// 第一次把累加器赋值给刚读进来的对象
cumulation = data;
} else {
//不是第一次则将读进来的数据与cumulator进行累加
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 调用子类的decode方法进行解析
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
// 将解析出的bytebuf向下传播
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
//跟进 fireChannelRead
/*
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
if (msgs instanceof CodecOutputList) {
fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
} else {
**msgs.get(i)-->Bytebuf **
for (int i = 0; i < numElements; i++) {
ctx.fireChannelRead(msgs.get(i));
}
}
}
*/
// 回收list(可以看到和之前的entry一样,采用了对象池的机制)
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
上面提到了comulation,这里我们讲下
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
// 扩容
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
// 写入cumulation
buffer.writeBytes(in);
in.release();
return buffer;
}
};
进入到calldecode方法中进行分析:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
// 一直检测bytebuf是否有数据可读
// 记录一下outsize的大小
int outSize = out.size();
// 如果outsize的大小>0,说明已经有解析出的对象,则将事件向下传播
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
// 清空out
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
// 在decode之前记录一下可读长度
int oldInputLength = in.readableBytes();
// 进行解码
decode(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
// 说明没有解析出对象
if (outSize == out.size()) {
// 说明本次读没有读取数据,当前累加的数据没有拼成一个完整的数据包
if (oldInputLength == in.readableBytes()) {
break;
} else {
// 说明读到了数据,则有可能触发解析,所以continue,进行下次循环
continue;
}
}
// 走到这里说明解析出对象,但是没有从cumulation中读取数据,则报错
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
........
}
行解码器:以/r/n 或者/n结尾的字节流
public class LineBasedFrameDecoder extends ByteToMessageDecoder {
/** Maximum length of a frame we're willing to decode. */
// 数据包最大长度
private final int maxLength;
/** Whether or not to throw an exception as soon as we exceed maxLength. */
//超过最大长度的时候是否立即抛出异常,如果为true,则抛出
private final boolean failFast;
// 最终解析出是否带换行符:true:不带换行符
private final boolean stripDelimiter;
/** True if we're discarding input because we're already over maxLength. */
// 丢弃模式
private boolean discarding;
// 解码到现在已经丢弃了多少字节
private int discardedBytes;
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
接下来我们就进入decode方法进行分析
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// 找到这一行的结尾位置
final int eol = findEndOfLine(buffer);
//判断是否是丢失模式,第一次为false
if (!discarding) {
if (eol >= 0) {
final ByteBuf frame;
//计算换行符到可读字节之间的长度
final int length = eol - buffer.readerIndex();
// 拿到分隔符长度
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
// 超过最大长度
if (length > maxLength) {
//将readIndex移到换行符之后的位置
buffer.readerIndex(eol + delimLength);
// 传播异常
fail(ctx, length);
// 返回null 什么也没解析
return null;
}
// 判断是否要把分隔符也算在完整数据包下。
if (stripDelimiter) {
// 不包含分隔符
frame = buffer.readRetainedSlice(length);
buffer.skipBytes(delimLength);
} else {
// 不包含分隔符
frame = buffer.readRetainedSlice(length + delimLength);
}
return frame;
} else {
// 非丢弃模式下没有找到换行符
final int length = buffer.readableBytes();
if (length > maxLength) {
// 超过最大长度
// 将length长度丢弃
discardedBytes = length;
// 将读指针移动到写指针
buffer.readerIndex(buffer.writerIndex());
// 标记丢弃模式
discarding = true;
// 传播异常
if (failFast) {
fail(ctx, "over " + discardedBytes);
}
}
// 啥也没解析到
return null;
}
}
// 进入丢弃模式
else {
// 找到endofline
if (eol >= 0) {
// 前面已经丢弃过的+这次要丢弃的
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
// 将读指针移到换行符之后第一个有效数据位置
buffer.readerIndex(eol + delimLength);
// 标记没有丢弃的数据了
discardedBytes = 0;
// 标记discarding为非丢弃
discarding = false;
if (!failFast) {
fail(ctx, length);
}
} else {
//没有找到endline,全部丢失,读指针与写指针置为相同
discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
}
return null;
}
}
基于分隔符解码器分析
// 分隔符可以变化
public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters) {
this(maxFrameLength, true, delimiters);
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
// 由父类向下传播
out.add(decoded);
}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// 受限判断是不是lineBasedDecoder,当指定delimiter为/r/n /n 时,在创建
// DelimiterBasedFrameDecoder的时候就会创建lineBasedDecoder
if (lineBasedDecoder != null) {
return lineBasedDecoder.decode(ctx, buffer);
}
// Try all delimiters and choose the delimiter which yields the shortest frame.
int minFrameLength = Integer.MAX_VALUE;
ByteBuf minDelim = null;
// 遍历分隔符找到其中一个分隔符划分最小数据包的长度,将此分隔符置为 minDelim,将 minFrameLength
// 置为该分割符划分的长度
for (ByteBuf delim: delimiters) {
int frameLength = indexOf(buffer, delim);
if (frameLength >= 0 && frameLength < minFrameLength) {
minFrameLength = frameLength;
minDelim = delim;
}
}
// 找到分割符不为null
if (minDelim != null) {
int minDelimLength = minDelim.capacity();
ByteBuf frame;
// 判断是否处于丢弃模式,第一次不会处于丢弃模式,
if (discardingTooLongFrame) {
// 处于丢弃模式下
// We've just finished discarding a very large frame.
// Go back to the initial state.
// 标记为非丢弃模式
discardingTooLongFrame = false;
//跳过这段数据包
buffer.skipBytes(minFrameLength + minDelimLength);
//this.tooLongFrameLength:丢弃的字节数
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
if (!failFast) {
fail(tooLongFrameLength);
}
return null;
}
// 超过规定最大数据包长度
if (minFrameLength > maxFrameLength) {
// Discard read frame.
buffer.skipBytes(minFrameLength + minDelimLength);
fail(minFrameLength);
return null;
}
// 是否包含分隔符
if (stripDelimiter) {
frame = buffer.readRetainedSlice(minFrameLength);
buffer.skipBytes(minDelimLength);
} else {
frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}
return frame;
} else {
//没有找到分隔符:所有可读字段全部丢弃
// 处于非丢弃模式
if (!discardingTooLongFrame) {
if (buffer.readableBytes() > maxFrameLength) {
// 标记丢弃字节长度
// Discard the content of the buffer until a delimiter is found.
tooLongFrameLength = buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
// 转换为丢弃模式
discardingTooLongFrame = true;
if (failFast) {
fail(tooLongFrameLength);
}
}
}
// 处于丢弃模式:已经丢弃+当前要丢弃的
else {
// Still discarding the buffer since a delimiter is not found.
tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
}
基于长度域解码器分析
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
// 下面介绍几个颇为重要的参数
private final ByteOrder byteOrder;
private final int maxFrameLength;
//指出包长度的偏移量
private final int lengthFieldOffset;
// 指出数据包长度
private final int lengthFieldLength;
private final int lengthFieldEndOffset;
// 后面数据包实际长度等于 lengthFieldLength+lengthAdjustment
private final int lengthAdjustment;
// 在最后解析的数据包中需要跳过多少字节
private final int initialBytesToStrip;
private final boolean failFast;
private boolean discardingTooLongFrame;
private long tooLongFrameLength;
private long bytesToDiscard;
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
下面介绍实际的逻辑,逻辑较为复杂,所以下面分为三个部分进行介绍
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 开启丢弃模式
if (discardingTooLongFrame) {
long bytesToDiscard = this.bytesToDiscard;
// 计算现在能够丢弃的数据
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
in.skipBytes(localBytesToDiscard);
bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
// 通过failIfNecessary判断是否丢弃完了,然后从该方法中恢复非丢弃状态
failIfNecessary(false);
}
/*
private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
if (bytesToDiscard == 0) {
// Reset to the initial state and tell the handlers that
// the frame was too large.
long tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
discardingTooLongFrame = false;
if (!failFast ||
failFast && firstDetectionOfTooLongFrame) {
fail(tooLongFrameLength);
}
} else {
// Keep discarding and notify handlers if necessary.
if (failFast && firstDetectionOfTooLongFrame) {
fail(tooLongFrameLength);
}
}
}
*/
// lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
// 所以 lengthFieldEndOffset即为指向真正数据的前一个指针,
if (in.readableBytes() < lengthFieldEndOffset) {
// 说明此时还没有真正的数据可以读
return null;
}
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
// 表示从actualLengthFieldOffset起,读lengthFieldLength的value字节数,这个framelength
// 相当于lengthFieldLength中所表示的值
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"negative pre-adjustment length field: " + frameLength);
}
// 完整的数据包字节数=frame+前面指针值+lengthAdjustment
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than lengthFieldEndOffset: " + lengthFieldEndOffset);
}
//(在这个逻辑处将会触发进入丢弃模式)
if (frameLength > maxFrameLength) {
// 计算可以丢弃的字节数
long discard = frameLength - in.readableBytes();
tooLongFrameLength = frameLength;
// 如果小于0,则代表frameLength小于可以读的数据,所以这次数据全部丢掉
if (discard < 0) {
// buffer contains more bytes then the frameLength so we can discard all now
in.skipBytes((int) frameLength);
} else {
// discard>0,则代表这一次无法全部丢弃,保留仍待丢弃的数据,待下次在进行丢弃,并开启丢弃模式,丢弃字节数由bytesToDiscard保存,然后进入上面源码开始的地方,进行分析
// Enter the discard mode and discard everything received so far.
discardingTooLongFrame = true;
bytesToDiscard = discard;
in.skipBytes(in.readableBytes());
}
failIfNecessary(true);
return null;
}
// never overflows because it's less than maxFrameLength
int frameLengthInt = (int) frameLength;
// 什么也不做,因为可读的数据流小于我们想要定义读的一个完整数据包
if (in.readableBytes() < frameLengthInt) {
return null;
}
//(2):进入到跳过多少字节这一个步骤,首先如果initialBytesToStrip(跳过字节数)>frameLengthInt
// 则抛出异常,因为这样子相当于没有任何意义。
if (initialBytesToStrip > frameLengthInt) {
in.skipBytes(frameLengthInt);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than initialBytesToStrip: " + initialBytesToStrip);
}
// 如果initialBytesToStrip合理,则跳过指定的字节数
in.skipBytes(initialBytesToStrip);
// extract frame
// 获取当前读指针
int readerIndex = in.readerIndex();
// 获取需要读取的真正字节数
int actualFrameLength = frameLengthInt - initialBytesToStrip;
// 表示从buf里readindex指针处开始,抽取出actualFrameLength字节
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
// 然后将读指针进行向下偏移
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
总结步骤:
标签:Netty,丢弃,buffer,编解码器,ctx,int,源码,ByteBuf,final 来源: https://blog.csdn.net/weixin_43340717/article/details/116569658