首页
/ yudao-cloud WebSocket:实时消息推送与在线聊天功能

yudao-cloud WebSocket:实时消息推送与在线聊天功能

2026-02-04 04:36:09作者:何将鹤

引言:实时通信的迫切需求

在现代Web应用中,实时消息推送和在线聊天功能已成为提升用户体验的关键要素。无论是系统通知、订单状态更新,还是客服聊天、团队协作,都需要WebSocket技术来实现真正的双向实时通信。yudao-cloud基于Spring Boot提供了强大的WebSocket解决方案,让开发者能够轻松构建高性能的实时应用。

WebSocket核心架构设计

yudao-cloud的WebSocket模块采用分层架构设计,确保系统的可扩展性和维护性:

classDiagram
    class WebSocketMessageSender {
        +send(Integer userType, Long userId, String messageType, String messageContent)
        +send(Integer userType, String messageType, String messageContent)
        +send(String sessionId, String messageType, String messageContent)
    }
    
    class WebSocketMessageListener~T~ {
        +onMessage(WebSocketSession session, T message)
        +getType() String
    }
    
    class WebSocketSessionManager {
        +addSession(WebSocketSession session)
        +removeSession(String sessionId)
        +getSession(String sessionId) WebSocketSession
    }
    
    class LoginUserHandshakeInterceptor {
        +beforeHandshake(...) boolean
    }
    
    WebSocketMessageSender <|.. LocalWebSocketMessageSender
    WebSocketMessageSender <|.. RedisWebSocketMessageSender
    WebSocketMessageSender <|.. KafkaWebSocketMessageSender
    WebSocketMessageListener <|.. DemoWebSocketMessageListener

核心组件功能说明

组件名称 职责描述 关键技术
WebSocketMessageSender 消息发送器接口,支持多种消息发送模式 策略模式、JSON序列化
WebSocketMessageListener 消息监听器,处理接收到的WebSocket消息 泛型、事件驱动
WebSocketSessionManager 会话管理,维护在线用户连接 ConcurrentHashMap、Session管理
LoginUserHandshakeInterceptor 握手拦截器,实现用户认证 Spring Security集成

快速开始:构建实时聊天功能

1. 环境配置

首先在application.yml中启用WebSocket功能:

yudao:
  websocket:
    enable: true
    server:
      port: 9321 # WebSocket服务器端口

2. 定义消息模型

创建发送和接收的消息DTO:

// 发送消息DTO
@Data
@Schema(description = "WebSocket 示例发送消息")
public class DemoSendMessage {
    
    @Schema(description = "接收用户编号", example = "1024")
    private Long toUserId;
    
    @Schema(description = "消息内容", example = "你好,这是一条测试消息")
    private String text;
}

// 接收消息DTO  
@Data
@Schema(description = "WebSocket 示例接收消息")
public class DemoReceiveMessage {
    
    @Schema(description = "发送用户编号", example = "1")
    private Long fromUserId;
    
    @Schema(description = "消息内容", example = "你好,这是一条测试消息")
    private String text;
    
    @Schema(description = "是否单发", example = "true")
    private Boolean single;
}

3. 实现消息监听器

@Component
public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> {

    @Autowired
    private WebSocketMessageSender webSocketMessageSender;

    @Override
    public void onMessage(WebSocketSession session, DemoSendMessage message) {
        Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);
        
        // 单发消息处理
        if (message.getToUserId() != null) {
            DemoReceiveMessage toMessage = new DemoReceiveMessage()
                .setFromUserId(fromUserId)
                .setText(message.getText())
                .setSingle(true);
            
            webSocketMessageSender.sendObject(
                UserTypeEnum.ADMIN.getValue(), 
                message.getToUserId(),
                "demo-message-receive", 
                toMessage
            );
            return;
        }
        
        // 群发消息处理
        DemoReceiveMessage toMessage = new DemoReceiveMessage()
            .setFromUserId(fromUserId)
            .setText(message.getText())
            .setSingle(false);
            
        webSocketMessageSender.sendObject(
            UserTypeEnum.ADMIN.getValue(),
            "demo-message-receive", 
            toMessage
        );
    }

    @Override
    public String getType() {
        return "demo-message-send";
    }
}

4. 前端WebSocket连接

