yudao-cloud WebSocket:实时消息推送与在线聊天功能
2026-02-04 04:36:09作者:何将鹤
引言:实时通信的迫切需求
在现代Web应用中,实时消息推送和在线聊天功能已成为提升用户体验的关键要素。无论是系统通知、订单状态更新,还是客服聊天、团队协作,都需要WebSocket技术来实现真正的双向实时通信。yudao-cloud基于Spring Boot提供了强大的WebSocket解决方案,让开发者能够轻松构建高性能的实时应用。
WebSocket核心架构设计
yudao-cloud的WebSocket模块采用分层架构设计,确保系统的可扩展性和维护性:
classDiagram
class WebSocketMessageSender {
+send(Integer userType, Long userId, String messageType, String messageContent)
+send(Integer userType, String messageType, String messageContent)
+send(String sessionId, String messageType, String messageContent)
}
class WebSocketMessageListener~T~ {
+onMessage(WebSocketSession session, T message)
+getType() String
}
class WebSocketSessionManager {
+addSession(WebSocketSession session)
+removeSession(String sessionId)
+getSession(String sessionId) WebSocketSession
}
class LoginUserHandshakeInterceptor {
+beforeHandshake(...) boolean
}
WebSocketMessageSender <|.. LocalWebSocketMessageSender
WebSocketMessageSender <|.. RedisWebSocketMessageSender
WebSocketMessageSender <|.. KafkaWebSocketMessageSender
WebSocketMessageListener <|.. DemoWebSocketMessageListener
核心组件功能说明
| 组件名称 | 职责描述 | 关键技术 |
|---|---|---|
| WebSocketMessageSender | 消息发送器接口,支持多种消息发送模式 | 策略模式、JSON序列化 |
| WebSocketMessageListener | 消息监听器,处理接收到的WebSocket消息 | 泛型、事件驱动 |
| WebSocketSessionManager | 会话管理,维护在线用户连接 | ConcurrentHashMap、Session管理 |
| LoginUserHandshakeInterceptor | 握手拦截器,实现用户认证 | Spring Security集成 |
快速开始:构建实时聊天功能
1. 环境配置
首先在application.yml中启用WebSocket功能:
yudao:
websocket:
enable: true
server:
port: 9321 # WebSocket服务器端口
2. 定义消息模型
创建发送和接收的消息DTO:
// 发送消息DTO
@Data
@Schema(description = "WebSocket 示例发送消息")
public class DemoSendMessage {
@Schema(description = "接收用户编号", example = "1024")
private Long toUserId;
@Schema(description = "消息内容", example = "你好,这是一条测试消息")
private String text;
}
// 接收消息DTO
@Data
@Schema(description = "WebSocket 示例接收消息")
public class DemoReceiveMessage {
@Schema(description = "发送用户编号", example = "1")
private Long fromUserId;
@Schema(description = "消息内容", example = "你好,这是一条测试消息")
private String text;
@Schema(description = "是否单发", example = "true")
private Boolean single;
}
3. 实现消息监听器
@Component
public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> {
@Autowired
private WebSocketMessageSender webSocketMessageSender;
@Override
public void onMessage(WebSocketSession session, DemoSendMessage message) {
Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);
// 单发消息处理
if (message.getToUserId() != null) {
DemoReceiveMessage toMessage = new DemoReceiveMessage()
.setFromUserId(fromUserId)
.setText(message.getText())
.setSingle(true);
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
message.getToUserId(),
"demo-message-receive",
toMessage
);
return;
}
// 群发消息处理
DemoReceiveMessage toMessage = new DemoReceiveMessage()
.setFromUserId(fromUserId)
.setText(message.getText())
.setSingle(false);
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
"demo-message-receive",
toMessage
);
}
@Override
public String getType() {
return "demo-message-send";
}
}
4. 前端WebSocket连接
// 建立WebSocket连接
const socket = new WebSocket('ws://localhost:9321/websocket');
// 连接建立事件
socket.onopen = function(event) {
console.log('WebSocket连接已建立');
// 发送认证消息(携带token)
const authMessage = {
type: 'auth',
token: '用户登录token'
};
socket.send(JSON.stringify(authMessage));
};
// 接收消息事件
socket.onmessage = function(event) {
const message = JSON.parse(event.data);
switch(message.type) {
case 'demo-message-receive':
handleChatMessage(message);
break;
case 'system-notification':
handleSystemNotification(message);
break;
default:
console.log('收到未知类型消息:', message);
}
};
// 发送聊天消息
function sendChatMessage(toUserId, content) {
const message = {
type: 'demo-message-send',
toUserId: toUserId,
text: content
};
socket.send(JSON.stringify(message));
}
高级特性:消息分发模式
yudao-cloud支持多种消息分发模式,满足不同业务场景需求:
1. 单播消息(Unicast)
// 发送给指定用户
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
1024L, // 目标用户ID
"private-message",
messageContent
);
2. 广播消息(Broadcast)
// 发送给所有管理员用户
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
"system-notification",
notificationContent
);
3. 会话级消息(Session-level)
// 发送给特定会话
webSocketMessageSender.sendObject(
"session-id-123",
"session-message",
sessionSpecificContent
);
消息可靠性保障
心跳检测机制
sequenceDiagram
participant Client
participant Server
participant SessionManager
Client->>Server: WebSocket连接
Server->>SessionManager: 注册会话
Note over Client,Server: 连接建立成功
loop 每30秒一次心跳
Client->>Server: Ping消息
Server->>Client: Pong响应
end
Note over Client,Server: 网络异常断开
Server->>SessionManager: 检测到连接断开
SessionManager->>SessionManager: 清理会话资源
重连机制实现
class WebSocketManager {
constructor() {
this.socket = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
}
connect() {
this.socket = new WebSocket('ws://localhost:9321/websocket');
this.socket.onclose = (event) => {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
setTimeout(() => {
this.reconnectAttempts++;
this.connect();
}, 3000 * this.reconnectAttempts);
}
};
this.socket.onopen = (event) => {
this.reconnectAttempts = 0;
this.authenticate();
};
}
}
性能优化策略
1. 连接池管理
@Configuration
public class WebSocketConfig {
@Bean
public WebSocketSessionManager webSocketSessionManager() {
return new WebSocketSessionManagerImpl(10000); // 最大连接数限制
}
}
2. 消息压缩
public void sendCompressedMessage(Integer userType, Long userId, String messageType, Object message) {
String json = JsonUtils.toJsonString(message);
byte[] compressed = compress(json.getBytes(StandardCharsets.UTF_8));
// 发送压缩后的消息
}
3. 批量消息处理
@Scheduled(fixedRate = 1000) // 每秒批量处理一次
public void processBatchMessages() {
List<WebSocketMessage> batch = messageQueue.drainToBatch();
if (!batch.isEmpty()) {
sendBatchMessages(batch);
}
}
安全防护措施
1. 认证授权
@Component
public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request,
ServerHttpResponse response,
WebSocketHandler wsHandler,
Map<String, Object> attributes) {
// 从请求中提取token并进行验证
String token = extractToken(request);
if (!authService.validateToken(token)) {
return false; // 认证失败,拒绝连接
}
// 设置用户信息到attributes中
LoginUser loginUser = authService.getLoginUser(token);
attributes.put("loginUser", loginUser);
return true;
}
}
2. 消息内容过滤
public class MessageContentFilter {
private static final Pattern XSS_PATTERN = Pattern.compile("[<>\"']");
public static String filter(String content) {
if (content == null) return null;
return XSS_PATTERN.matcher(content).replaceAll("");
}
}
实战案例:系统通知功能
后端服务实现
@Service
public class NotificationService {
@Autowired
private WebSocketMessageSender webSocketMessageSender;
public void sendSystemNotification(String title, String content, UserTypeEnum userType) {
SystemNotification notification = new SystemNotification()
.setTitle(title)
.setContent(content)
.setCreateTime(new Date());
webSocketMessageSender.sendObject(
userType.getValue(),
"system-notification",
notification
);
}
public void sendPersonalNotification(Long userId, String title, String content) {
PersonalNotification notification = new PersonalNotification()
.setTitle(title)
.setContent(content)
.setUserId(userId)
.setCreateTime(new Date());
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
userId,
"personal-notification",
notification
);
}
}
前端消息处理
// 系统通知处理
function handleSystemNotification(notification) {
const notificationElement = `
<div class="notification">
<h4>${escapeHtml(notification.title)}</h4>
<p>${escapeHtml(notification.content)}</p>
<span class="time">${formatTime(notification.createTime)}</span>
</div>
`;
// 添加到通知列表
$('#notification-list').prepend(notificationElement);
// 显示桌面通知(如果浏览器支持)
if ('Notification' in window && Notification.permission === 'granted') {
new Notification(notification.title, {
body: notification.content,
icon: '/static/images/logo.png'
});
}
}
监控与调试
1. 连接状态监控
@RestController
@RequestMapping("/admin/websocket")
public class WebSocketMonitorController {
@Autowired
private WebSocketSessionManager sessionManager;
@GetMapping("/stats")
public CommonResult<WebSocketStatsVO> getStats() {
WebSocketStatsVO stats = new WebSocketStatsVO();
stats.setActiveConnections(sessionManager.getSessionCount());
stats.setTotalMessagesSent(messageCounter.getTotalSent());
stats.setTotalMessagesReceived(messageCounter.getTotalReceived());
return CommonResult.success(stats);
}
}
2. 日志调试
在application.yml中配置WebSocket调试日志:
logging:
level:
cn.iocoder.yudao.framework.websocket: DEBUG
org.springframework.web.socket: DEBUG
常见问题解决方案
1. 连接数限制问题
问题描述:WebSocket连接数达到上限 解决方案:
yudao:
websocket:
server:
max-sessions: 50000 # 调整最大连接数
2. 内存泄漏问题
问题描述:Session未正确清理导致内存泄漏 解决方案:
@EventListener
public void handleSessionDisconnectEvent(SessionDisconnectEvent event) {
sessionManager.removeSession(event.getSessionId());
// 清理相关资源
}
3. 跨域问题
解决方案:
@Bean
public WebSocketHandlerRegistrationCustomizer webSocketHandlerRegistrationCustomizer() {
return registration -> registration.setAllowedOrigins("*");
}
总结
yudao-cloud的WebSocket模块提供了完整的企业级实时通信解决方案,具备以下核心优势:
- 架构清晰:采用分层设计,各组件职责明确
- 功能丰富:支持单播、广播、会话级多种消息模式
- 性能优异:内置连接池、消息压缩、批量处理等优化策略
- 安全可靠:完善的认证授权和内容过滤机制
- 易于扩展:支持多种消息中间件集成
通过本文的详细讲解和代码示例,开发者可以快速掌握yudao-cloud WebSocket的使用方法,构建出高性能、高可用的实时消息推送和在线聊天功能。无论是系统通知、实时监控,还是在线客服、协同编辑,yudao-cloud都能提供强有力的技术支撑。
在实际项目中,建议根据具体业务需求选择合适的消息分发模式,并结合监控系统对WebSocket连接状态进行实时监控,确保系统的稳定运行。
登录后查看全文
热门项目推荐
相关项目推荐
Kimi-K2.5Kimi K2.5 是一款开源的原生多模态智能体模型,它在 Kimi-K2-Base 的基础上,通过对约 15 万亿混合视觉和文本 tokens 进行持续预训练构建而成。该模型将视觉与语言理解、高级智能体能力、即时模式与思考模式,以及对话式与智能体范式无缝融合。Python00
GLM-4.7-FlashGLM-4.7-Flash 是一款 30B-A3B MoE 模型。作为 30B 级别中的佼佼者,GLM-4.7-Flash 为追求性能与效率平衡的轻量化部署提供了全新选择。Jinja00
VLOOKVLOOK™ 是优雅好用的 Typora/Markdown 主题包和增强插件。 VLOOK™ is an elegant and practical THEME PACKAGE × ENHANCEMENT PLUGIN for Typora/Markdown.Less00
PaddleOCR-VL-1.5PaddleOCR-VL-1.5 是 PaddleOCR-VL 的新一代进阶模型,在 OmniDocBench v1.5 上实现了 94.5% 的全新 state-of-the-art 准确率。 为了严格评估模型在真实物理畸变下的鲁棒性——包括扫描伪影、倾斜、扭曲、屏幕拍摄和光照变化——我们提出了 Real5-OmniDocBench 基准测试集。实验结果表明,该增强模型在新构建的基准测试集上达到了 SOTA 性能。此外,我们通过整合印章识别和文本检测识别(text spotting)任务扩展了模型的能力,同时保持 0.9B 的超紧凑 VLM 规模,具备高效率特性。Python00
KuiklyUI基于KMP技术的高性能、全平台开发框架,具备统一代码库、极致易用性和动态灵活性。 Provide a high-performance, full-platform development framework with unified codebase, ultimate ease of use, and dynamic flexibility. 注意:本仓库为Github仓库镜像,PR或Issue请移步至Github发起,感谢支持!Kotlin07
compass-metrics-modelMetrics model project for the OSS CompassPython00
项目优选
收起
deepin linux kernel
C
27
11
OpenHarmony documentation | OpenHarmony开发者文档
Dockerfile
525
3.72 K
Ascend Extension for PyTorch
Python
329
391
本项目是CANN提供的数学类基础计算算子库,实现网络在NPU上加速计算。
C++
877
578
openEuler内核是openEuler操作系统的核心,既是系统性能与稳定性的基石,也是连接处理器、设备与服务的桥梁。
C
335
162
暂无简介
Dart
764
189
Nop Platform 2.0是基于可逆计算理论实现的采用面向语言编程范式的新一代低代码开发平台,包含基于全新原理从零开始研发的GraphQL引擎、ORM引擎、工作流引擎、报表引擎、规则引擎、批处理引引擎等完整设计。nop-entropy是它的后端部分,采用java语言实现,可选择集成Spring框架或者Quarkus框架。中小企业可以免费商用
Java
12
1
🎉 (RuoYi)官方仓库 基于SpringBoot,Spring Security,JWT,Vue3 & Vite、Element Plus 的前后端分离权限管理系统
Vue
1.33 K
746
🔥LeetCode solutions in any programming language | 多种编程语言实现 LeetCode、《剑指 Offer(第 2 版)》、《程序员面试金典(第 6 版)》题解
Java
67
20
React Native鸿蒙化仓库
JavaScript
302
350