yudaocode/ruoyi-vue-pro:Kafka集成实战
2026-02-04 05:10:47作者:农烁颖Land
引言:为什么选择Kafka作为消息队列?
在现代分布式系统中,消息队列(Message Queue)已成为解耦系统组件、提升系统可靠性和扩展性的关键技术。Apache Kafka作为分布式流处理平台,凭借其高吞吐量、低延迟、持久化存储和水平扩展能力,成为企业级应用的首选消息队列解决方案。
在ruoyi-vue-pro项目中,Kafka被深度集成到多个核心模块中,为系统提供了强大的异步消息处理能力。本文将深入探讨如何在ruoyi-vue-pro项目中实战使用Kafka,涵盖从基础配置到高级特性的完整实现。
一、Kafka在ruoyi-vue-pro中的架构设计
1.1 整体架构概览
graph TB
A[Producer应用] --> B[Kafka集群]
B --> C[Consumer应用]
B --> D[Kafka Streams]
B --> E[Connector]
subgraph ruoyi-vue-pro模块
F[系统模块]
G[WebSocket模块]
H[多租户模块]
end
F --> A
G --> A
H --> C
C --> F
C --> G
1.2 核心组件职责
| 组件 | 职责描述 | 应用场景 |
|---|---|---|
| KafkaTemplate | 消息发送模板 | 统一消息发送接口 |
| @KafkaListener | 消息消费注解 | 声明式消息监听 |
| TenantKafkaProducerInterceptor | 多租户拦截器 | 租户上下文传递 |
| KafkaWebSocketMessageSender | WebSocket消息发送 | 实时消息广播 |
二、环境准备与依赖配置
2.1 Maven依赖配置
在ruoyi-vue-pro项目中,Kafka依赖通过Spring Boot Starter自动管理:
<!-- pom.xml 中的依赖管理 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
2.2 配置文件示例
# application.yml 配置
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
retries: 3
consumer:
group-id: ruoyi-vue-pro-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest
properties:
spring.json.trusted.packages: "cn.iocoder.yudao.framework.websocket.core.sender.kafka"
三、核心功能实现详解
3.1 消息生产者实现
3.1.1 基础消息发送
@Service
@RequiredArgsConstructor
public class KafkaMessageService {
private final KafkaTemplate<Object, Object> kafkaTemplate;
/**
* 发送简单消息
*/
public void sendSimpleMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
/**
* 发送带键的消息
*/
public void sendMessageWithKey(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message);
}
/**
* 发送对象消息
*/
public void sendObjectMessage(String topic, Object payload) {
kafkaTemplate.send(topic, payload);
}
}
3.1.2 多租户消息发送拦截器
ruoyi-vue-pro实现了多租户场景下的Kafka消息拦截器:
public class TenantKafkaProducerInterceptor implements ProducerInterceptor<Object, Object> {
@Override
public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
Long tenantId = TenantContextHolder.getTenantId();
if (tenantId != null) {
Headers headers = (Headers) ReflectUtil.getFieldValue(record, "headers");
headers.add(HEADER_TENANT_ID, tenantId.toString().getBytes());
}
return record;
}
// 其他方法实现...
}
3.2 消息消费者实现
3.2.1 基础消息消费
@Component
@Slf4j
public class KafkaMessageConsumer {
/**
* 监听指定topic的消息
*/
@KafkaListener(
topics = "${spring.kafka.topic.user-notification}",
groupId = "${spring.kafka.consumer.group-id}"
)
public void consumeUserNotification(String message) {
log.info("收到用户通知消息: {}", message);
// 业务处理逻辑
}
/**
* 监听多个topic
*/
@KafkaListener(
topics = {"topic1", "topic2", "topic3"},
groupId = "${spring.kafka.consumer.group-id}"
)
public void consumeMultipleTopics(String message) {
log.info("收到多topic消息: {}", message);
}
}
3.2.2 对象消息消费
@Component
@Slf4j
public class ObjectMessageConsumer {
@KafkaListener(
topics = "${spring.kafka.topic.websocket}",
groupId = "${spring.kafka.consumer.group-id}"
)
public void consumeWebSocketMessage(KafkaWebSocketMessage message) {
log.info("收到WebSocket广播消息: {}", message);
// 处理WebSocket消息逻辑
}
}
3.3 WebSocket与Kafka集成实战
ruoyi-vue-pro通过Kafka实现了分布式WebSocket消息广播:
@Slf4j
public class KafkaWebSocketMessageSender extends AbstractWebSocketMessageSender {
private final KafkaTemplate<Object, Object> kafkaTemplate;
private final String topic;
public KafkaWebSocketMessageSender(WebSocketSessionManager sessionManager,
KafkaTemplate<Object, Object> kafkaTemplate,
String topic) {
super(sessionManager);
this.kafkaTemplate = kafkaTemplate;
this.topic = topic;
}
/**
* 通过Kafka广播消息
*/
private void sendKafkaMessage(String sessionId, Long userId, Integer userType,
String messageType, String messageContent) {
KafkaWebSocketMessage mqMessage = new KafkaWebSocketMessage()
.setSessionId(sessionId).setUserId(userId).setUserType(userType)
.setMessageType(messageType).setMessageContent(messageContent);
try {
kafkaTemplate.send(topic, mqMessage).get();
} catch (InterruptedException | ExecutionException e) {
log.error("[sendKafkaMessage][发送消息({}) 到Kafka失败]", mqMessage, e);
}
}
}
四、高级特性与最佳实践
4.1 消息确认机制
// 发送消息并获取发送结果
public void sendMessageWithCallback(String topic, String message) {
ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override
public void onSuccess(SendResult<Object, Object> result) {
log.info("消息发送成功: {}", result.getRecordMetadata());
}
@Override
public void onFailure(Throwable ex) {
log.error("消息发送失败: {}", ex.getMessage());
}
});
}
4.2 事务消息处理
@Transactional
public void processWithTransaction(String topic, String message) {
// 数据库操作
userRepository.save(new User());
// Kafka消息发送(在同一事务中)
kafkaTemplate.send(topic, message);
// 其他业务操作
}
4.3 消息重试与死信队列
@Configuration
public class KafkaRetryConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 配置重试机制
factory.setRetryTemplate(retryTemplate());
// 配置死信队列
factory.setRecoveryCallback(context -> {
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) context.getAttribute("record");
log.error("消息处理失败,发送到死信队列: {}", record);
// 发送到死信队列逻辑
return null;
});
return factory;
}
private RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
return retryTemplate;
}
}
五、性能优化与监控
5.1 生产者性能优化
spring:
kafka:
producer:
batch-size: 16384 # 批量大小
buffer-memory: 33554432 # 缓冲区内存
linger-ms: 5 # 发送延迟
compression-type: snappy # 压缩类型
5.2 消费者性能优化
spring:
kafka:
consumer:
max-poll-records: 500 # 每次拉取最大记录数
fetch-max-wait-ms: 500 # 拉取最大等待时间
fetch-min-bytes: 1 # 拉取最小字节数
5.3 监控指标配置
@Configuration
public class KafkaMetricsConfig {
@Bean
public MeterRegistryCustomizer<MeterRegistry> kafkaMetrics() {
return registry -> {
registry.config().commonTags("application", "ruoyi-vue-pro");
};
}
}
六、常见问题与解决方案
6.1 消息顺序性问题
// 确保相同key的消息发送到同一分区
public void sendOrderedMessage(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message);
}
6.2 消息重复消费问题
@Component
@Slf4j
public class IdempotentConsumer {
private final Set<String> processedMessageIds = ConcurrentHashMap.newKeySet();
@KafkaListener(topics = "order-topic")
public void consumeOrderMessage(OrderMessage message) {
if (processedMessageIds.contains(message.getId())) {
log.info("消息已处理,跳过: {}", message.getId());
return;
}
// 处理消息
processOrder(message);
// 记录已处理消息
processedMessageIds.add(message.getId());
}
}
6.3 消费者偏移量管理
@KafkaListener(topics = "user-topic", groupId = "user-group")
public void consumeUserMessage(
ConsumerRecord<String, String> record,
Acknowledgment acknowledgment) {
try {
// 处理消息
processUserMessage(record.value());
// 手动提交偏移量
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("消息处理失败: {}", record.value(), e);
// 不提交偏移量,等待重试
}
}
七、实战案例:订单状态变更通知
7.1 消息生产者实现
@Service
@RequiredArgsConstructor
public class OrderStatusProducer {
private final KafkaTemplate<String, OrderStatusMessage> kafkaTemplate;
public void sendOrderStatusChange(Long orderId, String oldStatus, String newStatus) {
OrderStatusMessage message = new OrderStatusMessage()
.setOrderId(orderId)
.setOldStatus(oldStatus)
.setNewStatus(newStatus)
.setTimestamp(System.currentTimeMillis());
kafkaTemplate.send("order-status-topic", orderId.toString(), message);
}
}
7.2 消息消费者实现
@Component
@Slf4j
public class OrderStatusConsumer {
@KafkaListener(
topics = "order-status-topic",
groupId = "order-status-group"
)
public void consumeOrderStatusChange(OrderStatusMessage message) {
log.info("订单状态变更: 订单ID={}, 从{}变为{}",
message.getOrderId(), message.getOldStatus(), message.getNewStatus());
// 发送通知给相关用户
notifyUsers(message);
// 更新相关系统状态
updateSystemStatus(message);
}
}
总结
通过本文的详细讲解,我们深入了解了ruoyi-vue-pro项目中Kafka的集成实战。从基础配置到高级特性,从消息生产到消费,从性能优化到监控告警,Kafka为ruoyi-vue-pro提供了强大的异步消息处理能力。
关键要点总结:
- 架构设计:采用生产者-消费者模式,支持多租户和WebSocket集成
- 核心组件:KafkaTemplate、@KafkaListener、多租户拦截器
- 高级特性:事务消息、重试机制、死信队列
- 性能优化:批量处理、压缩、监控指标
- 实战案例:订单状态变更通知等业务场景
Kafka在ruoyi-vue-pro中的成功集成,为构建高可用、可扩展的分布式系统提供了坚实的技术基础。通过合理的配置和最佳实践,可以充分发挥Kafka的性能优势,满足各种复杂的业务场景需求。
登录后查看全文
热门项目推荐
相关项目推荐
atomcodeClaude Code 的开源替代方案。连接任意大模型,编辑代码,运行命令,自动验证 — 全自动执行。用 Rust 构建,极致性能。 | An open-source alternative to Claude Code. Connect any LLM, edit code, run commands, and verify changes — autonomously. Built in Rust for speed. Get StartedRust0202
cann-learning-hubCANN 学习中心仓,支持在线互动运行、边学边练,提供教程、示例与优化方案,一站式助力昇腾开发者快速上手。Jupyter Notebook0130
MiMo-V2.5-Pro-FP4-DFlashMiMo-V2.5-Pro-FP4-DFlash 是驱动 MiMo-V2.5-Pro-UltraSpeed 的底层模型: FP4 量化骨干网络:对 MoE 专家采用 MXFP4 量化,同时保持模型其他部分的更高精度,在几乎无损质量的前提下,显著减小模型体积并降低内存带宽压力。 BF16 DFlash 草稿生成器:用于块扩散推测解码,每次前向传播可生成一整个块的 tokens,并让骨干网络一步完成验证。 两者协同作用,既降低了每参数的位宽,又减少了骨干网络前向传播的次数,而这两者正是万亿参数模型解码过程中的两大主要成本来源。Python00
JoyAI-EchoJoyAI-Echo,这是一个独立的、仅用于推理的版本,旨在实现分钟级多镜头音视频生成。它采用了经过蒸馏的DMD生成器、配对的跨模态记忆以及故事级别的一致性。其性能的核心在于,一个跨模态视听记忆库能够在长达五分钟的视频中保持角色外观和语音音色的一致性。同时,一个训练后处理流程将基于记忆的强化学习与分布匹配蒸馏相结合,实现了7.5倍的速度提升,显著增强了视觉质量和对齐效果。00
AstrBot✨ 易上手的多平台 LLM 聊天机器人及开发框架 ✨ 平台支持 QQ、QQ频道、Telegram、微信、企微、飞书 | OpenAI、DeepSeek、Gemini、硅基流动、月之暗面、Ollama、OneAPI、Dify 等。附带 WebUI。Python08
handy-ollama动手学Ollama,CPU玩转大模型部署,在线阅读地址:https://datawhalechina.github.io/handy-ollama/Jupyter Notebook07
项目优选
收起
deepin linux kernel
C
32
16
Ascend Extension for PyTorch
Python
746
927
本仓库是 Flutter SDK 与 Flutter Engine 的 OpenHarmony 适配版本,由 CPF-Flutter 团队维护。开发者可使用熟悉的 Flutter 技术栈开发 OpenHarmony 应用,3.35.7 及以后的适配版本可基于本仓库源码构建支持 OpenHarmony 的 Flutter Engine。
Dart
1.02 K
267
暂无描述
Dockerfile
771
5.03 K
本项目是CANN提供的transformer类大模型算子库,实现网络在NPU上加速计算。
C++
867
1.97 K
🔥LeetCode solutions in any programming language | 多种编程语言实现 LeetCode、《剑指 Offer(第 2 版)》、《程序员面试金典(第 6 版)》题解
Java
70
22
Claude Code 的开源替代方案。连接任意大模型,编辑代码,运行命令,自动验证 — 全自动执行。用 Rust 构建,极致性能。 | An open-source alternative to Claude Code. Connect any LLM, edit code, run commands, and verify changes — autonomously. Built in Rust for speed.
Get Started
Rust
1.94 K
202
本项目是CANN提供的神经网络类计算算子库,实现网络在NPU上加速计算。
C++
694
1.36 K
openEuler内核是openEuler操作系统的核心,既是系统性能与稳定性的基石,也是连接处理器、设备与服务的桥梁。
C
465
456
本仓将收集和展示高质量的仓颉示例代码,欢迎大家投稿,让全世界看到您的妙趣设计,也让更多人通过您的编码理解和喜爱仓颉语言。
C
458
5.25 K