其他分享
首页 > 其他分享> > Spring Cloud微服务使用webSocket的方法

Spring Cloud微服务使用webSocket的方法

作者:互联网

一、webSocket简介

webSocket长连接是一种在单个tcp连接上进行全双工通信的协议,允许双向数据推送。一般微服务提供的restful API只是对前端请求做出相应。使用webSocket可以实现后端主动向前端推送消息。

二、网关配置

spring cloud 的网关组件有zuul和getway

1、getway

配置网关的时候注意添加ws协议

spring:
  cloud:
    gateway:
      discovery:
        locator:
          lowerCaseServiceId: true
          enabled: true
      routes:
        - id: zhgsgl-warning-websocket # 路由的唯一标识
          uri: lb:ws://zhgsgl-warning  # 修改点
          predicates:
            - Path=/ws/warning/**
          filters:
            - StripPrefix=2 # 修改点
        - id: zhgsgl-data-websocket# 路由的唯一标识
          uri: lb:ws://zhgsgl-data # 修改点
          predicates:
            - Path=/ws/data/**
          filters:
            - StripPrefix=2 #修改点
# 安全配置
security:
  # 不校验白名单
  ignore:
    whites:
      - /ws/** # 修改点

2、zuul

zuul只能管理http请求,不推荐使用zuul管理websocket连接,推荐直连

三、服务端业务模块整合websocket

1.maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2.添加webSocket 配置

import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;


@Configuration
@EnableWebSocket
public class WebsocketConfiguration implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // webSocket通道,指定处理器和路径
        // 直连当前业务模块的连接地址:ws://192.168.2.137:9403/websocket?user=张三
        // 通过网关模块的连接地址  ws://192.168.2.137:8080/ws/websocket?user=张三
        // 测试发送消息格式:{"content":"内容","targetId":"0"}
        registry
            .addHandler(new com.jtsmartway.zhgsgl.data.config.WebSocketHandler(), "/websocket")
            // 指定自定义拦截器
            .addInterceptors(new WebSocketInterceptor())
            // 允许跨域
            .setAllowedOrigins("*");
        // sockjs通道
        registry
            .addHandler(new com.jtsmartway.zhgsgl.data.config.WebSocketHandler(), "/sock-js")
            .addInterceptors(new WebSocketInterceptor())
            .setAllowedOrigins("*")
            // 开启sockJs支持
            .withSockJS();
    }
}

科普:websocket与sockJS的区别

实际上就是前端的区别,一些浏览器不支持websocket,则用sockJS库来处理ws连接,一般都是用sockJS

3.添加处理器

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


@Slf4j
public class WebSocketHandler extends AbstractWebSocketHandler {
    
    /**
     * 存储sessionId和webSocketSession
     * 需要注意的是,webSocketSession没有提供无参构造,不能进行序列化,也就不能通过redis存储
     * 在分布式系统中,要想别的办法实现webSocketSession共享
     */
    private static Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();
    
    private static Map<String, String> userMap = new ConcurrentHashMap<>();

    /**
     * webSocket连接创建后调用
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        // 获取参数
        String user = String.valueOf(session.getAttributes().get("user"));
        userMap.put(user, session.getId());
        sessionMap.put(session.getId(), session);
        log.info("###############  [ws : 连接成功]  ###############");
    }

    /**
     * 接收到消息会调用
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        log.info("###############  [ws消息:{}]  ###############", message.getPayload().toString());
        JSONObject jsonObject = JSONObject.parseObject(message.getPayload().toString());
        String content = jsonObject.getString("content");
        String targetAdminId = jsonObject.getString("targetId");
        if ("0".equals(targetAdminId)) {
            // 推送给所有人
            userMap.forEach((key, value) -> {
                try {
                    this.sendMessage(key, content);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        } else {
            sendMessage("1", content);
        }
        log.info("// ###############  [ws  处理消息成功]  ###############");
    }

    /**
     * 连接出错会调用
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) {
        sessionMap.remove(session.getId());
    }

    /**
     * 连接关闭会调用
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        sessionMap.remove(session.getId());
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * 后端发送消息
     */
    public void sendMessage(String user, String message) throws IOException {
        String sessionId = userMap.get(user);
        if (StringUtils.isEmpty(sessionId)) {
            return;
        }
        WebSocketSession session = sessionMap.get(sessionId);
        if (session == null) {
            return;
        }
        session.sendMessage(new TextMessage(message));
    }
}

4.添加拦截器

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.server.HandshakeInterceptor;

import java.util.Map;

public class WebSocketInterceptor implements HandshakeInterceptor {
    
    /**
     * handler处理前调用,attributes属性最终在WebSocketSession里,
     * 可能通过webSocketSession.getAttributes().get(key值)获得
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, org.springframework.web.socket.WebSocketHandler wsHandler, Map<String, Object> attributes) {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;
            // 获取请求路径携带的参数
            String user = serverHttpRequest.getServletRequest().getParameter("user");
            attributes.put("user", user);
            return true;
        } else {
            return false;
        }
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, org.springframework.web.socket.WebSocketHandler wsHandler, Exception exception) {
    }
}

测试连接,亲测使用若依cloud框架是可以使用的,就不展示测试代码

四、参考文献

原文地址

微服务springcloud环境下基于Netty搭建websocket集群实现高并发,高性能,高可用的服务器消息推送--经典案例(已在工作中实战应用)netty是yyds!

IDEA 中 同一个微服务 按照多个端口启动

标签:webSocket,Spring,springframework,session,ws,import,org,public,Cloud
来源: https://www.cnblogs.com/hhddd-1024/p/16424406.html