yudao-cloud WebSocket:实时消息推送与在线聊天功能
2026-02-04 04:36:09作者:何将鹤
引言:实时通信的迫切需求
在现代Web应用中,实时消息推送和在线聊天功能已成为提升用户体验的关键要素。无论是系统通知、订单状态更新,还是客服聊天、团队协作,都需要WebSocket技术来实现真正的双向实时通信。yudao-cloud基于Spring Boot提供了强大的WebSocket解决方案,让开发者能够轻松构建高性能的实时应用。
WebSocket核心架构设计
yudao-cloud的WebSocket模块采用分层架构设计,确保系统的可扩展性和维护性:
classDiagram
class WebSocketMessageSender {
+send(Integer userType, Long userId, String messageType, String messageContent)
+send(Integer userType, String messageType, String messageContent)
+send(String sessionId, String messageType, String messageContent)
}
class WebSocketMessageListener~T~ {
+onMessage(WebSocketSession session, T message)
+getType() String
}
class WebSocketSessionManager {
+addSession(WebSocketSession session)
+removeSession(String sessionId)
+getSession(String sessionId) WebSocketSession
}
class LoginUserHandshakeInterceptor {
+beforeHandshake(...) boolean
}
WebSocketMessageSender <|.. LocalWebSocketMessageSender
WebSocketMessageSender <|.. RedisWebSocketMessageSender
WebSocketMessageSender <|.. KafkaWebSocketMessageSender
WebSocketMessageListener <|.. DemoWebSocketMessageListener
核心组件功能说明
| 组件名称 | 职责描述 | 关键技术 |
|---|---|---|
| WebSocketMessageSender | 消息发送器接口,支持多种消息发送模式 | 策略模式、JSON序列化 |
| WebSocketMessageListener | 消息监听器,处理接收到的WebSocket消息 | 泛型、事件驱动 |
| WebSocketSessionManager | 会话管理,维护在线用户连接 | ConcurrentHashMap、Session管理 |
| LoginUserHandshakeInterceptor | 握手拦截器,实现用户认证 | Spring Security集成 |
快速开始:构建实时聊天功能
1. 环境配置
首先在application.yml中启用WebSocket功能:
yudao:
websocket:
enable: true
server:
port: 9321 # WebSocket服务器端口
2. 定义消息模型
创建发送和接收的消息DTO:
// 发送消息DTO
@Data
@Schema(description = "WebSocket 示例发送消息")
public class DemoSendMessage {
@Schema(description = "接收用户编号", example = "1024")
private Long toUserId;
@Schema(description = "消息内容", example = "你好,这是一条测试消息")
private String text;
}
// 接收消息DTO
@Data
@Schema(description = "WebSocket 示例接收消息")
public class DemoReceiveMessage {
@Schema(description = "发送用户编号", example = "1")
private Long fromUserId;
@Schema(description = "消息内容", example = "你好,这是一条测试消息")
private String text;
@Schema(description = "是否单发", example = "true")
private Boolean single;
}
3. 实现消息监听器
@Component
public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> {
@Autowired
private WebSocketMessageSender webSocketMessageSender;
@Override
public void onMessage(WebSocketSession session, DemoSendMessage message) {
Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);
// 单发消息处理
if (message.getToUserId() != null) {
DemoReceiveMessage toMessage = new DemoReceiveMessage()
.setFromUserId(fromUserId)
.setText(message.getText())
.setSingle(true);
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
message.getToUserId(),
"demo-message-receive",
toMessage
);
return;
}
// 群发消息处理
DemoReceiveMessage toMessage = new DemoReceiveMessage()
.setFromUserId(fromUserId)
.setText(message.getText())
.setSingle(false);
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
"demo-message-receive",
toMessage
);
}
@Override
public String getType() {
return "demo-message-send";
}
}
4. 前端WebSocket连接
// 建立WebSocket连接
const socket = new WebSocket('ws://localhost:9321/websocket');
// 连接建立事件
socket.onopen = function(event) {
console.log('WebSocket连接已建立');
// 发送认证消息(携带token)
const authMessage = {
type: 'auth',
token: '用户登录token'
};
socket.send(JSON.stringify(authMessage));
};
// 接收消息事件
socket.onmessage = function(event) {
const message = JSON.parse(event.data);
switch(message.type) {
case 'demo-message-receive':
handleChatMessage(message);
break;
case 'system-notification':
handleSystemNotification(message);
break;
default:
console.log('收到未知类型消息:', message);
}
};
// 发送聊天消息
function sendChatMessage(toUserId, content) {
const message = {
type: 'demo-message-send',
toUserId: toUserId,
text: content
};
socket.send(JSON.stringify(message));
}
高级特性:消息分发模式
yudao-cloud支持多种消息分发模式,满足不同业务场景需求:
1. 单播消息(Unicast)
// 发送给指定用户
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
1024L, // 目标用户ID
"private-message",
messageContent
);
2. 广播消息(Broadcast)
// 发送给所有管理员用户
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
"system-notification",
notificationContent
);
3. 会话级消息(Session-level)
// 发送给特定会话
webSocketMessageSender.sendObject(
"session-id-123",
"session-message",
sessionSpecificContent
);
消息可靠性保障
心跳检测机制
sequenceDiagram
participant Client
participant Server
participant SessionManager
Client->>Server: WebSocket连接
Server->>SessionManager: 注册会话
Note over Client,Server: 连接建立成功
loop 每30秒一次心跳
Client->>Server: Ping消息
Server->>Client: Pong响应
end
Note over Client,Server: 网络异常断开
Server->>SessionManager: 检测到连接断开
SessionManager->>SessionManager: 清理会话资源
重连机制实现
class WebSocketManager {
constructor() {
this.socket = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
}
connect() {
this.socket = new WebSocket('ws://localhost:9321/websocket');
this.socket.onclose = (event) => {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
setTimeout(() => {
this.reconnectAttempts++;
this.connect();
}, 3000 * this.reconnectAttempts);
}
};
this.socket.onopen = (event) => {
this.reconnectAttempts = 0;
this.authenticate();
};
}
}
性能优化策略
1. 连接池管理
@Configuration
public class WebSocketConfig {
@Bean
public WebSocketSessionManager webSocketSessionManager() {
return new WebSocketSessionManagerImpl(10000); // 最大连接数限制
}
}
2. 消息压缩
public void sendCompressedMessage(Integer userType, Long userId, String messageType, Object message) {
String json = JsonUtils.toJsonString(message);
byte[] compressed = compress(json.getBytes(StandardCharsets.UTF_8));
// 发送压缩后的消息
}
3. 批量消息处理
@Scheduled(fixedRate = 1000) // 每秒批量处理一次
public void processBatchMessages() {
List<WebSocketMessage> batch = messageQueue.drainToBatch();
if (!batch.isEmpty()) {
sendBatchMessages(batch);
}
}
安全防护措施
1. 认证授权
@Component
public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request,
ServerHttpResponse response,
WebSocketHandler wsHandler,
Map<String, Object> attributes) {
// 从请求中提取token并进行验证
String token = extractToken(request);
if (!authService.validateToken(token)) {
return false; // 认证失败,拒绝连接
}
// 设置用户信息到attributes中
LoginUser loginUser = authService.getLoginUser(token);
attributes.put("loginUser", loginUser);
return true;
}
}
2. 消息内容过滤
public class MessageContentFilter {
private static final Pattern XSS_PATTERN = Pattern.compile("[<>\"']");
public static String filter(String content) {
if (content == null) return null;
return XSS_PATTERN.matcher(content).replaceAll("");
}
}
实战案例:系统通知功能
后端服务实现
@Service
public class NotificationService {
@Autowired
private WebSocketMessageSender webSocketMessageSender;
public void sendSystemNotification(String title, String content, UserTypeEnum userType) {
SystemNotification notification = new SystemNotification()
.setTitle(title)
.setContent(content)
.setCreateTime(new Date());
webSocketMessageSender.sendObject(
userType.getValue(),
"system-notification",
notification
);
}
public void sendPersonalNotification(Long userId, String title, String content) {
PersonalNotification notification = new PersonalNotification()
.setTitle(title)
.setContent(content)
.setUserId(userId)
.setCreateTime(new Date());
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
userId,
"personal-notification",
notification
);
}
}
前端消息处理
// 系统通知处理
function handleSystemNotification(notification) {
const notificationElement = `
<div class="notification">
<h4>${escapeHtml(notification.title)}</h4>
<p>${escapeHtml(notification.content)}</p>
<span class="time">${formatTime(notification.createTime)}</span>
</div>
`;
// 添加到通知列表
$('#notification-list').prepend(notificationElement);
// 显示桌面通知(如果浏览器支持)
if ('Notification' in window && Notification.permission === 'granted') {
new Notification(notification.title, {
body: notification.content,
icon: '/static/images/logo.png'
});
}
}
监控与调试
1. 连接状态监控
@RestController
@RequestMapping("/admin/websocket")
public class WebSocketMonitorController {
@Autowired
private WebSocketSessionManager sessionManager;
@GetMapping("/stats")
public CommonResult<WebSocketStatsVO> getStats() {
WebSocketStatsVO stats = new WebSocketStatsVO();
stats.setActiveConnections(sessionManager.getSessionCount());
stats.setTotalMessagesSent(messageCounter.getTotalSent());
stats.setTotalMessagesReceived(messageCounter.getTotalReceived());
return CommonResult.success(stats);
}
}
2. 日志调试
在application.yml中配置WebSocket调试日志:
logging:
level:
cn.iocoder.yudao.framework.websocket: DEBUG
org.springframework.web.socket: DEBUG
常见问题解决方案
1. 连接数限制问题
问题描述:WebSocket连接数达到上限 解决方案:
yudao:
websocket:
server:
max-sessions: 50000 # 调整最大连接数
2. 内存泄漏问题
问题描述:Session未正确清理导致内存泄漏 解决方案:
@EventListener
public void handleSessionDisconnectEvent(SessionDisconnectEvent event) {
sessionManager.removeSession(event.getSessionId());
// 清理相关资源
}
3. 跨域问题
解决方案:
@Bean
public WebSocketHandlerRegistrationCustomizer webSocketHandlerRegistrationCustomizer() {
return registration -> registration.setAllowedOrigins("*");
}
总结
yudao-cloud的WebSocket模块提供了完整的企业级实时通信解决方案,具备以下核心优势:
- 架构清晰:采用分层设计,各组件职责明确
- 功能丰富:支持单播、广播、会话级多种消息模式
- 性能优异:内置连接池、消息压缩、批量处理等优化策略
- 安全可靠:完善的认证授权和内容过滤机制
- 易于扩展:支持多种消息中间件集成
通过本文的详细讲解和代码示例,开发者可以快速掌握yudao-cloud WebSocket的使用方法,构建出高性能、高可用的实时消息推送和在线聊天功能。无论是系统通知、实时监控,还是在线客服、协同编辑,yudao-cloud都能提供强有力的技术支撑。
在实际项目中,建议根据具体业务需求选择合适的消息分发模式,并结合监控系统对WebSocket连接状态进行实时监控,确保系统的稳定运行。
登录后查看全文
热门项目推荐
相关项目推荐
GLM-5智谱 AI 正式发布 GLM-5,旨在应对复杂系统工程和长时域智能体任务。Jinja00
LongCat-AudioDiT-1BLongCat-AudioDiT 是一款基于扩散模型的文本转语音(TTS)模型,代表了当前该领域的最高水平(SOTA),它直接在波形潜空间中进行操作。00
jiuwenclawJiuwenClaw 是一款基于openJiuwen开发的智能AI Agent,它能够将大语言模型的强大能力,通过你日常使用的各类通讯应用,直接延伸至你的指尖。Python0248- QQwen3.5-397B-A17BQwen3.5 实现了重大飞跃,整合了多模态学习、架构效率、强化学习规模以及全球可访问性等方面的突破性进展,旨在为开发者和企业赋予前所未有的能力与效率。Jinja00
AtomGit城市坐标计划AtomGit 城市坐标计划开启!让开源有坐标,让城市有星火。致力于与城市合伙人共同构建并长期运营一个健康、活跃的本地开发者生态。01
HivisionIDPhotos⚡️HivisionIDPhotos: a lightweight and efficient AI ID photos tools. 一个轻量级的AI证件照制作算法。Python05
热门内容推荐
最新内容推荐
解锁Duix-Avatar本地化部署:构建专属AI视频创作平台的实战指南Linux内核性能优化实战指南:从调度器选择到系统响应速度提升DBeaver PL/SQL开发实战:解决Oracle存储过程难题的完整方案RNacos技术实践:高性能服务发现与配置中心5步法RePKG资源提取与文件转换全攻略:从入门到精通的技术指南揭秘FLUX 1-dev:如何通过轻量级架构实现高效文本到图像转换OpenPilot实战指南:从入门到精通的5个关键步骤Realtek r8125驱动:释放2.5G网卡性能的Linux配置指南Real-ESRGAN:AI图像增强与超分辨率技术实战指南静态网站托管新手指南:零成本搭建专业级个人网站
项目优选
收起
deepin linux kernel
C
27
13
OpenHarmony documentation | OpenHarmony开发者文档
Dockerfile
642
4.19 K
Ascend Extension for PyTorch
Python
478
579
本项目是CANN提供的数学类基础计算算子库,实现网络在NPU上加速计算。
C++
934
841
openEuler内核是openEuler操作系统的核心,既是系统性能与稳定性的基石,也是连接处理器、设备与服务的桥梁。
C
386
272
🎉 (RuoYi)官方仓库 基于SpringBoot,Spring Security,JWT,Vue3 & Vite、Element Plus 的前后端分离权限管理系统
Vue
1.52 K
867
暂无简介
Dart
885
211
仓颉编程语言运行时与标准库。
Cangjie
161
922
昇腾LLM分布式训练框架
Python
139
163
🔥LeetCode solutions in any programming language | 多种编程语言实现 LeetCode、《剑指 Offer(第 2 版)》、《程序员面试金典(第 6 版)》题解
Java
69
21