Study Notes on 《Netty权威指南 第2版》 (3): Delimiter-Based and Fixed-Length Decoders
Author: Internet
Preface
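As described earlier, upper-layer application protocols usually tell messages apart in one of the following four ways.
1. Fixed-length messages: once the bytes read add up to the fixed length LEN, a complete message is considered to have been read; the counter is reset and reading starts again for the next message.
2. Appending a carriage return/line feed to the end of each message as a separator, as FTP does. This approach is widely used in text protocols.
3. Using a special delimiter as the end-of-message marker. The carriage return/line feed is simply one particular end delimiter.
4. Defining a length field in the message header that states the total length of the message.
Netty provides a unified abstraction over these four approaches in the form of four decoders, which are very convenient to use. With these decoders, users do not have to decode the bytes they read by hand, nor do they have to deal with TCP sticky packets and split packets themselves.
This article continues with two more of these decoders: DelimiterBasedFrameDecoder and FixedLengthFrameDecoder. The former automatically decodes messages that end with a delimiter, and the latter automatically decodes fixed-length messages. Both solve the half-packet read problem caused by TCP sticky and split packets.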
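To make the delimiter idea concrete, here is a minimal plain-JDK sketch of what a delimiter-based decoder has to do conceptually: buffer incoming bytes, emit every complete frame, and keep any trailing partial frame until the next read. The class name `DelimiterFramer` and its `feed` method are hypothetical names chosen for this illustration; this is not how Netty implements DelimiterBasedFrameDecoder, and UTF-8 is an assumed encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-JDK illustration of delimiter framing (hypothetical class, not Netty code). */
final class DelimiterFramer {
    private final byte[] delimiter;
    // Bytes received so far that do not yet form a complete frame.
    private byte[] pending = new byte[0];

    DelimiterFramer(String delimiter) {
        this.delimiter = delimiter.getBytes(StandardCharsets.UTF_8);
    }

    /** Appends one network read and returns every frame completed by it. */
    List<String> feed(byte[] chunk) {
        byte[] all = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, all, pending.length, chunk.length);

        List<String> frames = new ArrayList<>();
        int start = 0;
        int idx;
        while ((idx = indexOf(all, delimiter, start)) >= 0) {
            frames.add(new String(all, start, idx - start, StandardCharsets.UTF_8));
            start = idx + delimiter.length; // skip the delimiter itself
        }
        // Whatever follows the last delimiter is a half packet: keep it for later.
        pending = Arrays.copyOfRange(all, start, all.length);
        return frames;
    }

    private static int indexOf(byte[] data, byte[] pattern, int from) {
        for (int i = from; i <= data.length - pattern.length; i++) {
            int j = 0;
            while (j < pattern.length && data[i + j] == pattern[j]) {
                j++;
            }
            if (j == pattern.length) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        DelimiterFramer framer = new DelimiterFramer("$_");
        // First read: one full message stuck to the first half of the next one.
        System.out.println(framer.feed("QUERY TIME ORDER$_QUERY TI".getBytes(StandardCharsets.UTF_8)));
        // Second read: completes the second message.
        System.out.println(framer.feed("ME ORDER$_".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Both calls print `[QUERY TIME ORDER]`: the first read yields one frame and leaves `QUERY TI` buffered, and the second read completes it. A delimiter that is itself split across two reads is handled the same way, because unconsumed bytes stay in `pending`.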
The DelimiterBasedFrameDecoder
Server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeServer {

    public void bind(int port) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            /*
                             * Use DelimiterBasedFrameDecoder with "$_" as the delimiter.
                             * 1024 is the maximum length of a single message: if no delimiter
                             * has been found once that length is reached, a TooLongFrameException
                             * is thrown. This guards against running out of memory when a
                             * malformed stream never sends the delimiter.
                             */
                            ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new TimeServer().bind(8080);
    }
}
TimeServerHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The decoders in front of this handler have already turned the frame into a String.
        String body = (String) msg;
        System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                ? new Date(System.currentTimeMillis()).toString()
                : "BAD ORDER";
        /*
         * Append "$_" to the end of every response.
         */
        currentTime += "$_";
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
Client
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeClient {

    public void connect(int port, String host) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            /*
                             * Use DelimiterBasedFrameDecoder with "$_" as the delimiter.
                             */
                            ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        new TimeClient().connect(port, "127.0.0.1");
    }
}
TimeClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private int counter;
    private byte[] req;

    public TimeClientHandler() {
        /*
         * Every request carries a trailing "$_".
         */
        req = ("QUERY TIME ORDER$_").getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message;
        // Send 500 requests back to back once the connection is established.
        for (int i = 0; i < 500; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
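The code is almost identical to the earlier version that used LineBasedFrameDecoder.
When a single message exceeds the configured maximum length
If a frame is longer than the configured maximum, the decoder throws a TooLongFrameException. The message below reports a limit of 1 and a frame length of 16, which matches the 16-byte "QUERY TIME ORDER" request arriving at a decoder whose maximum length was set to 1: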
io.netty.handler.codec.TooLongFrameException: frame length exceeds 1: 16 - discarded
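The reason for the maximum-length parameter can be sketched in plain Java: a decoder that waits for a delimiter must refuse to buffer without bound. The class `FrameLengthGuard` and its method are hypothetical, written only for this illustration. The exception type is the JDK's IllegalStateException rather than Netty's TooLongFrameException, and the first message merely imitates the wording of the log line quoted above; the second message is made up for the sketch.

```java
/** Plain-JDK illustration of a maximum frame length check (hypothetical class, not Netty code). */
final class FrameLengthGuard {

    /**
     * Returns the length of the first frame in data, that is the number of bytes
     * before the first delimiter, or -1 if no delimiter has arrived yet.
     * Throws when the frame, or the undelimited data buffered so far, is longer
     * than maxFrameLength.
     */
    static int firstFrameLength(byte[] data, byte[] delimiter, int maxFrameLength) {
        for (int i = 0; i <= data.length - delimiter.length; i++) {
            boolean match = true;
            for (int j = 0; j < delimiter.length; j++) {
                if (data[i + j] != delimiter[j]) {
                    match = false;
                    break;
                }
            }
            if (match) {
                if (i > maxFrameLength) {
                    throw new IllegalStateException(
                            "frame length exceeds " + maxFrameLength + ": " + i + " - discarded");
                }
                return i;
            }
        }
        // No delimiter yet: stop buffering once the limit is passed.
        if (data.length > maxFrameLength) {
            throw new IllegalStateException(
                    "no delimiter within " + maxFrameLength + " bytes, " + data.length + " buffered");
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] delimiter = "$_".getBytes();
        byte[] request = "QUERY TIME ORDER$_".getBytes();
        System.out.println(firstFrameLength(request, delimiter, 1024)); // 16
        try {
            firstFrameLength(request, delimiter, 1);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // frame length exceeds 1: 16 - discarded
        }
    }
}
```

With a limit of 1024 the 16-byte request passes; with a limit of 1 the same request is rejected, which mirrors the situation in the log line above.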
The FixedLengthFrameDecoder
FixedLengthFrameDecoder is a fixed-length decoder: it automatically splits the incoming byte stream into messages of the specified length.
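Conceptually, fixed-length framing only needs a byte count: emit a frame whenever enough bytes have accumulated and keep the remainder. The following plain-JDK sketch shows that idea. The class name `FixedLengthFramer` and its `feed` method are hypothetical, and this is not Netty's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-JDK illustration of fixed-length framing (hypothetical class, not Netty code). */
final class FixedLengthFramer {
    private final int frameLength;
    // Bytes received so far that do not yet fill a whole frame.
    private byte[] pending = new byte[0];

    FixedLengthFramer(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frameLength = frameLength;
    }

    /** Appends one network read and returns every complete frame now available. */
    List<byte[]> feed(byte[] chunk) {
        byte[] all = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, all, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        int start = 0;
        while (all.length - start >= frameLength) {
            frames.add(Arrays.copyOfRange(all, start, start + frameLength));
            start += frameLength;
        }
        // Fewer than frameLength bytes remain: a half packet, kept for the next read.
        pending = Arrays.copyOfRange(all, start, all.length);
        return frames;
    }

    public static void main(String[] args) {
        FixedLengthFramer framer = new FixedLengthFramer(4);
        // Six bytes arrive: one full 4-byte frame, two bytes stay buffered.
        System.out.println(framer.feed(new byte[] {1, 2, 3, 4, 5, 6}).size());
        // Two more bytes complete the second frame.
        System.out.println(framer.feed(new byte[] {7, 8}).size());
    }
}
```

Each call prints `1`. Note that this scheme never looks at the content, so both sides must agree on the length and every sender must pad its messages to exactly that length.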
Server
Only the decoder needs to be swapped for FixedLengthFrameDecoder; the fixed length used here is 64 bytes.
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeServer {

    public void bind(int port) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            /*
                             * Use FixedLengthFrameDecoder with a fixed length of 64 bytes.
                             */
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(64));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new TimeServer().bind(8080);
    }
}
TimeServerHandler
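Essentially unchanged: it no longer appends "$_" to the response, and exceptionCaught now prints the stack trace.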
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                ? new Date(System.currentTimeMillis()).toString()
                : "BAD ORDER";
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
TimeClient
As on the server, the decoder is replaced with FixedLengthFrameDecoder, with a fixed length of 64 bytes.
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeClient {

    public void connect(int port, String host) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            /*
                             * Use FixedLengthFrameDecoder with a fixed length of 64 bytes.
                             */
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(64));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        new TimeClient().connect(port, "127.0.0.1");
    }
}
TimeClientHandler
Build a byte array with a fixed length of 64 bytes and send it.
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private int counter;
    private byte[] req;
    private byte[] fixReq;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER").getBytes();
        /*
         * Use System.arraycopy to make the payload exactly 64 bytes.
         * If the source array is shorter than 64 bytes, the rest of fixReq
         * stays zero-filled (NUL bytes).
         */
        fixReq = new byte[64];
        System.arraycopy(req, 0, fixReq, 0, req.length);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message;
        for (int i = 0; i < 500; i++) {
            message = Unpooled.buffer(fixReq.length);
            message.writeBytes(fixReq);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        /*
         * body.trim() strips leading and trailing whitespace; it also removes
         * the NUL padding, because trim() drops every character up to U+0020.
         */
        System.out.println("Now is : " + body.trim() + " ; the counter is : " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
Server output
Client output
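The server output shows correctly separated messages: the client sends every request as exactly 64 bytes, so FixedLengthFrameDecoder takes care of sticky packets on the server side. The client output, however, shows sticky packets, because the server does not send its responses at the agreed fixed length. Note also that the server compares the 64-character, NUL-padded body without trimming it, so the body never equals "QUERY TIME ORDER" and this version of the handler replies "BAD ORDER".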
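The effect on the client can be reproduced without a network. The plain-JDK sketch below assumes the server replies with the unpadded 9-byte string "BAD ORDER" to each of the 500 requests and that the client cuts the resulting byte stream into 64-byte frames. The class `UnpaddedReplyDemo` and its method are hypothetical names used only for this illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-JDK illustration of unpadded replies meeting a fixed-length reader (hypothetical class). */
final class UnpaddedReplyDemo {

    /** Concatenates count copies of reply and cuts the stream into frameLength-byte frames. */
    static List<String> framesSeenByClient(String reply, int count, int frameLength) {
        byte[] one = reply.getBytes(StandardCharsets.US_ASCII);
        byte[] stream = new byte[one.length * count];
        for (int i = 0; i < count; i++) {
            System.arraycopy(one, 0, stream, i * one.length, one.length);
        }
        List<String> frames = new ArrayList<>();
        // Trailing bytes that do not fill a whole frame are never delivered.
        for (int start = 0; start + frameLength <= stream.length; start += frameLength) {
            frames.add(new String(stream, start, frameLength, StandardCharsets.US_ASCII));
        }
        return frames;
    }

    public static void main(String[] args) {
        // The padded request as the server handler receives it: 16 characters plus 48 NULs.
        byte[] padded = Arrays.copyOf("QUERY TIME ORDER".getBytes(StandardCharsets.US_ASCII), 64);
        String body = new String(padded, StandardCharsets.US_ASCII);
        System.out.println("QUERY TIME ORDER".equalsIgnoreCase(body));        // false
        System.out.println("QUERY TIME ORDER".equalsIgnoreCase(body.trim())); // true

        List<String> frames = framesSeenByClient("BAD ORDER", 500, 64);
        System.out.println(frames.size());  // 70
        System.out.println(frames.get(0));  // seven replies plus the first byte of the eighth
    }
}
```

Under these assumptions, 500 replies of 9 bytes make 4500 bytes, which yields only 70 frames of 64 bytes, each containing several replies run together, and the last 20 bytes never form a frame. Padding every reply to 64 bytes and trimming the body before comparing it removes both problems.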
TimeServerHandler
Modify the server-side TimeServerHandler in the same way as the client handler.
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        // Strip leading and trailing whitespace, which also removes the NUL padding.
        body = body.trim();
        System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                ? new Date(System.currentTimeMillis()).toString()
                : "BAD ORDER";
        /*
         * Use System.arraycopy to make the response exactly 64 bytes.
         * If the source is shorter than 64 bytes, the rest of the array
         * stays zero-filled (NUL bytes).
         */
        byte[] timeBytes = currentTime.getBytes();
        byte[] fixResp = new byte[64];
        System.arraycopy(timeBytes, 0, fixResp, 0, timeBytes.length);
        ByteBuf resp = Unpooled.copiedBuffer(fixResp);
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
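The padding step appears in both handlers, so it could be factored into a small helper. The sketch below is one possible shape, using only the JDK. The class `FixedLengthPadding` and its `pad` and `unpad` methods are hypothetical names, and the explicit UTF-8 charset is an assumption (the listings above use the platform default). Unlike a bare System.arraycopy into a 64-byte array, which throws ArrayIndexOutOfBoundsException when the source is too long, it rejects oversized payloads with a descriptive message.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Plain-JDK padding helper for fixed-length frames (hypothetical class). */
final class FixedLengthPadding {

    /** Encodes text and zero-pads it to exactly frameLength bytes. */
    static byte[] pad(String text, int frameLength) {
        byte[] raw = text.getBytes(StandardCharsets.UTF_8);
        if (raw.length > frameLength) {
            throw new IllegalArgumentException(
                    "payload of " + raw.length + " bytes does not fit a " + frameLength + "-byte frame");
        }
        // Arrays.copyOf fills the extra positions with zero bytes.
        return Arrays.copyOf(raw, frameLength);
    }

    /** Decodes a frame and strips the NUL padding (and any other leading or trailing whitespace). */
    static String unpad(byte[] frame) {
        return new String(frame, StandardCharsets.UTF_8).trim();
    }

    public static void main(String[] args) {
        byte[] frame = pad("QUERY TIME ORDER", 64);
        System.out.println(frame.length);   // 64
        System.out.println(unpad(frame));   // QUERY TIME ORDER
    }
}
```

One design caveat: because `unpad` relies on `trim()`, a payload that legitimately starts or ends with spaces would lose them, so this approach suits commands like the ones in this example rather than arbitrary text.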
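Server output
Client output
Summary
DelimiterBasedFrameDecoder automatically decodes messages that end with a delimiter, and FixedLengthFrameDecoder automatically decodes fixed-length messages. With these two decoders, many kinds of messages can be decoded automatically without worrying about the half-packet reads caused by TCP sticky and split packets, which greatly improves development efficiency.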
Source: https://blog.csdn.net/CSDN_WYL2016/article/details/114007035