// 建立WebSocket连接
const socket = new WebSocket('ws://localhost:9321/websocket');

// 连接建立事件
socket.onopen = function(event) {
    console.log('WebSocket连接已建立');
    
    // 发送认证消息(携带token)
    const authMessage = {
        type: 'auth',
        token: '用户登录token'
    };
    socket.send(JSON.stringify(authMessage));
};

// 接收消息事件
socket.onmessage = function(event) {
    const message = JSON.parse(event.data);
    
    switch(message.type) {
        case 'demo-message-receive':
            handleChatMessage(message);
            break;
        case 'system-notification':
            handleSystemNotification(message);
            break;
        default:
            console.log('收到未知类型消息:', message);
    }
};

// 发送聊天消息
function sendChatMessage(toUserId, content) {
    const message = {
        type: 'demo-message-send',
        toUserId: toUserId,
        text: content
    };
    socket.send(JSON.stringify(message));
}

高级特性:消息分发模式

yudao-cloud支持多种消息分发模式,满足不同业务场景需求:

1. 单播消息(Unicast)

// 发送给指定用户
webSocketMessageSender.sendObject(
    UserTypeEnum.ADMIN.getValue(), 
    1024L, // 目标用户ID
    "private-message", 
    messageContent
);

2. 广播消息(Broadcast)

// 发送给所有管理员用户
webSocketMessageSender.sendObject(
    UserTypeEnum.ADMIN.getValue(),
    "system-notification", 
    notificationContent
);

3. 会话级消息(Session-level)

// 发送给特定会话
webSocketMessageSender.sendObject(
    "session-id-123", 
    "session-message", 
    sessionSpecificContent
);

消息可靠性保障

心跳检测机制

sequenceDiagram
    participant Client
    participant Server
    participant SessionManager
    
    Client->>Server: WebSocket连接
    Server->>SessionManager: 注册会话
    Note over Client,Server: 连接建立成功
    
    loop 每30秒一次心跳
        Client->>Server: Ping消息
        Server->>Client: Pong响应
    end
    
    Note over Client,Server: 网络异常断开
    Server->>SessionManager: 检测到连接断开
    SessionManager->>SessionManager: 清理会话资源

重连机制实现

class WebSocketManager {
    constructor() {
        this.socket = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
    }
    
    connect() {
        this.socket = new WebSocket('ws://localhost:9321/websocket');
        
        this.socket.onclose = (event) => {
            if (this.reconnectAttempts < this.maxReconnectAttempts) {
                setTimeout(() => {
                    this.reconnectAttempts++;
                    this.connect();
                }, 3000 * this.reconnectAttempts);
            }
        };
        
        this.socket.onopen = (event) => {
            this.reconnectAttempts = 0;
            this.authenticate();
        };
    }
}

性能优化策略

1. 连接池管理

@Configuration
public class WebSocketConfig {
    
    @Bean
    public WebSocketSessionManager webSocketSessionManager() {
        return new WebSocketSessionManagerImpl(10000); // 最大连接数限制
    }
}

2. 消息压缩

public void sendCompressedMessage(Integer userType, Long userId, String messageType, Object message) {
    String json = JsonUtils.toJsonString(message);
    byte[] compressed = compress(json.getBytes(StandardCharsets.UTF_8));
    // 发送压缩后的消息
}

3. 批量消息处理

@Scheduled(fixedRate = 1000) // 每秒批量处理一次
public void processBatchMessages() {
    List<WebSocketMessage> batch = messageQueue.drainToBatch();
    if (!batch.isEmpty()) {
        sendBatchMessages(batch);
    }
}

安全防护措施

1. 认证授权

@Component
public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {
    
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, 
                                 ServerHttpResponse response,
                                 WebSocketHandler wsHandler, 
                                 Map<String, Object> attributes) {
        // 从请求中提取token并进行验证
        String token = extractToken(request);
        if (!authService.validateToken(token)) {
            return false; // 认证失败,拒绝连接
        }
        
        // 设置用户信息到attributes中
        LoginUser loginUser = authService.getLoginUser(token);
        attributes.put("loginUser", loginUser);
        return true;
    }
}

2. 消息内容过滤

public class MessageContentFilter {
    
