本人对于netty框架的一些理解,怎么与网站上的websock建立连接
作者:互联网
在Netty的里面有一个Boss,他开了一家公司(开启一个服务端口)对外提供业务服务,它手下有一群做事情的workers。Boss一直对外宣传自己公司提供的业务,并且接受(accept)有需要的客户(client),当一位客户找到Boss说需要他公司提供的业务,Boss便会为这位客户安排一个worker,这个worker全程为这位客户服务(read/write)。如果公司业务繁忙,一个worker可能会为多个客户进行服务。这就是Netty里面Boss和worker之间的关系。下面看看Netty是如何让Boss和Worker进行协助的。
private EventLoopGroup boss = new NioEventLoopGroup(); private EventLoopGroup work = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap() .group(boss, work) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(nettyPort)) //保持长连接 .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new HeartbeatInitializer()); ChannelFuture future = bootstrap.bind().sync(); if (future.isSuccess()) { log.info("启动 Netty 成功"); }
上诉代码初始化了一条netty的服务,那么如何初始话的写在类HeartbeatInitializer里面
public class HeartbeatInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() //五秒没有收到消息 将IdleStateHandler 添加到 ChannelPipeline 拦截器中 .addLast(new IdleStateHandler(5, 0, 0)) // HttpServerCodec:将请求和应答消息解码为HTTP消息 .addLast("http-codec",new HttpServerCodec()) // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息 .addLast("aggregator",new HttpObjectAggregator(65536)) // ChunkedWriteHandler:向客户端发送HTML5文件 .addLast("http-chunked",new ChunkedWriteHandler()) .addLast(new HeartBeatSimpleHandle()); } }
上述文件设置了 拦截器,解码和解码合并,还有响应,最后一个new HeartBeatSimpleHandle()用来处理请求
public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<Object> { private WebSocketServerHandshaker handShaker; /** * 取消绑定 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { NettySocketHolder.remove((NioSocketChannel) ctx.channel()); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { super.userEventTriggered(ctx, evt); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // 传统的HTTP接入 if (msg instanceof FullHttpRequest) { FullHttpRequest req = (FullHttpRequest) msg; handleHttpRequest(ctx, req); //获取url后置参数 String uri=req.uri(); QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri); Map<String, List<String>> parameters = queryStringDecoder.parameters(); Integer userId = Integer.valueOf(parameters.get("userId").get(0)); // 存储当前登录ctx if(NettySocketHolder.get((long)userId) == null){ NettySocketHolder.put((long)userId, (NioSocketChannel) ctx.channel()); } // WebSocket接入 } else if (msg instanceof WebSocketFrame) { if ("live".equals(ctx.channel().attr(AttributeKey.valueOf("type")).get())) { handlerWebSocketFrame(ctx, (WebSocketFrame) msg); } log.info("收到msg={},id={}", msg); //保存客户端与 Channel 之间的关系 } } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { // 如果HTTP解码失败,返回HTTP异常 if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } //获取url后置参数 HttpMethod method=req.method(); String uri=req.uri(); QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri); Map<String, List<String>> parameters = queryStringDecoder.parameters(); if(method==HttpMethod.GET&&"/websocket".equals(uri)){ //...处理 ctx.channel().attr(AttributeKey.valueOf("type")).set("live"); } // 构造握手响应返回,本机测试 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( "ws://"+req.headers().get(HttpHeaderNames.HOST)+uri, null, false); handShaker = wsFactory.newHandshaker(req); if (handShaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handShaker.handshake(ctx.channel(), req); } } private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回应答给客户端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } // 如果是非Keep-Alive,关闭连接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判断是否关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); } }
HeartBeatSimpleHandle 用与处理管道(Channel)的读取工作,将管道储存在NettySocketHolder里面,用的时候取出。WebSocketServerHandshaker用于响应客户端的响应
var websocket; $(function(){ $.ajax({ type: "POST", url: "/login", data: { userId: 2, account: "135541", userName: "123" }, contentType: "application/x-www-form-urlencoded; charset=utf-8", dataType: "json", success: function (result) { websoketCannel(result.data.userId, result.data.account, result.data.userName); } }); }); function websoketCannel(userId, account, userName){ //如果浏览器支持WebSocket if(window.WebSocket){ websocket = new WebSocket("ws://localhost:1212/websocket?userId="+ userId +"&account="+ account +"&userName="+ userName +""); //获得WebSocket对象 //当有消息过来的时候触发 websocket.onmessage = function(event){ var data = JSON.parse(event.data); $("#message").text(data.msg); }; //连接关闭的时候触发 websocket.onclose = function(event){ console.log("断开连接"); }; //连接打开的时候触发 websocket.onopen = function(event){ console.log("建立连接"); } }else{ alert("浏览器不支持WebSocket"); } } function sendMsg(msg) { //发送消息 if(window.WebSocket){ if(websocket.readyState == WebSocket.OPEN) { //如果WebSocket是打开状态 websocket.send(msg); //send()发送消息 } }else{ return; } }
上面代码是客户端如何连netty
标签:netty,websocket,框架,userId,req,ctx,websock,msg,new 来源: https://www.cnblogs.com/kangniuniu/p/11107768.html