从零开始学Netty (四)-- 实战实现前后端分离的IM框架
作者:互联网
网上有很多Netty的教程DEMO,但是前后端分离的例子可能比较少。这次我将会使用SpringBoot + Netty + WebSocket的技术栈,实现前后端分离的即时通讯(IM)的DEMO。
主要功能
服务端接收的消息主要分以下几类:
1、CONNECT:当websocket 第一次open的时候,初始化channel,把用的channel 和 userid 关联起来。
2、CHAT:聊天类型的消息,把聊天记录保存到数据库,同时标记消息的签收状态[未签收]。如果接收者在线,则转发给相应的接收者。
3、SIGNED:签收消息类型,针对具体的消息进行签收,修改数据库中对应消息的签收状态[已签收]。
4、KEEPALIVE:心跳类型的消息,如果心跳超时未连接,则服务端断开连接。
消息实体类
public class DataContent implements Serializable {
private Integer action;
private String userId;
private String receiverId;
private String msg;
...
}
定义消息类型action,发送用户userId,接收用户receiverId,消息实体msg。
action类型定义:
public static final int CONNECT = 1;
public static final int CHAT = 2;
public static final int SIGNED = 3;
public static final int KEEPALIVE = 4;
connect:请求类型;
chat:聊天发送;
signed:聊天接收;
keepalive:保持心跳连接;
ChatHandler处理核心业务
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
//用于记录和管理所有客户端的channel
public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//获取客户端所传输的消息
String content = msg.text();
System.out.println(content);
//1.获取客户端发来的消息
DataContent dataContent = JSON.parseObject(content, DataContent.class);
Integer action = dataContent.getAction();
Channel channel = ctx.channel();
//2.判断消息类型,根据不同的类型来处理不同的业务
if(action == MessageActionConstant.CONNECT){
//2.1 当websocket 第一次open的时候,初始化channel,把用的channel 和 userid 关联起来
UserChanelRel.put(dataContent.getUserId(), channel);
} else if(action == MessageActionConstant.CHAT){
//2.2 聊天类型的消息,把聊天记录保存到数据库,同时标记消息的签收状态[未签收]
// 发送消息
Channel receiverChannel = UserChanelRel.get(dataContent.getReceiverId());
if(receiverChannel ==null){
//离线用户
}else{
//当receiverChannel 不为空的时候,从ChannelGroup 去查找对应的channel 是否存在
Channel findChanel = users.find(receiverChannel.id());
if(findChanel!=null){
// 用户在线
receiverChannel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(dataContent)));
}else{
//离线用户
}
}
} else if(action == MessageActionConstant.SIGNED){
// //2.3 签收消息类型,针对具体的消息进行签收,修改数据库中对应消息的签收状态[已签收]
} else if(action == MessageActionConstant.KEEPALIVE){
//2.4 心跳类型的消息
System.out.println("收到来自channel 为【"+channel+"】的心跳包");
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
users.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String chanelId = ctx.channel().id().asShortText();
System.out.println("客户端被移除:channel id 为:"+chanelId);
users.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
//发生了异常后关闭连接,同时从channelgroup移除
ctx.channel().close();
users.remove(ctx.channel());
}
}
这里根据不同的action,进行业务处理。聊天信息的持久化没做,有需要的可以自行补足。
初始化ChannelPipeline的处理器
public class WSServerInitialzer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(1024*64));
pipeline.addLast(new IdleStateHandler(8,10,12));
// Check Heart Beat
pipeline.addLast(new HeartBeatHandler());
// Web Socket Server
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// Chat Service Handler
pipeline.addLast(new ChatHandler());
}
}
HttpServerCodec:websocket 基于http协议,所需要的http编解码器。
ChunkedWriteHandler:对下数据流写提供支持。
HttpObjectAggregator:对httpMessage进行聚合处理,聚合成request或response。
IdleStateHandler:针对客户端,如果在1分钟时间内没有向服务端发送读写心跳(ALL),则主动断开连接。如果有读空闲和写空闲,则不做任何处理。
HeartBeatHandler:自定义的空闲状态检测的handler。
WebSocketServerProtocolHandler:处理与WebSocket协议相关的动作,包括握手、心跳等。
ChatHandler:处理核心业务。
启动Netty服务器
@Component
public class WebSocketServer {
@Value("${chat.server.host}")
private String host;
@Value("${chat.server.port}")
private int port;
private static class SingletionWSServer {
static final WebSocketServer instance = new WebSocketServer();
}
@Bean
public static WebSocketServer getInstance() {
return SingletionWSServer.instance;
}
private EventLoopGroup mainGroup;
private EventLoopGroup subGroup;
private ServerBootstrap server;
private ChannelFuture future;
public WebSocketServer() {
mainGroup = new NioEventLoopGroup();
subGroup = new NioEventLoopGroup();
server = new ServerBootstrap();
server.group(mainGroup, subGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WSServerInitialzer());
}
public void start() {
this.future = server.bind(host,port);
if (future.isSuccess()) {
System.out.println("Web Socket Server Started!");
}
}
}
与SpringBoot集成启动
@Component
public class NettyServerBooter implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
WebSocketServer.getInstance().start();
}
}
github源码地址:https://github.com/lwtxzwt/easy-chat
欢迎点赞!
标签:Netty,--,ctx,签收,IM,private,new,public,channel 来源: https://blog.csdn.net/zwt122755527/article/details/110049431