Netty由浅入深的学习指南(进阶粘包半包)
作者:互联网
本章节主要介绍粘包半包的解决方法、协议的设计、序列化知识;同时通过实现聊天室案例将这些知识点串联起来。
3.1 粘包半包
-
粘包半包现象
- 粘包:多条数据粘连,一次发送给服务器
- 半包:一条完整消息从某个点断开,发送给服务器的消息不完整
演示代码
//服务器演示代码 public void start(){ //声明工作线程及主线程 NioEventLoopGroup boss = new NioEventLoopGroup(1); NioEventLoopGroup worker = new NioEventLoopGroup(); try { //启动配置 ServerBootstrap server = new ServerBootstrap(); server.channel(NioServerSocketChannel.class); //非阻塞 //调整系统的接收缓冲区 server.option(ChannelOption.SO_RCVBUF,10); //设置接收缓冲区 server.group(boss,worker); server.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); } }); ChannelFuture future = server.bind(8080).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("服务器启动错误:{}",e.getMessage()); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } //客户端演示代码 static void send(){ NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap client = new Bootstrap(); client.channel(NioSocketChannel.class); client.group(worker); client.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) { ByteBuf buf = ctx.alloc().buffer(16); buf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15}); ctx.writeAndFlush(buf); } } }); } }); ChannelFuture future = client.connect("127.0.0.1", 8080).sync(); future.channel().closeFuture().sync(); }catch (Exception e){ log.error("客户端错误:{}",e.getMessage()); }finally { worker.shutdownGracefully(); } }
日志说明
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
±-------±------------------------------------------------±---------------+
|00000000| 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 04 05 |…|
|00000010| 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 04 05 |…|
|00000020| 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 04 05 |…|
|00000030| 06 07 |… |
±-------±------------------------------------------------±---------------+
通过日志可以看出,服务器前三次接收的属于粘包现象,最后一次属于半包现象。
-
粘包半包的解决方案
-
短链接
//客户端 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = ctx.alloc().buffer(16); buf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15}); ctx.writeAndFlush(buf); ctx.channel().close(); //发完消息接断开连接,可以解决粘包,不能处理半包 } //服务器,模拟半包现象 //调整系统的接收缓冲区(滑动窗口) //server.option(ChannelOption.SO_RCVBUF,10); //设置接收缓冲区 //调整netty的接收缓冲区(ByteBuf) server.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16));
-
定长解码器
//服务器端 protected void initChannel(SocketChannel sc) throws Exception { //添加定长解吗器,位于最上端 sc.pipeline().addLast(new FixedLengthFrameDecoder(10)); sc.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); } //客户端,fill10Bytes不足的补全10个字节 public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = ctx.alloc().buffer(); char c = '0'; Random r = new Random(); for (int i = 0; i < 10; i++) { byte[] bytes = fill10Bytes(c,r.nextInt(10) + 1); c++; buf.writeBytes(bytes); } ctx.writeAndFlush(buf); } //fill10Bytes()方法代码 public static byte[] fill10Bytes(char c ,int len){ byte[] bytes = new byte[10]; Arrays.fill(bytes,(byte) '_'); for (int i = 0; i < len; i++) { bytes[i] = (byte)c; } System.out.println("内容:" + new String(bytes)); return bytes; }
-
行解码器
//服务器代码 protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new LineBasedFrameDecoder(1024)); //添加行解吗器 sc.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); } //客户端代码,makeString给消息添加分隔符‘\n’ public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = ctx.alloc().buffer(); char c = '0'; Random r = new Random(); for (int i = 0; i < 10; i++) { byte[] bytes = makeString(c,r.nextInt(256) + 1); c++; buf.writeBytes(bytes); } ctx.writeAndFlush(buf); } //makeString()方法代码 public static byte[] makeString(char c ,int len){ StringBuilder sb = new StringBuilder(len + 2); for (int i = 0; i < len; i++) { sb.append(c); } sb.append("\n"); return sb.toString().getBytes(); }
-
LTC解码器
public static void main(String[] args) { EmbeddedChannel channel = new EmbeddedChannel( new LengthFieldBasedFrameDecoder(1024,0,4,1,4), new LoggingHandler(LogLevel.DEBUG) ); //4个字节的内容长度,实际长度 ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); send(buf,"hello, world"); send(buf,"Hi"); channel.writeInbound(buf); } private static void send(ByteBuf buf,String content){ byte[] bytes = content.getBytes(); //实际内容 int len = bytes.length; //实际内容长度 buf.writeInt(len); //写入内容长度 buf.writeByte(1); //写入任意数据 buf.writeBytes(bytes); //写入实际内容 }
注解:
- maxFrameLength:发送的数据帧最大长度
- lengthFieldOffset:定义长度域位于发送的字节数组中的下标
- lengthFieldLength:用于描述定义的长度域的长度
- lengthAdjustment:自长度域以后几个字节为内容域
- initialBytesToStrip:接收到的发送数据包,去除前initialBytesToStrip位
-
3.2 协议的制定及解析
-
Redis
final byte[] LINE = {13,10}; NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap client = new Bootstrap(); client.channel(NioSocketChannel.class); client.group(worker); client.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //set name zhangsan ByteBuf buf = ctx.alloc().buffer(); buf.writeBytes("*3".getBytes()); //命令数组长度 buf.writeBytes(LINE); //回车换行 buf.writeBytes("$3".getBytes()); //命令字段长度 set buf.writeBytes(LINE); buf.writeBytes("set".getBytes()); //指令 => set buf.writeBytes(LINE); buf.writeBytes("$4".getBytes()); //key字段长度 name buf.writeBytes(LINE); buf.writeBytes("name".getBytes()); //key => name buf.writeBytes(LINE); buf.writeBytes("$8".getBytes()); //value字段长度 zhangsan buf.writeBytes(LINE); buf.writeBytes("zhangsan".getBytes()); //value => zhangsan buf.writeBytes(LINE); ctx.writeAndFlush(buf); //写入并发送 } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(buf.toString(Charset.defaultCharset())); } }); } }); ChannelFuture future = client.connect("127.0.0.1", 6379).sync(); future.channel().closeFuture().sync(); }catch (Exception e){ log.error("客户端错误:{}",e.getMessage()); }finally { worker.shutdownGracefully(); }
-
Http
NioEventLoopGroup boss = new NioEventLoopGroup(1); NioEventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap client = new ServerBootstrap(); client.channel(NioServerSocketChannel.class); client.group(boss,worker); client.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); sc.pipeline().addLast(new HttpServerCodec()); sc.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>(){ @Override protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception { //获取请求 log.info(msg.uri()); //返回响应 DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK); byte[] bytes = "<h1>Hello, World!</h1>".getBytes(); response.headers().setInt(CONTENT_LENGTH,bytes.length); response.content().writeBytes(bytes); //写回响应 ctx.writeAndFlush(response); } }); } }); ChannelFuture future = client.bind("127.0.0.1", 8080).sync(); future.channel().closeFuture().sync(); }catch (Exception e){ log.error("客户端错误:{}",e.getMessage()); }finally { boss.shutdownGracefully(); worker.shutdownGracefully(); }
-
自定义协议
-
要素
- 魔数:用来在第一时间判定是否是无效的数据包
- 版本号:可以支持协议的升级
- 序列化算法:消息正文到底采用哪种序列化方式,可以由此扩展,例如JSON\protobuf\jdk
- 指令类型:是登录、注册…与业务相关
- 请求序号:为了双工通信,提供异步能力
- 正文长度:传递数据的长度
- 消息正文:传递的数据
-
编码
@Override protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { //4字节的魔数 out.writeBytes(new byte[]{1,2,3,4}); //1字节的版本 out.writeByte(1); //1字节的序列化方式0:jdk;1:json out.writeByte(0); //1字节的指令类型 out.writeByte(msg.getMessageType()); //4字节序列号 out.writeByte(msg.getSequenceId()); //对齐填充 out.writeByte(0xff); //获取字节的内容数组 ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(msg); byte[] bytes = bos.toByteArray(); //内容长度 out.writeInt(bytes.length); //内容写入 out.writeBytes(bytes); }
-
解码
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int magicNum = in.readInt(); byte version = in.readByte(); byte serializerType = in.readByte(); byte messageType = in.readByte(); int sequenceId = in.readInt(); in.readByte(); int length = in.readInt(); byte[] bytes = new byte[length]; in.readBytes(bytes,0,length); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); Message message = (Message) ois.readObject(); log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length); log.debug("{}", message); out.add(message); }
-
测试
EmbeddedChannel channel = new EmbeddedChannel( //解决粘包半包问题 new LengthFieldBasedFrameDecoder(1024,12,4,0,0), new LoggingHandler(LogLevel.DEBUG), new MessageCodec() ); LoginRequestMessage request = new LoginRequestMessage("zhangsan","123","张三"); //出站 channel.writeOutbound(request); ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); new MessageCodec().encode(null,request,buf); //模拟半包现象 ByteBuf s1 = buf.slice(0, 100); ByteBuf s2 = buf.slice(100,buf.readableBytes() - 100); //入站 s1.retain(); //防止内存释放 channel.writeInbound(s1); channel.writeInbound(s2);
-
标签:Netty,writeBytes,进阶,bytes,ctx,new,半包,byte,buf 来源: https://blog.csdn.net/fuu123f/article/details/116737771