yudao-cloud WebSocket:实时消息推送与在线聊天功能
2026-02-04 04:36:09作者:何将鹤
引言:实时通信的迫切需求
在现代Web应用中,实时消息推送和在线聊天功能已成为提升用户体验的关键要素。无论是系统通知、订单状态更新,还是客服聊天、团队协作,都需要WebSocket技术来实现真正的双向实时通信。yudao-cloud基于Spring Boot提供了强大的WebSocket解决方案,让开发者能够轻松构建高性能的实时应用。
WebSocket核心架构设计
yudao-cloud的WebSocket模块采用分层架构设计,确保系统的可扩展性和维护性:
classDiagram
class WebSocketMessageSender {
+send(Integer userType, Long userId, String messageType, String messageContent)
+send(Integer userType, String messageType, String messageContent)
+send(String sessionId, String messageType, String messageContent)
}
class WebSocketMessageListener~T~ {
+onMessage(WebSocketSession session, T message)
+getType() String
}
class WebSocketSessionManager {
+addSession(WebSocketSession session)
+removeSession(String sessionId)
+getSession(String sessionId) WebSocketSession
}
class LoginUserHandshakeInterceptor {
+beforeHandshake(...) boolean
}
WebSocketMessageSender <|.. LocalWebSocketMessageSender
WebSocketMessageSender <|.. RedisWebSocketMessageSender
WebSocketMessageSender <|.. KafkaWebSocketMessageSender
WebSocketMessageListener <|.. DemoWebSocketMessageListener
核心组件功能说明
| 组件名称 | 职责描述 | 关键技术 |
|---|---|---|
| WebSocketMessageSender | 消息发送器接口,支持多种消息发送模式 | 策略模式、JSON序列化 |
| WebSocketMessageListener | 消息监听器,处理接收到的WebSocket消息 | 泛型、事件驱动 |
| WebSocketSessionManager | 会话管理,维护在线用户连接 | ConcurrentHashMap、Session管理 |
| LoginUserHandshakeInterceptor | 握手拦截器,实现用户认证 | Spring Security集成 |
快速开始:构建实时聊天功能
1. 环境配置
首先在application.yml中启用WebSocket功能:
yudao:
websocket:
enable: true
server:
port: 9321 # WebSocket服务器端口
2. 定义消息模型
创建发送和接收的消息DTO:
// 发送消息DTO
@Data
@Schema(description = "WebSocket 示例发送消息")
public class DemoSendMessage {
@Schema(description = "接收用户编号", example = "1024")
private Long toUserId;
@Schema(description = "消息内容", example = "你好,这是一条测试消息")
private String text;
}
// 接收消息DTO
@Data
@Schema(description = "WebSocket 示例接收消息")
public class DemoReceiveMessage {
@Schema(description = "发送用户编号", example = "1")
private Long fromUserId;
@Schema(description = "消息内容", example = "你好,这是一条测试消息")
private String text;
@Schema(description = "是否单发", example = "true")
private Boolean single;
}
3. 实现消息监听器
@Component
public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> {
@Autowired
private WebSocketMessageSender webSocketMessageSender;
@Override
public void onMessage(WebSocketSession session, DemoSendMessage message) {
Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);
// 单发消息处理
if (message.getToUserId() != null) {
DemoReceiveMessage toMessage = new DemoReceiveMessage()
.setFromUserId(fromUserId)
.setText(message.getText())
.setSingle(true);
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
message.getToUserId(),
"demo-message-receive",
toMessage
);
return;
}
// 群发消息处理
DemoReceiveMessage toMessage = new DemoReceiveMessage()
.setFromUserId(fromUserId)
.setText(message.getText())
.setSingle(false);
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
"demo-message-receive",
toMessage
);
}
@Override
public String getType() {
return "demo-message-send";
}
}
4. 前端WebSocket连接
// 建立WebSocket连接
const socket = new WebSocket('ws://localhost:9321/websocket');
// 连接建立事件
socket.onopen = function(event) {
console.log('WebSocket连接已建立');
// 发送认证消息(携带token)
const authMessage = {
type: 'auth',
token: '用户登录token'
};
socket.send(JSON.stringify(authMessage));
};
// 接收消息事件
socket.onmessage = function(event) {
const message = JSON.parse(event.data);
switch(message.type) {
case 'demo-message-receive':
handleChatMessage(message);
break;
case 'system-notification':
handleSystemNotification(message);
break;
default:
console.log('收到未知类型消息:', message);
}
};
// 发送聊天消息
function sendChatMessage(toUserId, content) {
const message = {
type: 'demo-message-send',
toUserId: toUserId,
text: content
};
socket.send(JSON.stringify(message));
}
高级特性:消息分发模式
yudao-cloud支持多种消息分发模式,满足不同业务场景需求:
1. 单播消息(Unicast)
// 发送给指定用户
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
1024L, // 目标用户ID
"private-message",
messageContent
);
2. 广播消息(Broadcast)
// 发送给所有管理员用户
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
"system-notification",
notificationContent
);
3. 会话级消息(Session-level)
// 发送给特定会话
webSocketMessageSender.sendObject(
"session-id-123",
"session-message",
sessionSpecificContent
);
消息可靠性保障
心跳检测机制
sequenceDiagram
participant Client
participant Server
participant SessionManager
Client->>Server: WebSocket连接
Server->>SessionManager: 注册会话
Note over Client,Server: 连接建立成功
loop 每30秒一次心跳
Client->>Server: Ping消息
Server->>Client: Pong响应
end
Note over Client,Server: 网络异常断开
Server->>SessionManager: 检测到连接断开
SessionManager->>SessionManager: 清理会话资源
重连机制实现
class WebSocketManager {
constructor() {
this.socket = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
}
connect() {
this.socket = new WebSocket('ws://localhost:9321/websocket');
this.socket.onclose = (event) => {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
setTimeout(() => {
this.reconnectAttempts++;
this.connect();
}, 3000 * this.reconnectAttempts);
}
};
this.socket.onopen = (event) => {
this.reconnectAttempts = 0;
this.authenticate();
};
}
}
性能优化策略
1. 连接池管理
@Configuration
public class WebSocketConfig {
@Bean
public WebSocketSessionManager webSocketSessionManager() {
return new WebSocketSessionManagerImpl(10000); // 最大连接数限制
}
}
2. 消息压缩
public void sendCompressedMessage(Integer userType, Long userId, String messageType, Object message) {
String json = JsonUtils.toJsonString(message);
byte[] compressed = compress(json.getBytes(StandardCharsets.UTF_8));
// 发送压缩后的消息
}
3. 批量消息处理
@Scheduled(fixedRate = 1000) // 每秒批量处理一次
public void processBatchMessages() {
List<WebSocketMessage> batch = messageQueue.drainToBatch();
if (!batch.isEmpty()) {
sendBatchMessages(batch);
}
}
安全防护措施
1. 认证授权
@Component
public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request,
ServerHttpResponse response,
WebSocketHandler wsHandler,
Map<String, Object> attributes) {
// 从请求中提取token并进行验证
String token = extractToken(request);
if (!authService.validateToken(token)) {
return false; // 认证失败,拒绝连接
}
// 设置用户信息到attributes中
LoginUser loginUser = authService.getLoginUser(token);
attributes.put("loginUser", loginUser);
return true;
}
}
2. 消息内容过滤
public class MessageContentFilter {
private static final Pattern XSS_PATTERN = Pattern.compile("[<>\"']");
public static String filter(String content) {
if (content == null) return null;
return XSS_PATTERN.matcher(content).replaceAll("");
}
}
实战案例:系统通知功能
后端服务实现
@Service
public class NotificationService {
@Autowired
private WebSocketMessageSender webSocketMessageSender;
public void sendSystemNotification(String title, String content, UserTypeEnum userType) {
SystemNotification notification = new SystemNotification()
.setTitle(title)
.setContent(content)
.setCreateTime(new Date());
webSocketMessageSender.sendObject(
userType.getValue(),
"system-notification",
notification
);
}
public void sendPersonalNotification(Long userId, String title, String content) {
PersonalNotification notification = new PersonalNotification()
.setTitle(title)
.setContent(content)
.setUserId(userId)
.setCreateTime(new Date());
webSocketMessageSender.sendObject(
UserTypeEnum.ADMIN.getValue(),
userId,
"personal-notification",
notification
);
}
}
前端消息处理
// 系统通知处理
function handleSystemNotification(notification) {
const notificationElement = `
<div class="notification">
<h4>${escapeHtml(notification.title)}</h4>
<p>${escapeHtml(notification.content)}</p>
<span class="time">${formatTime(notification.createTime)}</span>
</div>
`;
// 添加到通知列表
$('#notification-list').prepend(notificationElement);
// 显示桌面通知(如果浏览器支持)
if ('Notification' in window && Notification.permission === 'granted') {
new Notification(notification.title, {
body: notification.content,
icon: '/static/images/logo.png'
});
}
}
监控与调试
1. 连接状态监控
@RestController
@RequestMapping("/admin/websocket")
public class WebSocketMonitorController {
@Autowired
private WebSocketSessionManager sessionManager;
@GetMapping("/stats")
public CommonResult<WebSocketStatsVO> getStats() {
WebSocketStatsVO stats = new WebSocketStatsVO();
stats.setActiveConnections(sessionManager.getSessionCount());
stats.setTotalMessagesSent(messageCounter.getTotalSent());
stats.setTotalMessagesReceived(messageCounter.getTotalReceived());
return CommonResult.success(stats);
}
}
2. 日志调试
在application.yml中配置WebSocket调试日志:
logging:
level:
cn.iocoder.yudao.framework.websocket: DEBUG
org.springframework.web.socket: DEBUG
常见问题解决方案
1. 连接数限制问题
问题描述:WebSocket连接数达到上限 解决方案:
yudao:
websocket:
server:
max-sessions: 50000 # 调整最大连接数
2. 内存泄漏问题
问题描述:Session未正确清理导致内存泄漏 解决方案:
@EventListener
public void handleSessionDisconnectEvent(SessionDisconnectEvent event) {
sessionManager.removeSession(event.getSessionId());
// 清理相关资源
}
3. 跨域问题
解决方案:
@Bean
public WebSocketHandlerRegistrationCustomizer webSocketHandlerRegistrationCustomizer() {
return registration -> registration.setAllowedOrigins("*");
}
总结
yudao-cloud的WebSocket模块提供了完整的企业级实时通信解决方案,具备以下核心优势:
- 架构清晰:采用分层设计,各组件职责明确
- 功能丰富:支持单播、广播、会话级多种消息模式
- 性能优异:内置连接池、消息压缩、批量处理等优化策略
- 安全可靠:完善的认证授权和内容过滤机制
- 易于扩展:支持多种消息中间件集成
通过本文的详细讲解和代码示例,开发者可以快速掌握yudao-cloud WebSocket的使用方法,构建出高性能、高可用的实时消息推送和在线聊天功能。无论是系统通知、实时监控,还是在线客服、协同编辑,yudao-cloud都能提供强有力的技术支撑。
在实际项目中,建议根据具体业务需求选择合适的消息分发模式,并结合监控系统对WebSocket连接状态进行实时监控,确保系统的稳定运行。
登录后查看全文
热门项目推荐
相关项目推荐
atomcodeClaude Code 的开源替代方案。连接任意大模型,编辑代码,运行命令,自动验证 — 全自动执行。用 Rust 构建,极致性能。 | An open-source alternative to Claude Code. Connect any LLM, edit code, run commands, and verify changes — autonomously. Built in Rust for speed. Get StartedRust0153- DDeepSeek-V4-ProDeepSeek-V4-Pro(总参数 1.6 万亿,激活 49B)面向复杂推理和高级编程任务,在代码竞赛、数学推理、Agent 工作流等场景表现优异,性能接近国际前沿闭源模型。Python00
LongCat-Video-Avatar-1.5最新开源LongCat-Video-Avatar 1.5 版本,这是一款经过升级的开源框架,专注于音频驱动人物视频生成的极致实证优化与生产级就绪能力。该版本在 LongCat-Video 基础模型之上构建,可生成高度稳定的商用级虚拟人视频,支持音频-文本转视频(AT2V)、音频-文本-图像转视频(ATI2V)以及视频续播等原生任务,并能无缝兼容单流与多流音频输入。00
auto-devAutoDev 是一个 AI 驱动的辅助编程插件。AutoDev 支持一键生成测试、代码、提交信息等,还能够与您的需求管理系统(例如Jira、Trello、Github Issue 等)直接对接。 在IDE 中,您只需简单点击,AutoDev 会根据您的需求自动为您生成代码。Kotlin03
Intern-S2-PreviewIntern-S2-Preview,这是一款高效的350亿参数科学多模态基础模型。除了常规的参数与数据规模扩展外,Intern-S2-Preview探索了任务扩展:通过提升科学任务的难度、多样性与覆盖范围,进一步释放模型能力。Python00
skillhubopenJiuwen 生态的 Skill 托管与分发开源方案,支持自建与可选 ClawHub 兼容。Python0112
热门内容推荐
最新内容推荐
项目优选
收起
暂无描述
Dockerfile
733
4.76 K
deepin linux kernel
C
31
16
Ascend Extension for PyTorch
Python
652
797
Claude Code 的开源替代方案。连接任意大模型,编辑代码,运行命令,自动验证 — 全自动执行。用 Rust 构建,极致性能。 | An open-source alternative to Claude Code. Connect any LLM, edit code, run commands, and verify changes — autonomously. Built in Rust for speed.
Get Started
Rust
1.25 K
153
旨在打造算法先进、性能卓越、高效敏捷、安全可靠的密码套件,通过轻量级、可剪裁的软件技术架构满足各行业不同场景的多样化要求,让密码技术应用更简单,同时探索后量子等先进算法创新实践,构建密码前沿技术底座!
C
1.1 K
611
本项目是CANN提供的数学类基础计算算子库,实现网络在NPU上加速计算。
C++
1.01 K
1.01 K
华为昇腾面向大规模分布式训练的多模态大模型套件,支撑多模态生成、多模态理解。
Python
147
237
昇腾LLM分布式训练框架
Python
168
200
openEuler内核是openEuler操作系统的核心,既是系统性能与稳定性的基石,也是连接处理器、设备与服务的桥梁。
C
434
395
暂无简介
Dart
987
253