首页
/ Semantic Kernel .NET 中的工具调用状态流式通知机制

Semantic Kernel .NET 中的工具调用状态流式通知机制

2025-05-08 03:16:08作者:瞿蔚英Wynne

在基于 Semantic Kernel .NET 开发 AI 应用时,工具调用(Tool Call)是一个非常重要的功能。然而,当前版本中存在一个明显的体验缺陷:当 AI 模型决定调用外部工具时,开发者无法通过流式响应(StreamingChatMessageContent)获取工具调用的状态信息。

问题背景

在流式聊天场景中,开发者通常使用 GetStreamingChatMessageContentsAsync 方法来获取 AI 模型的响应。当 AI 决定调用外部工具(如插件函数)时,系统会在后台执行这些工具调用,但当前的流式响应中并不包含这些工具调用的状态信息(如开始调用、调用成功、调用失败等)。

这导致终端用户无法感知到:

  1. AI 何时决定调用工具
  2. 调用了哪些工具
  3. 工具调用是否成功完成
  4. 工具调用的执行时间

现有解决方案的局限性

虽然 Semantic Kernel 提供了 IAutoFunctionInvocationFilter 接口来拦截工具调用事件,但这个机制存在几个关键限制:

  1. 过滤器无法直接访问流式响应通道,无法将工具调用状态推送给客户端
  2. 过滤器是单例的,难以与特定请求上下文关联
  3. 缺乏工具调用结果的错误信息捕获机制

创新性的临时解决方案

通过深入分析,我们可以实现一个结合 Channel 和过滤器的混合方案:

  1. 状态通知过滤器:创建一个自定义的 NotifyStreamingFunctionStateFilter,继承 IAutoFunctionInvocationFilter
  2. 请求关联机制:使用请求 ID 将过滤器与特定请求关联
  3. 状态通道:为每个请求创建独立的 Channel 来传递工具调用状态
  4. 流合并:将原始聊天流和状态通知流合并为一个统一的响应流

这个方案的核心在于:

  • 使用 ConcurrentDictionary 维护请求与 Channel 的映射
  • 实现定时清理机制防止内存泄漏
  • 通过元数据传递工具调用详情(插件名、函数名、状态等)

实现细节

状态通知过滤器

public class NotifyStreamingFunctionStateFilter : IAutoFunctionInvocationFilter
{
    private static readonly ConcurrentDictionary<string, (Channel<StreamingChatMessageContent> Channel, DateTime LastUsed)> _channels = new();
    private static readonly TimeSpan ExpirationTime = TimeSpan.FromMinutes(5);
    private static Timer? CleanupTimer;

    // 初始化清理定时器
    public NotifyStreamingFunctionStateFilter()
    {
        CleanupTimer = new Timer(_ => CleanupExpiredEntries(), null, TimeSpan.Zero, TimeSpan.FromSeconds(30));
    }

    // 工具调用拦截逻辑
    public async Task OnAutoFunctionInvocationAsync(AutoFunctionInvocationContext context, Func<AutoFunctionInvocationContext, Task> next)
    {
        if (context.IsStreaming)
        {
            // 获取请求ID
            context.ChatHistory.First()?.Metadata?.TryGetValue("requestId", out var requestId);
            
            if (requestId is not null && _channels.TryGetValue(requestId.ToString(), out var entry))
            {
                // 发送工具调用开始通知
                await SendNotification(entry.Channel, "invoked", context);
                
                await next(context);
                
                // 发送工具调用完成通知
                await SendNotification(entry.Channel, "completed", context);
            }
        }
        else
        {
            await next(context);
        }
    }

    private async Task SendNotification(Channel<StreamingChatMessageContent> channel, string state, AutoFunctionInvocationContext context)
    {
        await channel.Writer.WriteAsync(new StreamingChatMessageContent(
            role: null, 
            content: null, 
            innerContent: null, 
            metadata: new Dictionary<string, object?> {
                {"functionState", state},
                {"pluginName", context.Function.PluginName},
                {"functionName", context.Function.Name}
            }));
    }

    // 注册/注销Channel的方法
    public static Channel<StreamingChatMessageContent> RegisterChannel(string id) { /*...*/ }
    public static void RemoveChannel(string chatHash) { /*...*/ }
    private static void CleanupExpiredEntries() { /*...*/ }
}

流合并工具

public enum MergeCompletionMode { All, Any }

public static async IAsyncEnumerable<T> MergeAsync<T>(
    IAsyncEnumerable<T> first,
    IAsyncEnumerable<T> second,
    MergeCompletionMode mode = MergeCompletionMode.All,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var channel = Channel.CreateUnbounded<T>();
    
    using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    var linkedToken = linkedCts.Token;

    async Task ReadAllAsync(IAsyncEnumerable<T> source)
    {
        try
        {
            await foreach (var item in source.WithCancellation(linkedToken))
            {
                await channel.Writer.WriteAsync(item, cancellationToken);
            }
        }
        catch (OperationCanceledException) when (linkedToken.IsCancellationRequested) { }
        catch (Exception ex)
        {
            channel.Writer.TryComplete(ex);
            return;
        }
    }

    var task1 = ReadAllAsync(first);
    var task2 = ReadAllAsync(second);

    if (mode == MergeCompletionMode.Any)
    {
        _ = Task.WhenAny(task1, task2).ContinueWith(_ => channel.Writer.Complete(), cancellationToken);
    }
    else
    {
        _ = Task.WhenAll(task1, task2).ContinueWith(_ => channel.Writer.Complete(), cancellationToken);
    }

    await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
    {
        yield return item;
    }
}

