首页
/ Spring AI项目中SimpleLoggerAdvisor重复订阅导致聊天中断问题分析

Spring AI项目中SimpleLoggerAdvisor重复订阅导致聊天中断问题分析

2025-06-11 13:24:13作者:明树来

问题现象描述

在Spring AI项目中使用SimpleLoggerAdvisor时,开发者发现了一个异常现象:每次聊天会话期间,向模型发送的请求数量会异常递增。具体表现为:

  • 第一次聊天发送1个请求
  • 第二次聊天发送2个请求
  • 第三次聊天发送3个请求
  • 依此类推

这种递增现象最终会导致模型因超出请求限制而中断服务。从日志截图可以看到,请求消息被重复记录,且每次聊天的请求数量确实在不断增加。

问题根源分析

通过查看提供的LoggingAdvisor实现代码,可以发现问题出在流式处理(StreamAroundAdvisor)的实现方式上。核心原因在于:

  1. 重复订阅问题:在aroundStream方法中,每次流式请求都会创建一个新的MessageAggregator实例,但没有正确处理Flux的订阅机制。

  2. 副作用操作:before方法中对请求消息的日志记录操作被包含在Flux处理链中,导致每次订阅都会重复执行日志记录。

  3. 响应观察机制:observeAfter方法作为回调传递给MessageAggregator,可能在多个环节被触发。

技术原理深入

Spring AI中的Advisor机制借鉴了Spring AOP的设计思想,用于在AI模型调用前后插入横切逻辑。在流式处理场景下,需要特别注意响应式编程的特性:

  1. 冷热发布者:Flux作为冷发布者,每次订阅都会重新触发整个处理链。

  2. 背压机制:流式处理需要考虑背压控制,避免资源消耗过大。

  3. 副作用隔离:日志记录等副作用操作应该与核心处理逻辑分离。

解决方案建议

针对这个问题,可以采取以下几种解决方案:

方案一:重构LoggingAdvisor实现

@Override
public Flux<AdvisedResponse> aroundStream(AdvisedRequest advisedRequest, StreamAroundAdvisorChain chain) {
    // 将日志记录移出处理链
    advisedRequest.messages().forEach(message -> {
        logger.info("request: {}", message);
    });
    
    return chain.nextAroundStream(advisedRequest)
            .doOnNext(this::observeAfter);
}

方案二:使用共享的MessageAggregator

private final MessageAggregator messageAggregator = new MessageAggregator();

@Override
public Flux<AdvisedResponse> aroundStream(AdvisedRequest advisedRequest, StreamAroundAdvisorChain chain) {
    advisedRequest = before(advisedRequest);
    return messageAggregator.aggregateAdvisedResponse(
        chain.nextAroundStream(advisedRequest),
        this::observeAfter
    );
}

方案三:添加订阅控制

private final AtomicBoolean subscribed = new AtomicBoolean(false);

@Override
public Flux<AdvisedResponse> aroundStream(AdvisedRequest advisedRequest, StreamAroundAdvisorChain chain) {
    if (subscribed.compareAndSet(false, true)) {
        advisedRequest = before(advisedRequest);
        return chain.nextAroundStream(advisedRequest)
                .doOnNext(this::observeAfter)
                .doFinally(signal -> subscribed.set(false));
    }
    return chain.nextAroundStream(advisedRequest);
}

最佳实践建议

  1. 避免在流式处理中创建新实例:MessageAggregator等辅助类应该作为成员变量重用。

  2. 分离副作用操作:日志记录等操作应该与核心处理逻辑分离。

  3. 理解响应式编程模型:深入掌握Project Reactor的订阅机制和背压控制。

  4. 添加适当的防御性编程:对于可能重复执行的操作添加状态检查。

  5. 性能考量:在流式处理中,尽量减少不必要的对象创建和复制操作。

总结

这个问题典型地展示了在响应式编程模型下处理副作用操作时需要特别注意的事项。Spring AI的Advisor机制虽然强大,但也需要开发者深入理解其工作原理才能正确使用。通过重构日志记录逻辑、重用辅助对象以及控制订阅行为,可以有效解决这个重复订阅导致的问题,确保AI聊天服务的稳定运行。

登录后查看全文
热门项目推荐
相关项目推荐

项目优选

收起
ohos_react_nativeohos_react_native
React Native鸿蒙化仓库
C++
176
261
RuoYi-Vue3RuoYi-Vue3
🎉 (RuoYi)官方仓库 基于SpringBoot,Spring Security,JWT,Vue3 & Vite、Element Plus 的前后端分离权限管理系统
Vue
860
511
ShopXO开源商城ShopXO开源商城
🔥🔥🔥ShopXO企业级免费开源商城系统,可视化DIY拖拽装修、包含PC、H5、多端小程序(微信+支付宝+百度+头条&抖音+QQ+快手)、APP、多仓库、多商户、多门店、IM客服、进销存,遵循MIT开源协议发布、基于ThinkPHP8框架研发
JavaScript
93
15
openGauss-serveropenGauss-server
openGauss kernel ~ openGauss is an open source relational database management system
C++
129
182
openHiTLSopenHiTLS
旨在打造算法先进、性能卓越、高效敏捷、安全可靠的密码套件,通过轻量级、可剪裁的软件技术架构满足各行业不同场景的多样化要求,让密码技术应用更简单,同时探索后量子等先进算法创新实践,构建密码前沿技术底座!
C
259
300
kernelkernel
deepin linux kernel
C
22
5
cherry-studiocherry-studio
🍒 Cherry Studio 是一款支持多个 LLM 提供商的桌面客户端
TypeScript
595
57
CangjieCommunityCangjieCommunity
为仓颉编程语言开发者打造活跃、开放、高质量的社区环境
Markdown
1.07 K
0
HarmonyOS-ExamplesHarmonyOS-Examples
本仓将收集和展示仓颉鸿蒙应用示例代码,欢迎大家投稿,在仓颉鸿蒙社区展现你的妙趣设计!
Cangjie
398
371
Cangjie-ExamplesCangjie-Examples
本仓将收集和展示高质量的仓颉示例代码,欢迎大家投稿,让全世界看到您的妙趣设计,也让更多人通过您的编码理解和喜爱仓颉语言。
Cangjie
332
1.08 K