RuoYi-Cloud-Plus SSE推送:实时消息方案
2026-02-04 04:46:32作者:魏侃纯Zoe
引言:实时消息推送的挑战与机遇
在现代企业级应用中,实时消息推送已成为提升用户体验的关键技术。传统的轮询(Polling)方式不仅浪费服务器资源,还会造成消息延迟。RuoYi-Cloud-Plus 基于 Server-Sent Events(SSE)技术,构建了一套高效、可靠的实时消息推送方案,为微服务架构下的实时通信提供了完美解决方案。
通过本文,您将深入了解:
- SSE 技术原理与优势对比
- RuoYi-Cloud-Plus SSE 模块架构设计
- 核心组件实现细节与源码解析
- 实战配置与使用指南
- 性能优化与最佳实践
SSE 技术原理与优势
什么是 Server-Sent Events?
Server-Sent Events(SSE)是一种基于 HTTP 的服务器向客户端推送实时事件的技术标准。与 WebSocket 不同,SSE 是单向通信(服务器到客户端),采用简单的文本协议,天然支持断线重连和事件ID追踪。
sequenceDiagram
participant Client as 客户端
participant Server as 服务器
Client->>Server: 建立SSE连接 (HTTP GET)
Server-->>Client: 保持连接 (text/event-stream)
Server->>Client: 实时推送消息事件
Note over Client: 自动处理断线重连
SSE 与其他技术的对比
| 技术 | 协议 | 通信方向 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| SSE | HTTP/1.1 | 单向(服务器→客户端) | 低 | 实时通知、消息推送 |
| WebSocket | WS/WSS | 双向 | 中 | 聊天、实时协作 |
| Long Polling | HTTP | 双向(模拟) | 高 | 兼容性要求高 |
| Polling | HTTP | 客户端主动 | 低 | 简单场景 |
RuoYi-Cloud-Plus SSE 核心优势
- 轻量级实现:基于标准HTTP协议,无需额外协议支持
- 自动重连:内置连接恢复机制,确保消息不丢失
- 事件分类型:支持多种事件类型,灵活应对不同场景
- 集群支持:通过Redis Pub/Sub实现分布式消息推送
- 安全可控:集成Sa-Token认证,确保连接安全性
架构设计与核心组件
整体架构图
flowchart TD
A[客户端] --> B[SSE控制器]
B --> C[SseEmitterManager]
C --> D[连接管理]
C --> E[消息分发]
C --> F[Redis Pub/Sub]
F --> G[集群节点1]
F --> H[集群节点2]
F --> I[集群节点N]
核心组件详解
1. SseEmitterManager - 连接管理核心
public class SseEmitterManager {
// 用户Token与Emitter的映射关系
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
// 建立SSE连接
public SseEmitter connect(Long userId, String token) {
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
SseEmitter emitter = new SseEmitter(86400000L); // 24小时超时
emitters.put(token, emitter);
// 设置生命周期回调
emitter.onCompletion(() -> cleanupConnection(userId, token));
emitter.onTimeout(() -> cleanupConnection(userId, token));
emitter.onError((e) -> cleanupConnection(userId, token));
return emitter;
}
}
2. SseMessageDto - 消息传输对象
@Data
public class SseMessageDto implements Serializable {
// 需要推送到的用户ID列表
private List<Long> userIds;
// 需要发送的消息内容
private String message;
}
3. SseController - RESTful接口
@RestController
@ConditionalOnProperty(value = "sse.enabled", havingValue = "true")
public class SseController {
@GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect(@RequestHeader("userId") Long userId,
@RequestHeader("token") String tokenValue) {
return sseEmitterManager.connect(userId, tokenValue);
}
@GetMapping(value = "${sse.path}/send")
public void sendMessage(@RequestParam("userIds") List<Long> userIds,
@RequestParam("message") String message) {
SseMessageDto dto = new SseMessageDto();
dto.setUserIds(userIds);
dto.setMessage(message);
sseEmitterManager.publishMessage(dto);
}
}
配置与使用指南
1. 基础配置
在 application.yml 中启用SSE功能:
sse:
enabled: true
path: /sse/connect
2. Maven依赖配置
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-sse</artifactId>
</dependency>
3. 客户端连接示例
// 建立SSE连接
const eventSource = new EventSource('/sse/connect', {
headers: {
'userId': '123',
'token': 'user-session-token'
}
});
// 监听消息事件
eventSource.addEventListener('message', function(event) {
const data = JSON.parse(event.data);
console.log('收到消息:', data);
// 更新UI或执行其他操作
});
// 处理连接错误
eventSource.onerror = function(error) {
console.error('SSE连接错误:', error);
// 自动重连逻辑
};
4. 服务端消息推送
// 向特定用户发送消息
sseEmitterManager.sendMessage(123L, "您有一条新通知");
// 向多个用户发送消息
SseMessageDto messageDto = new SseMessageDto();
messageDto.setUserIds(Arrays.asList(123L, 456L, 789L));
messageDto.setMessage("系统维护通知");
sseEmitterManager.publishMessage(messageDto);
// 群发消息
sseEmitterManager.publishAll("全体用户通知");
高级特性与最佳实践
1. 集群环境下的消息分发
RuoYi-Cloud-Plus SSE 通过Redis Pub/Sub实现集群消息同步:
public void subscribeMessage(Consumer<SseMessageDto> consumer) {
RedisUtils.subscribe("global:sse", SseMessageDto.class, consumer);
}
public void publishMessage(SseMessageDto sseMessageDto) {
RedisUtils.publish("global:sse", sseMessageDto, consumer -> {
log.info("SSE集群消息分发完成");
});
}
2. 连接生命周期管理
// 连接超时设置(24小时)
SseEmitter emitter = new SseEmitter(86400000L);
// 连接状态监控
emitter.onCompletion(() -> {
log.info("SSE连接正常完成");
cleanupConnection(userId, token);
});
emitter.onTimeout(() -> {
log.warn("SSE连接超时");
cleanupConnection(userId, token);
});
emitter.onError((e) -> {
log.error("SSE连接错误", e);
cleanupConnection(userId, token);
});
3. 安全认证集成
集成Sa-Token确保连接安全性:
// 在控制器中验证用户身份
@SaCheckLogin
@GetMapping(value = "${sse.path}")
public SseEmitter connect(@RequestHeader("userId") Long userId,
@RequestHeader("token") String tokenValue) {
// 验证token有效性
if (!StpUtil.getTokenValue().equals(tokenValue)) {
throw new RuntimeException("Token验证失败");
}
return sseEmitterManager.connect(userId, tokenValue);
}
性能优化策略
1. 连接池优化
// 使用ConcurrentHashMap管理连接,避免锁竞争
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
// 定期清理无效连接
@Scheduled(fixedRate = 300000) // 每5分钟清理一次
public void cleanupInactiveConnections() {
USER_TOKEN_EMITTERS.entrySet().removeIf(entry ->
entry.getValue().isEmpty());
}
2. 消息压缩与批处理
对于大量小消息,建议进行批处理:
public void sendBatchMessages(Long userId, List<String> messages) {
String batchMessage = String.join("\n", messages);
sseEmitterManager.sendMessage(userId, batchMessage);
}
3. 监控与告警
集成Prometheus监控SSE连接状态:
@Bean
public MeterBinder sseMetrics(SseEmitterManager sseEmitterManager) {
return registry -> {
Gauge.builder("sse.active.connections",
() -> sseEmitterManager.getActiveConnectionCount())
.description("当前活跃的SSE连接数")
.register(registry);
};
}
常见问题与解决方案
1. 连接数限制问题
问题:浏览器对同一域名的SSE连接数有限制(通常6个) 解决方案:使用不同的子域名或HTTP/2多路复用
2. 防火墙与代理问题
问题:某些网络环境可能阻止长连接 解决方案:配置合适的超时时间,实现自动重连机制
3. 消息顺序保证
问题:集群环境下消息可能乱序到达 解决方案:为消息添加序列号,客户端进行排序处理
public class SseMessageDto implements Serializable {
private Long sequenceId; // 消息序列号
private List<Long> userIds;
private String message;
}
实战应用场景
1. 实时通知系统
// 消息通知服务
@Service
public class NotificationService {
@Autowired
private SseEmitterManager sseEmitterManager;
public void sendNotification(Long userId, String title, String content) {
Map<String, Object> message = Map.of(
"type", "notification",
"title", title,
"content", content,
"timestamp", System.currentTimeMillis()
);
sseEmitterManager.sendMessage(userId, JSON.toJSONString(message));
}
}
2. 实时数据监控
// 监控数据推送
@Scheduled(fixedRate = 1000)
public void pushMonitoringData() {
MonitoringData data = monitoringService.getRealTimeData();
sseEmitterManager.publishAll(JSON.toJSONString(data));
}
3. 在线协作功能
// 协同编辑通知
public void notifyCollaborators(Long documentId, String operation) {
List<Long> collaborators = collaborationService.getCollaborators(documentId);
SseMessageDto message = new SseMessageDto();
message.setUserIds(collaborators);
message.setMessage(JSON.toJSONString(Map.of(
"type", "collaboration",
"documentId", documentId,
"operation", operation
)));
sseEmitterManager.publishMessage(message);
}
总结与展望
RuoYi-Cloud-Plus 的 SSE 推送方案为企业级实时消息通信提供了完整、可靠的解决方案。通过精心设计的架构和丰富的功能特性,该系统能够满足各种复杂的实时通信需求。
核心价值总结:
- 🚀 高性能:基于内存映射和Redis Pub/Sub的高效消息分发
- 🔒 高安全:集成Sa-Token认证,确保连接安全性
- 📈 易扩展:集群化设计,支持水平扩展
- 🛠️ 易集成:简洁的API设计,快速接入现有系统
- 📊 可观测:完善的监控和日志体系
未来发展方向:
- 支持WebSocket双协议切换
- 消息持久化与可靠性保证
- 移动端SDK集成
- 流量控制与限流保护
- 消息加密传输支持
通过本文的详细解析,相信您已经对 RuoYi-Cloud-Plus 的 SSE 实时消息方案有了深入的理解。无论是构建实时通知系统、在线协作平台还是实时数据监控,这套方案都能为您提供强大的技术支撑。
登录后查看全文
热门项目推荐
相关项目推荐
Kimi-K2.5Kimi K2.5 是一款开源的原生多模态智能体模型,它在 Kimi-K2-Base 的基础上,通过对约 15 万亿混合视觉和文本 tokens 进行持续预训练构建而成。该模型将视觉与语言理解、高级智能体能力、即时模式与思考模式,以及对话式与智能体范式无缝融合。Python00
GLM-4.7-FlashGLM-4.7-Flash 是一款 30B-A3B MoE 模型。作为 30B 级别中的佼佼者,GLM-4.7-Flash 为追求性能与效率平衡的轻量化部署提供了全新选择。Jinja00
VLOOKVLOOK™ 是优雅好用的 Typora/Markdown 主题包和增强插件。 VLOOK™ is an elegant and practical THEME PACKAGE × ENHANCEMENT PLUGIN for Typora/Markdown.Less00
PaddleOCR-VL-1.5PaddleOCR-VL-1.5 是 PaddleOCR-VL 的新一代进阶模型,在 OmniDocBench v1.5 上实现了 94.5% 的全新 state-of-the-art 准确率。 为了严格评估模型在真实物理畸变下的鲁棒性——包括扫描伪影、倾斜、扭曲、屏幕拍摄和光照变化——我们提出了 Real5-OmniDocBench 基准测试集。实验结果表明,该增强模型在新构建的基准测试集上达到了 SOTA 性能。此外,我们通过整合印章识别和文本检测识别(text spotting)任务扩展了模型的能力,同时保持 0.9B 的超紧凑 VLM 规模,具备高效率特性。Python00
KuiklyUI基于KMP技术的高性能、全平台开发框架,具备统一代码库、极致易用性和动态灵活性。 Provide a high-performance, full-platform development framework with unified codebase, ultimate ease of use, and dynamic flexibility. 注意:本仓库为Github仓库镜像,PR或Issue请移步至Github发起,感谢支持!Kotlin07
compass-metrics-modelMetrics model project for the OSS CompassPython00
项目优选
收起
deepin linux kernel
C
27
11
OpenHarmony documentation | OpenHarmony开发者文档
Dockerfile
525
3.72 K
Ascend Extension for PyTorch
Python
329
391
本项目是CANN提供的数学类基础计算算子库,实现网络在NPU上加速计算。
C++
877
578
openEuler内核是openEuler操作系统的核心,既是系统性能与稳定性的基石,也是连接处理器、设备与服务的桥梁。
C
335
162
暂无简介
Dart
764
189
Nop Platform 2.0是基于可逆计算理论实现的采用面向语言编程范式的新一代低代码开发平台,包含基于全新原理从零开始研发的GraphQL引擎、ORM引擎、工作流引擎、报表引擎、规则引擎、批处理引引擎等完整设计。nop-entropy是它的后端部分,采用java语言实现,可选择集成Spring框架或者Quarkus框架。中小企业可以免费商用
Java
12
1
🎉 (RuoYi)官方仓库 基于SpringBoot,Spring Security,JWT,Vue3 & Vite、Element Plus 的前后端分离权限管理系统
Vue
1.33 K
746
🔥LeetCode solutions in any programming language | 多种编程语言实现 LeetCode、《剑指 Offer(第 2 版)》、《程序员面试金典(第 6 版)》题解
Java
67
20
React Native鸿蒙化仓库
JavaScript
302
350