    private static final Pattern XSS_PATTERN = Pattern.compile("[<>\"']");
    
    public static String filter(String content) {
        if (content == null) return null;
        return XSS_PATTERN.matcher(content).replaceAll("");
    }
}

实战案例:系统通知功能

后端服务实现

@Service
public class NotificationService {
    
    @Autowired
    private WebSocketMessageSender webSocketMessageSender;
    
    public void sendSystemNotification(String title, String content, UserTypeEnum userType) {
        SystemNotification notification = new SystemNotification()
            .setTitle(title)
            .setContent(content)
            .setCreateTime(new Date());
            
        webSocketMessageSender.sendObject(
            userType.getValue(),
            "system-notification",
            notification
        );
    }
    
    public void sendPersonalNotification(Long userId, String title, String content) {
        PersonalNotification notification = new PersonalNotification()
            .setTitle(title)
            .setContent(content)
            .setUserId(userId)
            .setCreateTime(new Date());
            
        webSocketMessageSender.sendObject(
            UserTypeEnum.ADMIN.getValue(),
            userId,
            "personal-notification", 
            notification
        );
    }
}

前端消息处理

// 系统通知处理
function handleSystemNotification(notification) {
    const notificationElement = `
        <div class="notification">
            <h4>${escapeHtml(notification.title)}</h4>
            <p>${escapeHtml(notification.content)}</p>
            <span class="time">${formatTime(notification.createTime)}</span>
        </div>
    `;
    
    // 添加到通知列表
    $('#notification-list').prepend(notificationElement);
    
    // 显示桌面通知(如果浏览器支持)
    if ('Notification' in window && Notification.permission === 'granted') {
        new Notification(notification.title, {
            body: notification.content,
            icon: '/static/images/logo.png'
        });
    }
}

监控与调试

1. 连接状态监控

@RestController
@RequestMapping("/admin/websocket")
public class WebSocketMonitorController {
    
    @Autowired
    private WebSocketSessionManager sessionManager;
    
    @GetMapping("/stats")
    public CommonResult<WebSocketStatsVO> getStats() {
        WebSocketStatsVO stats = new WebSocketStatsVO();
        stats.setActiveConnections(sessionManager.getSessionCount());
        stats.setTotalMessagesSent(messageCounter.getTotalSent());
        stats.setTotalMessagesReceived(messageCounter.getTotalReceived());
        return CommonResult.success(stats);
    }
}

2. 日志调试

application.yml中配置WebSocket调试日志:

logging:
  level:
    cn.iocoder.yudao.framework.websocket: DEBUG
    org.springframework.web.socket: DEBUG

常见问题解决方案

1. 连接数限制问题

问题描述:WebSocket连接数达到上限 解决方案

yudao:
  websocket:
    server:
      max-sessions: 50000 # 调整最大连接数

2. 内存泄漏问题

问题描述:Session未正确清理导致内存泄漏 解决方案

@EventListener
public void handleSessionDisconnectEvent(SessionDisconnectEvent event) {
    sessionManager.removeSession(event.getSessionId());
    // 清理相关资源
}

3. 跨域问题

解决方案

@Bean
public WebSocketHandlerRegistrationCustomizer webSocketHandlerRegistrationCustomizer() {
    return registration -> registration.setAllowedOrigins("*");
}

总结

yudao-cloud的WebSocket模块提供了完整的企业级实时通信解决方案,具备以下核心优势:

  1. 架构清晰:采用分层设计,各组件职责明确
  2. 功能丰富:支持单播、广播、会话级多种消息模式
  3. 性能优异:内置连接池、消息压缩、批量处理等优化策略
  4. 安全可靠:完善的认证授权和内容过滤机制
  5. 易于扩展:支持多种消息中间件集成

通过本文的详细讲解和代码示例,开发者可以快速掌握yudao-cloud WebSocket的使用方法,构建出高性能、高可用的实时消息推送和在线聊天功能。无论是系统通知、实时监控,还是在线客服、协同编辑,yudao-cloud都能提供强有力的技术支撑。

在实际项目中,建议根据具体业务需求选择合适的消息分发模式,并结合监控系统对WebSocket连接状态进行实时监控,确保系统的稳定运行。

登录后查看全文
热门项目推荐
相关项目推荐