使用示例

// 1. 为请求分配唯一ID
var requestId = Guid.NewGuid().ToString();
if (chatHistory.First() is not null)
{
    var metadata = new Dictionary<string, object?>(chatHistory.First().Metadata ?? new());
    metadata["requestId"] = requestId;
    chatHistory.First().Metadata = metadata;
}

// 2. 注册状态通知通道
Channel<StreamingChatMessageContent> notificationChannel = 
    NotifyStreamingFunctionStateFilter.RegisterChannel(requestId);

// 3. 获取原始聊天流
var stream = _chatCompletionService.GetStreamingChatMessageContentsAsync(
    chatHistory,
    kernel: _serviceProvider.GetRequiredService<Kernel>(),
    executionSettings: semanticLayerExecutionSettiongs
);

// 4. 合并两个流
var combinedStream = MergeAsync(stream, notificationChannel.Reader.ReadAllAsync(), MergeCompletionMode.Any);

// 5. 处理合并后的流
await foreach (var res in combinedStream)
{
    // 这里会收到两种消息:
    // - 常规聊天内容
    // - 工具调用状态通知(通过Metadata中的functionState等字段识别)
    Console.WriteLine($"内容: {res?.Content}, 元数据: {JsonSerializer.Serialize(res?.Metadata)}");
}

// 6. 清理资源
NotifyStreamingFunctionStateFilter.RemoveChannel(requestId);

方案评估

这个临时解决方案虽然解决了核心问题,但仍存在一些不足:

  1. 实现复杂度高:需要维护额外的Channel和合并逻辑
  2. 错误处理不完善:难以捕获和传递工具调用的错误详情
  3. 性能开销:额外的Channel和字典操作带来一定性能损耗

未来改进建议

理想的解决方案应该由Semantic Kernel原生支持,建议:

  1. 在StreamingChatMessageContent中增加工具调用状态字段
  2. 提供标准化的工具调用生命周期事件模型
  3. 支持工具调用结果的错误信息传递
  4. 允许多个工具调用的并行状态跟踪

这种改进将大大简化开发者实现工具调用状态通知的复杂度,同时提供更完整的工具调用生命周期管理能力。

总结

本文探讨了在Semantic Kernel .NET中实现工具调用状态流式通知的挑战和创新解决方案。通过结合过滤器和Channel技术,我们成功实现了工具调用状态的实时通知机制。虽然这是一个临时方案,但它为未来框架的改进提供了有价值的参考方向。对于需要向终端用户展示AI工作过程的开发者,这个方案提供了一种可行的实现路径。

登录后查看全文
热门项目推荐
相关项目推荐

项目优选

收起
openHiTLS-examplesopenHiTLS-examples
本仓将为广大高校开发者提供开源实践和创新开发平台,收集和展示openHiTLS示例代码及创新应用,欢迎大家投稿,让全世界看到您的精巧密码实现设计,也让更多人通过您的优秀成果,理解、喜爱上密码技术。
C
52
461
kernelkernel
deepin linux kernel
C
22
5
openHiTLSopenHiTLS
旨在打造算法先进、性能卓越、高效敏捷、安全可靠的密码套件,通过轻量级、可剪裁的软件技术架构满足各行业不同场景的多样化要求,让密码技术应用更简单,同时探索后量子等先进算法创新实践,构建密码前沿技术底座!
C
349
381
nop-entropynop-entropy
Nop Platform 2.0是基于可逆计算理论实现的采用面向语言编程范式的新一代低代码开发平台,包含基于全新原理从零开始研发的GraphQL引擎、ORM引擎、工作流引擎、报表引擎、规则引擎、批处理引引擎等完整设计。nop-entropy是它的后端部分,采用java语言实现,可选择集成Spring框架或者Quarkus框架。中小企业可以免费商用
Java
7
0
openGauss-serveropenGauss-server
openGauss kernel ~ openGauss is an open source relational database management system
C++
131
185
RuoYi-Vue3RuoYi-Vue3
🎉 (RuoYi)官方仓库 基于SpringBoot,Spring Security,JWT,Vue3 & Vite、Element Plus 的前后端分离权限管理系统
Vue
873
517
Cangjie-ExamplesCangjie-Examples
本仓将收集和展示高质量的仓颉示例代码,欢迎大家投稿,让全世界看到您的妙趣设计,也让更多人通过您的编码理解和喜爱仓颉语言。
Cangjie
336
1.09 K
ohos_react_nativeohos_react_native
React Native鸿蒙化仓库
C++
179
264
cherry-studiocherry-studio
🍒 Cherry Studio 是一款支持多个 LLM 提供商的桌面客户端
TypeScript
607
59
note-gennote-gen
一款跨平台的 Markdown AI 笔记软件,致力于使用 AI 建立记录和写作的桥梁。
TSX
83
4