首页
/ InterestingLab/Waterdrop项目中Postgres-CDC数据重复问题解析与解决方案

InterestingLab/Waterdrop项目中Postgres-CDC数据重复问题解析与解决方案

2025-05-27 17:11:21作者:蔡丛锟

背景介绍

在使用InterestingLab/Waterdrop项目进行数据同步时,当配置Postgres-CDC作为数据源,同时使用RabbitMQ和Console作为数据接收端时,出现了数据重复的问题。这个问题特别出现在处理CDC(变更数据捕获)源中的更新记录时,值得深入分析其产生原因和解决方案。

问题现象

当Postgres数据库中的表发生更新操作时,Waterdrop会捕获到两条记录:

  1. UPDATE_BEFORE - 表示更新前的数据状态
  2. UPDATE_AFTER - 表示更新后的数据状态

在数据接收端(RabbitMQ和Console)会同时收到这两条记录,导致数据重复。而在JDBC接收端则不会出现此问题,因为JDBC连接器内部已经实现了对UPDATE_BEFORE记录的过滤逻辑。

技术原理分析

Postgres-CDC的工作原理是基于PostgreSQL的逻辑解码功能,它会捕获数据库中的所有数据变更事件,包括:

  • 插入(INSERT)
  • 更新(UPDATE)
  • 删除(DELETE)

对于UPDATE操作,CDC会生成两条记录:

  1. 更新前的数据快照(UPDATE_BEFORE)
  2. 更新后的数据快照(UPDATE_AFTER)

这种设计是为了完整记录数据变更的历史,但在某些业务场景下,我们可能只需要关注变更后的数据状态。

解决方案

方案一:使用FilterRowKind转换器

Waterdrop提供了FilterRowKind转换器,可以过滤掉不需要的变更类型记录。配置示例如下:

transform {
  FilterRowKind {
    source_table_name = "employees"
    result_table_name = "employees_filtered"
    exclude_kinds = ["UPDATE_BEFORE"]
  }
}

这种方案的优势是:

  1. 配置简单直观
  2. 可以灵活选择需要保留的变更类型
  3. 不影响其他接收端的处理逻辑

方案二:在接收端实现过滤逻辑

类似JDBC接收端的做法,可以在自定义接收器中实现对特定变更类型的过滤。这种方法需要一定的开发工作,但可以实现更精细的控制。

最佳实践建议

  1. 根据业务需求明确需要处理的变更类型
  2. 对于只需要最终状态的场景,建议过滤掉UPDATE_BEFORE记录
  3. 对于需要完整变更历史的场景,可以考虑将记录类型作为元数据一并存储
  4. 在测试环境中验证过滤逻辑是否符合预期

总结

Postgres-CDC产生的数据重复问题本质上是由于其完整记录变更历史的特性导致的。通过合理使用Waterdrop提供的转换器,可以灵活地控制数据处理流程,满足不同业务场景的需求。理解CDC的工作原理和Waterdrop的转换机制,有助于构建更健壮的数据管道。

登录后查看全文
热门项目推荐
相关项目推荐

项目优选

收起
kernelkernel
deepin linux kernel
C
22
6
docsdocs
OpenHarmony documentation | OpenHarmony开发者文档
Dockerfile
197
2.17 K
ohos_react_nativeohos_react_native
React Native鸿蒙化仓库
C++
208
285
pytorchpytorch
Ascend Extension for PyTorch
Python
59
94
RuoYi-Vue3RuoYi-Vue3
🎉 (RuoYi)官方仓库 基于SpringBoot,Spring Security,JWT,Vue3 & Vite、Element Plus 的前后端分离权限管理系统
Vue
974
574
nop-entropynop-entropy
Nop Platform 2.0是基于可逆计算理论实现的采用面向语言编程范式的新一代低代码开发平台,包含基于全新原理从零开始研发的GraphQL引擎、ORM引擎、工作流引擎、报表引擎、规则引擎、批处理引引擎等完整设计。nop-entropy是它的后端部分,采用java语言实现,可选择集成Spring框架或者Quarkus框架。中小企业可以免费商用
Java
9
1
ops-mathops-math
本项目是CANN提供的数学类基础计算算子库,实现网络在NPU上加速计算。
C++
549
81
openHiTLSopenHiTLS
旨在打造算法先进、性能卓越、高效敏捷、安全可靠的密码套件,通过轻量级、可剪裁的软件技术架构满足各行业不同场景的多样化要求,让密码技术应用更简单,同时探索后量子等先进算法创新实践,构建密码前沿技术底座!
C
1.02 K
399
communitycommunity
本项目是CANN开源社区的核心管理仓库,包含社区的治理章程、治理组织、通用操作指引及流程规范等基础信息
393
27
MateChatMateChat
前端智能化场景解决方案UI库,轻松构建你的AI应用,我们将持续完善更新,欢迎你的使用与建议。 官网地址:https://matechat.gitcode.com
1.2 K
133