Spring Cloud微服务使用webSocket的方法
作者:互联网
一、webSocket简介
webSocket长连接是一种在单个tcp连接上进行全双工通信的协议,允许双向数据推送。一般微服务提供的restful API只是对前端请求做出相应。使用webSocket可以实现后端主动向前端推送消息。
二、网关配置
spring cloud 的网关组件有zuul和getway
1、getway
配置网关的时候注意添加ws协议
spring:
cloud:
gateway:
discovery:
locator:
lowerCaseServiceId: true
enabled: true
routes:
- id: zhgsgl-warning-websocket # 路由的唯一标识
uri: lb:ws://zhgsgl-warning # 修改点
predicates:
- Path=/ws/warning/**
filters:
- StripPrefix=2 # 修改点
- id: zhgsgl-data-websocket# 路由的唯一标识
uri: lb:ws://zhgsgl-data # 修改点
predicates:
- Path=/ws/data/**
filters:
- StripPrefix=2 #修改点
# 安全配置
security:
# 不校验白名单
ignore:
whites:
- /ws/** # 修改点
2、zuul
zuul只能管理http请求,不推荐使用zuul管理websocket连接,推荐直连
三、服务端业务模块整合websocket
1.maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.添加webSocket 配置
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebsocketConfiguration implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// webSocket通道,指定处理器和路径
// 直连当前业务模块的连接地址:ws://192.168.2.137:9403/websocket?user=张三
// 通过网关模块的连接地址 ws://192.168.2.137:8080/ws/websocket?user=张三
// 测试发送消息格式:{"content":"内容","targetId":"0"}
registry
.addHandler(new com.jtsmartway.zhgsgl.data.config.WebSocketHandler(), "/websocket")
// 指定自定义拦截器
.addInterceptors(new WebSocketInterceptor())
// 允许跨域
.setAllowedOrigins("*");
// sockjs通道
registry
.addHandler(new com.jtsmartway.zhgsgl.data.config.WebSocketHandler(), "/sock-js")
.addInterceptors(new WebSocketInterceptor())
.setAllowedOrigins("*")
// 开启sockJs支持
.withSockJS();
}
}
实际上就是前端的区别,一些浏览器不支持websocket,则用sockJS库来处理ws连接,一般都是用sockJS
3.添加处理器
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class WebSocketHandler extends AbstractWebSocketHandler {
/**
* 存储sessionId和webSocketSession
* 需要注意的是,webSocketSession没有提供无参构造,不能进行序列化,也就不能通过redis存储
* 在分布式系统中,要想别的办法实现webSocketSession共享
*/
private static Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();
private static Map<String, String> userMap = new ConcurrentHashMap<>();
/**
* webSocket连接创建后调用
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
// 获取参数
String user = String.valueOf(session.getAttributes().get("user"));
userMap.put(user, session.getId());
sessionMap.put(session.getId(), session);
log.info("############### [ws : 连接成功] ###############");
}
/**
* 接收到消息会调用
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
log.info("############### [ws消息:{}] ###############", message.getPayload().toString());
JSONObject jsonObject = JSONObject.parseObject(message.getPayload().toString());
String content = jsonObject.getString("content");
String targetAdminId = jsonObject.getString("targetId");
if ("0".equals(targetAdminId)) {
// 推送给所有人
userMap.forEach((key, value) -> {
try {
this.sendMessage(key, content);
} catch (IOException e) {
e.printStackTrace();
}
});
} else {
sendMessage("1", content);
}
log.info("// ############### [ws 处理消息成功] ###############");
}
/**
* 连接出错会调用
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
sessionMap.remove(session.getId());
}
/**
* 连接关闭会调用
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
sessionMap.remove(session.getId());
}
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 后端发送消息
*/
public void sendMessage(String user, String message) throws IOException {
String sessionId = userMap.get(user);
if (StringUtils.isEmpty(sessionId)) {
return;
}
WebSocketSession session = sessionMap.get(sessionId);
if (session == null) {
return;
}
session.sendMessage(new TextMessage(message));
}
}
4.添加拦截器
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
public class WebSocketInterceptor implements HandshakeInterceptor {
/**
* handler处理前调用,attributes属性最终在WebSocketSession里,
* 可能通过webSocketSession.getAttributes().get(key值)获得
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, org.springframework.web.socket.WebSocketHandler wsHandler, Map<String, Object> attributes) {
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;
// 获取请求路径携带的参数
String user = serverHttpRequest.getServletRequest().getParameter("user");
attributes.put("user", user);
return true;
} else {
return false;
}
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, org.springframework.web.socket.WebSocketHandler wsHandler, Exception exception) {
}
}
测试连接,亲测使用若依cloud框架是可以使用的,就不展示测试代码
四、参考文献
微服务springcloud环境下基于Netty搭建websocket集群实现高并发,高性能,高可用的服务器消息推送--经典案例(已在工作中实战应用)netty是yyds!
标签:webSocket,Spring,springframework,session,ws,import,org,public,Cloud 来源: https://www.cnblogs.com/hhddd-1024/p/16424406.html