Netty字符串类型消息粘包/半包问题解决方案三示例代码:FixedLengthFrameDecoder+StringDecoder
作者:互联网
- 原理
按照开发者指定的固定长度对消息进行解码
- 案例
1.FixedLengthServer
/**
* @author pdc
*/
public class FixedLengthServer {
public static void main(String[] args) throws Exception {
int port = 8080;
new FixedLengthServer().bind(port);
}
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务端辅助启动类ServerBootstrap对象
ServerBootstrap b = new ServerBootstrap();
//设置NIO线程组
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//设置TCP参数,连接请求的最大队列长度
.option(ChannelOption.SO_BACKLOG, 1024)
//设置I/O事件处理类,用来处理消息的编解码以及我们的业务逻辑
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//设置FixedLengthFrameDecoder处理器,13为所需要传输的字符串,"你好,Netty!"的字节数组长度
ch.pipeline().addLast(new FixedLengthFrameDecoder(13));
//设置StringDecoder处理器
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new FixedLengthServerHandler());
}
});
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅退出释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
2.FixedLengthServerHandler
/**
* @author pdc
*/
public class FixedLengthServerHandler extends SimpleChannelInboundHandler {
//计数器
private static final AtomicInteger counter = new AtomicInteger(0);
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收客户端发送的字符串,并打印到控制台
String content = (String) msg;
System.out.println("received from client:" + content + " counter:" + counter.addAndGet(1));
//将数据重新发送到客户端
ByteBuf echo = Unpooled.copiedBuffer(content.getBytes());
ctx.writeAndFlush(echo);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
3.FixedLengthClient
/**
* @author pdc
*/
public class FixedLengthClient {
public static void main(String[] args) throws Exception {
int port = 8080;
new FixedLengthClient().connect(port, "127.0.0.1");
}
public void connect(int port, String host) throws Exception {
//创建客户端处理I/O读写的NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端辅助启动类
Bootstrap b = new Bootstrap();
//设置NIO线程组
b.group(group)
//设置NioSocketChannel,对应与JDK NIO类SocketChannel类
.channel(NioSocketChannel.class)
//设置TCP参数TCP_NODELAY
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//设置FixedLengthFrameDecoder处理器,13为所需要传输的字符串,"你好,Netty!"的字节数组长度
ch.pipeline().addLast(new FixedLengthFrameDecoder(13));
//设置StringDecoder处理器
ch.pipeline().addLast(new StringDecoder());
//配置客户端处理网络I/O事件的类
ch.pipeline().addLast(new FixedLengthClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
//循环发送1000次
for (int i = 0; i < 1000; i++) {
//构造客户端发送的数据ByteBuf对象
String content = "你好,Netty!";
byte[] req = content.getBytes();
ByteBuf messageBuffer = Unpooled.buffer(req.length);
messageBuffer.writeBytes(req);
//向服务端发送数据
ChannelFuture channelFuture = f.channel().writeAndFlush(messageBuffer);
channelFuture.syncUninterruptibly();
}
// 等待客户端链路关闭
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
}
4.FixedLengthClientHandler
/**
* @author pdc
*/
public class FixedLengthClientHandler extends SimpleChannelInboundHandler {
private static final Logger logger = Logger.getLogger(FixedLengthClientHandler.class.getName());
//计数器
private static final AtomicInteger counter = new AtomicInteger(0);
//服务端响应请求返回数据的时候会自动调用该方法,我们通过实现该方法来接收服务端返回的数据,并实现客户端调用的业务逻辑
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取服务端返回的数据,并打印到控制台
String content = (String) msg;
System.out.println("received from server:" + content + " counter:" + counter.addAndGet(1));
}
//发生异常,关闭链路
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.warning("Unexpected exception from downstream :" + cause.getMessage());
ctx.close();
}
}
- 运行
先运行FixedLengthServer的main,再运行FixedLengthClient的main即可
标签:Netty,Exception,示例,void,ctx,throws,new,半包,public 来源: https://blog.csdn.net/qq_41594698/article/details/94601127