RuoYi-Cloud-Plus SSE推送:实时消息方案
2026-02-04 04:46:32作者:魏侃纯Zoe
引言:实时消息推送的挑战与机遇
在现代企业级应用中,实时消息推送已成为提升用户体验的关键技术。传统的轮询(Polling)方式不仅浪费服务器资源,还会造成消息延迟。RuoYi-Cloud-Plus 基于 Server-Sent Events(SSE)技术,构建了一套高效、可靠的实时消息推送方案,为微服务架构下的实时通信提供了完美解决方案。
通过本文,您将深入了解:
- SSE 技术原理与优势对比
- RuoYi-Cloud-Plus SSE 模块架构设计
- 核心组件实现细节与源码解析
- 实战配置与使用指南
- 性能优化与最佳实践
SSE 技术原理与优势
什么是 Server-Sent Events?
Server-Sent Events(SSE)是一种基于 HTTP 的服务器向客户端推送实时事件的技术标准。与 WebSocket 不同,SSE 是单向通信(服务器到客户端),采用简单的文本协议,天然支持断线重连和事件ID追踪。
sequenceDiagram
participant Client as 客户端
participant Server as 服务器
Client->>Server: 建立SSE连接 (HTTP GET)
Server-->>Client: 保持连接 (text/event-stream)
Server->>Client: 实时推送消息事件
Note over Client: 自动处理断线重连
SSE 与其他技术的对比
| 技术 | 协议 | 通信方向 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| SSE | HTTP/1.1 | 单向(服务器→客户端) | 低 | 实时通知、消息推送 |
| WebSocket | WS/WSS | 双向 | 中 | 聊天、实时协作 |
| Long Polling | HTTP | 双向(模拟) | 高 | 兼容性要求高 |
| Polling | HTTP | 客户端主动 | 低 | 简单场景 |
RuoYi-Cloud-Plus SSE 核心优势
- 轻量级实现:基于标准HTTP协议,无需额外协议支持
- 自动重连:内置连接恢复机制,确保消息不丢失
- 事件分类型:支持多种事件类型,灵活应对不同场景
- 集群支持:通过Redis Pub/Sub实现分布式消息推送
- 安全可控:集成Sa-Token认证,确保连接安全性
架构设计与核心组件
整体架构图
flowchart TD
A[客户端] --> B[SSE控制器]
B --> C[SseEmitterManager]
C --> D[连接管理]
C --> E[消息分发]
C --> F[Redis Pub/Sub]
F --> G[集群节点1]
F --> H[集群节点2]
F --> I[集群节点N]
核心组件详解
1. SseEmitterManager - 连接管理核心
public class SseEmitterManager {
// 用户Token与Emitter的映射关系
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
// 建立SSE连接
public SseEmitter connect(Long userId, String token) {
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
SseEmitter emitter = new SseEmitter(86400000L); // 24小时超时
emitters.put(token, emitter);
// 设置生命周期回调
emitter.onCompletion(() -> cleanupConnection(userId, token));
emitter.onTimeout(() -> cleanupConnection(userId, token));
emitter.onError((e) -> cleanupConnection(userId, token));
return emitter;
}
}
2. SseMessageDto - 消息传输对象
@Data
public class SseMessageDto implements Serializable {
// 需要推送到的用户ID列表
private List<Long> userIds;
// 需要发送的消息内容
private String message;
}
3. SseController - RESTful接口
@RestController
@ConditionalOnProperty(value = "sse.enabled", havingValue = "true")
public class SseController {
@GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect(@RequestHeader("userId") Long userId,
@RequestHeader("token") String tokenValue) {
return sseEmitterManager.connect(userId, tokenValue);
}
@GetMapping(value = "${sse.path}/send")
public void sendMessage(@RequestParam("userIds") List<Long> userIds,
@RequestParam("message") String message) {
SseMessageDto dto = new SseMessageDto();
dto.setUserIds(userIds);
dto.setMessage(message);
sseEmitterManager.publishMessage(dto);
}
}
配置与使用指南
1. 基础配置
在 application.yml 中启用SSE功能:
sse:
enabled: true
path: /sse/connect
2. Maven依赖配置
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-sse</artifactId>
</dependency>
3. 客户端连接示例
// 建立SSE连接
const eventSource = new EventSource('/sse/connect', {
headers: {
'userId': '123',
'token': 'user-session-token'
}
});
// 监听消息事件
eventSource.addEventListener('message', function(event) {
const data = JSON.parse(event.data);
console.log('收到消息:', data);
// 更新UI或执行其他操作
});
// 处理连接错误
eventSource.onerror = function(error) {
console.error('SSE连接错误:', error);
// 自动重连逻辑
};
4. 服务端消息推送
// 向特定用户发送消息
sseEmitterManager.sendMessage(123L, "您有一条新通知");
// 向多个用户发送消息
SseMessageDto messageDto = new SseMessageDto();
messageDto.setUserIds(Arrays.asList(123L, 456L, 789L));
messageDto.setMessage("系统维护通知");
sseEmitterManager.publishMessage(messageDto);
// 群发消息
sseEmitterManager.publishAll("全体用户通知");
高级特性与最佳实践
1. 集群环境下的消息分发
RuoYi-Cloud-Plus SSE 通过Redis Pub/Sub实现集群消息同步:
public void subscribeMessage(Consumer<SseMessageDto> consumer) {
RedisUtils.subscribe("global:sse", SseMessageDto.class, consumer);
}
public void publishMessage(SseMessageDto sseMessageDto) {
RedisUtils.publish("global:sse", sseMessageDto, consumer -> {
log.info("SSE集群消息分发完成");
});
}
2. 连接生命周期管理
// 连接超时设置(24小时)
SseEmitter emitter = new SseEmitter(86400000L);
// 连接状态监控
emitter.onCompletion(() -> {
log.info("SSE连接正常完成");
cleanupConnection(userId, token);
});
emitter.onTimeout(() -> {
log.warn("SSE连接超时");
cleanupConnection(userId, token);
});
emitter.onError((e) -> {
log.error("SSE连接错误", e);
cleanupConnection(userId, token);
});
3. 安全认证集成
集成Sa-Token确保连接安全性:
// 在控制器中验证用户身份
@SaCheckLogin
@GetMapping(value = "${sse.path}")
public SseEmitter connect(@RequestHeader("userId") Long userId,
@RequestHeader("token") String tokenValue) {
// 验证token有效性
if (!StpUtil.getTokenValue().equals(tokenValue)) {
throw new RuntimeException("Token验证失败");
}
return sseEmitterManager.connect(userId, tokenValue);
}
性能优化策略
1. 连接池优化
// 使用ConcurrentHashMap管理连接,避免锁竞争
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
// 定期清理无效连接
@Scheduled(fixedRate = 300000) // 每5分钟清理一次
public void cleanupInactiveConnections() {
USER_TOKEN_EMITTERS.entrySet().removeIf(entry ->
entry.getValue().isEmpty());
}
2. 消息压缩与批处理
对于大量小消息,建议进行批处理:
public void sendBatchMessages(Long userId, List<String> messages) {
String batchMessage = String.join("\n", messages);
sseEmitterManager.sendMessage(userId, batchMessage);
}
3. 监控与告警
集成Prometheus监控SSE连接状态:
@Bean
public MeterBinder sseMetrics(SseEmitterManager sseEmitterManager) {
return registry -> {
Gauge.builder("sse.active.connections",
() -> sseEmitterManager.getActiveConnectionCount())
.description("当前活跃的SSE连接数")
.register(registry);
};
}
常见问题与解决方案
1. 连接数限制问题
问题:浏览器对同一域名的SSE连接数有限制(通常6个) 解决方案:使用不同的子域名或HTTP/2多路复用
2. 防火墙与代理问题
问题:某些网络环境可能阻止长连接 解决方案:配置合适的超时时间,实现自动重连机制
3. 消息顺序保证
问题:集群环境下消息可能乱序到达 解决方案:为消息添加序列号,客户端进行排序处理
public class SseMessageDto implements Serializable {
private Long sequenceId; // 消息序列号
private List<Long> userIds;
private String message;
}
实战应用场景
1. 实时通知系统
// 消息通知服务
@Service
public class NotificationService {
@Autowired
private SseEmitterManager sseEmitterManager;
public void sendNotification(Long userId, String title, String content) {
Map<String, Object> message = Map.of(
"type", "notification",
"title", title,
"content", content,
"timestamp", System.currentTimeMillis()
);
sseEmitterManager.sendMessage(userId, JSON.toJSONString(message));
}
}
2. 实时数据监控
// 监控数据推送
@Scheduled(fixedRate = 1000)
public void pushMonitoringData() {
MonitoringData data = monitoringService.getRealTimeData();
sseEmitterManager.publishAll(JSON.toJSONString(data));
}
3. 在线协作功能
// 协同编辑通知
public void notifyCollaborators(Long documentId, String operation) {
List<Long> collaborators = collaborationService.getCollaborators(documentId);
SseMessageDto message = new SseMessageDto();
message.setUserIds(collaborators);
message.setMessage(JSON.toJSONString(Map.of(
"type", "collaboration",
"documentId", documentId,
"operation", operation
)));
sseEmitterManager.publishMessage(message);
}
总结与展望
RuoYi-Cloud-Plus 的 SSE 推送方案为企业级实时消息通信提供了完整、可靠的解决方案。通过精心设计的架构和丰富的功能特性,该系统能够满足各种复杂的实时通信需求。
核心价值总结:
- 🚀 高性能:基于内存映射和Redis Pub/Sub的高效消息分发
- 🔒 高安全:集成Sa-Token认证,确保连接安全性
- 📈 易扩展:集群化设计,支持水平扩展
- 🛠️ 易集成:简洁的API设计,快速接入现有系统
- 📊 可观测:完善的监控和日志体系
未来发展方向:
- 支持WebSocket双协议切换
- 消息持久化与可靠性保证
- 移动端SDK集成
- 流量控制与限流保护
- 消息加密传输支持
通过本文的详细解析,相信您已经对 RuoYi-Cloud-Plus 的 SSE 实时消息方案有了深入的理解。无论是构建实时通知系统、在线协作平台还是实时数据监控,这套方案都能为您提供强大的技术支撑。
登录后查看全文
热门项目推荐
相关项目推荐
atomcodeClaude Code 的开源替代方案。连接任意大模型,编辑代码,运行命令,自动验证 — 全自动执行。用 Rust 构建,极致性能。 | An open-source alternative to Claude Code. Connect any LLM, edit code, run commands, and verify changes — autonomously. Built in Rust for speed. Get StartedRust0155- DDeepSeek-V4-ProDeepSeek-V4-Pro(总参数 1.6 万亿,激活 49B)面向复杂推理和高级编程任务,在代码竞赛、数学推理、Agent 工作流等场景表现优异,性能接近国际前沿闭源模型。Python00
LongCat-Video-Avatar-1.5最新开源LongCat-Video-Avatar 1.5 版本,这是一款经过升级的开源框架,专注于音频驱动人物视频生成的极致实证优化与生产级就绪能力。该版本在 LongCat-Video 基础模型之上构建,可生成高度稳定的商用级虚拟人视频,支持音频-文本转视频(AT2V)、音频-文本-图像转视频(ATI2V)以及视频续播等原生任务,并能无缝兼容单流与多流音频输入。00
auto-devAutoDev 是一个 AI 驱动的辅助编程插件。AutoDev 支持一键生成测试、代码、提交信息等,还能够与您的需求管理系统(例如Jira、Trello、Github Issue 等)直接对接。 在IDE 中,您只需简单点击,AutoDev 会根据您的需求自动为您生成代码。Kotlin03
Intern-S2-PreviewIntern-S2-Preview,这是一款高效的350亿参数科学多模态基础模型。除了常规的参数与数据规模扩展外,Intern-S2-Preview探索了任务扩展:通过提升科学任务的难度、多样性与覆盖范围,进一步释放模型能力。Python00
skillhubopenJiuwen 生态的 Skill 托管与分发开源方案,支持自建与可选 ClawHub 兼容。Python0112
热门内容推荐
最新内容推荐
项目优选
收起
暂无描述
Dockerfile
733
4.76 K
deepin linux kernel
C
31
16
Ascend Extension for PyTorch
Python
652
797
Claude Code 的开源替代方案。连接任意大模型,编辑代码,运行命令,自动验证 — 全自动执行。用 Rust 构建,极致性能。 | An open-source alternative to Claude Code. Connect any LLM, edit code, run commands, and verify changes — autonomously. Built in Rust for speed.
Get Started
Rust
1.25 K
153
旨在打造算法先进、性能卓越、高效敏捷、安全可靠的密码套件,通过轻量级、可剪裁的软件技术架构满足各行业不同场景的多样化要求,让密码技术应用更简单,同时探索后量子等先进算法创新实践,构建密码前沿技术底座!
C
1.1 K
611
本项目是CANN提供的数学类基础计算算子库,实现网络在NPU上加速计算。
C++
1.01 K
1.01 K
华为昇腾面向大规模分布式训练的多模态大模型套件,支撑多模态生成、多模态理解。
Python
147
237
昇腾LLM分布式训练框架
Python
168
200
openEuler内核是openEuler操作系统的核心,既是系统性能与稳定性的基石,也是连接处理器、设备与服务的桥梁。
C
434
395
暂无简介
Dart
987
253