QwenLM/Qwen3项目中实现流式响应的RetrievalQA链技术解析
2025-05-11 20:37:20作者:尤辰城Agatha
概述
在QwenLM/Qwen3项目中,开发者经常需要构建基于大语言模型的问答系统,其中RetrievalQA链是一个核心组件。本文将深入探讨如何在该项目中实现带有流式响应功能的RetrievalQA链,特别关注如何集成CallbackManagerForLLMRun来实现高效的响应流处理。
核心组件分析
自定义Qwen语言模型类
项目中首先定义了一个自定义的Qwen语言模型类,继承自langchain的LLM基类。这个类封装了Qwen2-7B-Instruct模型的核心功能:
- 模型初始化:使用AutoModelForCausalLM和AutoTokenizer加载预训练模型
- 参数配置:包括max_token、temperature、top_p等生成参数
- 历史对话管理:通过history_len控制对话历史长度
流式响应实现关键
实现流式响应的核心在于TextIteratorStreamer的使用,这是transformers库提供的流式输出工具。关键技术点包括:
- 流式生成器初始化:在_call方法中创建TextIteratorStreamer实例
- 异步生成线程:使用独立线程运行模型生成过程,避免阻塞主线程
- 响应流处理:通过迭代streamer对象逐步获取生成结果
实现方案演进
初始方案中,开发者直接使用模型的generate方法一次性获取完整响应。改进后的方案引入了以下优化:
- 流式处理架构:将生成过程与响应输出解耦
- 线程安全设计:使用Python的Thread类管理生成过程
- 回调管理:预留了run_manager参数用于集成CallbackManagerForLLMRun
完整实现代码解析
class MainPipeline(LLM):
# 初始化流式生成器和模型参数
streamer: Optional[TextIteratorStreamer] = None
max_token: int = 10000
temperature: float = 0.01
top_p = 0.95
def _call(self, prompt, stop=None, run_manager=None) -> str:
# 获取模型和分词器
model, tokenizer = PipelineManager.get_pipeline(settings.MODEL_ID)
# 初始化流式生成器
self.streamer = TextIteratorStreamer(tokenizer,
skip_prompt=True,
skip_special_tokens=True)
# 准备模型输入
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt}
]
text = tokenizer.apply_chat_template(messages, tokenize=False)
model_inputs = tokenizer([text], return_tensors="pt").to("cuda")
# 在独立线程中运行生成过程
def generate(**kwargs):
with torch.no_grad():
model.generate(**kwargs)
thread = Thread(target=generate, kwargs={
"input_ids": model_inputs["input_ids"],
"streamer": self.streamer,
"max_new_tokens": self.max_token,
"do_sample": True,
"top_p": self.top_p,
"temperature": self.temperature,
})
thread.start()
return "" # 实际响应通过streamer获取
RetrievalQA链集成
将自定义流式模型集成到RetrievalQA链中需要注意:
- 链初始化:使用from_chain_type方法创建QA链
- 检索器配置:基于FAISS向量库构建文档检索系统
- 提示模板设计:动态生成适合不同场景的提示词
def initialise_model(self):
# 准备提示模板
formatted_prompt = self.prompting_based_on_type()
# 创建RetrievalQA链
qa_chain = RetrievalQA.from_chain_type(
llm=self.llm,
chain_type="stuff",
retriever=self.get_create_vector_data(),
verbose=True,
chain_type_kwargs={"prompt": formatted_prompt}
)
return qa_chain, self.llm
流式响应处理
在实际使用时,通过迭代streamer对象逐步获取响应:
qa_chain, llm = self.initialise_model()
result = qa_chain.run(query=question)
# 流式处理响应
result = ""
if llm.streamer is not None:
while True:
try:
new_text = next(llm.streamer)
print(new_text, end="", flush=True)
result += new_text
except StopIteration:
break
await asyncio.sleep(0) # 允许其他协程运行
性能优化建议
- 批处理优化:对于多个并发请求,考虑使用模型批处理
- 缓存机制:对常见问题实现响应缓存
- 资源管理:合理控制max_token和history_len平衡效果与资源消耗
- 错误处理:增强流式处理过程中的异常捕获和恢复机制
总结
在QwenLM/Qwen3项目中实现流式响应的RetrievalQA链,关键在于正确集成transformers的流式生成功能与langchain的链式处理架构。本文介绍的方法不仅保持了问答系统的检索增强特性,还通过流式输出显著提升了用户体验。开发者可以根据实际需求调整生成参数和流式处理逻辑,构建更高效的问答系统。
登录后查看全文
热门项目推荐
GLM-5智谱 AI 正式发布 GLM-5,旨在应对复杂系统工程和长时域智能体任务。Jinja00
GLM-5-w4a8GLM-5-w4a8基于混合专家架构,专为复杂系统工程与长周期智能体任务设计。支持单/多节点部署,适配Atlas 800T A3,采用w4a8量化技术,结合vLLM推理优化,高效平衡性能与精度,助力智能应用开发Jinja00
jiuwenclawJiuwenClaw 是一款基于openJiuwen开发的智能AI Agent,它能够将大语言模型的强大能力,通过你日常使用的各类通讯应用,直接延伸至你的指尖。Python0208- QQwen3.5-397B-A17BQwen3.5 实现了重大飞跃,整合了多模态学习、架构效率、强化学习规模以及全球可访问性等方面的突破性进展,旨在为开发者和企业赋予前所未有的能力与效率。Jinja00
AtomGit城市坐标计划AtomGit 城市坐标计划开启!让开源有坐标,让城市有星火。致力于与城市合伙人共同构建并长期运营一个健康、活跃的本地开发者生态。01
MarkFlowy一款 AI Markdown 编辑器TSX01
热门内容推荐
项目优选
收起
deepin linux kernel
C
27
12
OpenHarmony documentation | OpenHarmony开发者文档
Dockerfile
612
4.07 K
Ascend Extension for PyTorch
Python
453
538
本项目是CANN提供的数学类基础计算算子库,实现网络在NPU上加速计算。
C++
924
778
openEuler内核是openEuler操作系统的核心,既是系统性能与稳定性的基石,也是连接处理器、设备与服务的桥梁。
C
374
254
🔥LeetCode solutions in any programming language | 多种编程语言实现 LeetCode、《剑指 Offer(第 2 版)》、《程序员面试金典(第 6 版)》题解
Java
69
21
暂无简介
Dart
857
205
🎉 (RuoYi)官方仓库 基于SpringBoot,Spring Security,JWT,Vue3 & Vite、Element Plus 的前后端分离权限管理系统
Vue
1.47 K
834
React Native鸿蒙化仓库
JavaScript
322
377
AscendNPU-IR是基于MLIR(Multi-Level Intermediate Representation)构建的,面向昇腾亲和算子编译时使用的中间表示,提供昇腾完备表达能力,通过编译优化提升昇腾AI处理器计算效率,支持通过生态框架使能昇腾AI处理器与深度调优
C++
114
177