AutoGen跨节点智能体协作框架:技术解析与实践指南
一、概念解析:分布式智能体系统的核心架构
1.1 分布式智能体网络的定义与价值
在人工智能应用从单模型向多智能体协作演进的过程中,传统集中式架构面临三大挑战:计算资源瓶颈、系统扩展性受限以及多语言生态整合困难。AutoGen分布式运行时框架通过引入去中心化消息通信机制,构建了一个能够跨节点、跨语言协作的智能体网络,有效解决了这些挑战。
该框架的核心价值在于:
- 资源弹性分配:将计算任务分布到不同节点,实现负载均衡
- 多语言生态融合:支持Python与.NET等多语言智能体无缝协作
- 系统模块化设计:每个智能体可独立开发、部署和升级
- 故障隔离:单个节点故障不会导致整个系统崩溃
1.2 核心组件与技术原理
AutoGen分布式框架包含四个关键组件,它们通过gRPC协议实现高效通信:
GrpcWorkerAgentRuntimeHost
作为中心协调节点,负责管理所有连接和消息路由。它维护着主题订阅列表,确保消息准确投递到目标智能体。
GrpcWorkerAgentRuntime
分布式运行时客户端,部署在每个智能体节点,处理与主机的连接和消息收发。它实现了消息的序列化/反序列化和网络传输。
Topic(主题)
基于发布/订阅模式的消息通道,智能体通过订阅特定主题接收相关消息。主题可以按功能(如"数据分析")或业务域(如"用户服务")进行划分。
Agent(智能体)
具备特定能力的AI实体,通过运行时客户端连接到分布式网络。智能体可以是任务执行器(如数据分析器)、协调器(如工作流管理器)或交互接口(如用户界面)。
1.3 技术选型对比
| 技术方案 | 优势 | 劣势 | 适用场景 |
|---|---|---|---|
| AutoGen gRPC运行时 | 跨语言支持、低延迟、高可靠性 | 配置相对复杂 | 企业级多智能体系统 |
| 基于消息队列的实现 | 松耦合、高吞吐量 | 延迟较高、缺乏类型安全 | 非实时数据处理场景 |
| 基于REST的API调用 | 简单易用、开发成本低 | 同步通信、开销大 | 简单集成场景 |
| WebSocket长连接 | 实时双向通信 | 连接管理复杂 | 实时交互场景 |
结论:AutoGen的gRPC方案在跨语言支持和性能表现方面具有明显优势,特别适合构建需要多智能体紧密协作的复杂系统。
二、工作原理:分布式通信的实现机制
2.1 异步消息传递架构
AutoGen分布式框架采用完全异步的消息驱动架构,这是实现高并发、低耦合的关键。其工作流程如下:
sequenceDiagram
participant Host as 中心主机
participant Writer as 数据分析智能体
participant Editor as 报告生成智能体
participant Manager as 任务管理智能体
Manager->>Host: 发布任务消息到"task_topic"
Host->>Writer: 转发任务消息
Writer->>Host: 发布分析结果到"result_topic"
Host->>Editor: 转发分析结果
Editor->>Host: 发布报告到"report_topic"
Host->>Manager: 转发完成报告
异步通信的核心优势在于:
- 系统各组件可独立扩展,不受其他组件性能影响
- 消息处理失败可重试,提高系统容错能力
- 峰值负载时可通过消息队列缓冲,避免系统过载
2.2 主题订阅机制详解
主题订阅是实现智能体间通信的基础机制,其工作原理可类比为"无线电广播":发布者向特定频率(主题)发送消息,所有调谐到该频率的订阅者都能接收消息。
订阅流程实现:
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime
from autogen_core.messaging import Message
async def main():
# 1. 创建运行时客户端并连接到主机
runtime = GrpcWorkerAgentRuntime(host_address="localhost:50051")
await runtime.connect()
# 2. 定义消息处理回调函数
async def handle_analysis_result(message: Message):
"""处理数据分析结果的回调函数"""
print(f"收到分析结果: {message.content}")
# 处理消息逻辑...
# 可以发布新消息到其他主题
response_msg = Message(
content="分析结果已处理",
topic="system_log",
metadata={"status": "success", "source": "report_agent"}
)
await runtime.publish(response_msg)
# 3. 订阅特定主题
await runtime.subscribe(
topic="data_analysis_results", # 订阅的主题名称
callback=handle_analysis_result # 消息处理函数
)
# 4. 保持连接以接收消息
print("报告生成智能体已启动,等待分析结果...")
await asyncio.Future() # 无限期运行以保持连接
if __name__ == "__main__":
import asyncio
asyncio.run(main())
2.3 跨语言协作实现
AutoGen通过统一的消息协议和类型定义,实现了Python与.NET智能体的无缝协作。关键技术包括:
- Protocol Buffers消息定义:使用.proto文件定义跨语言的消息结构
- 语言特定运行时:为每种语言提供优化的gRPC客户端实现
- 元数据交换机制:支持消息头、类型信息等元数据的跨语言传递
C#端智能体示例:
using Microsoft.AutoGen.Core.Grpc;
using Microsoft.AutoGen.Core.Messaging;
class DataProcessingAgent
{
private readonly GrpcWorkerAgentRuntime _runtime;
public DataProcessingAgent(string hostAddress)
{
// 创建C#运行时客户端
_runtime = new GrpcWorkerAgentRuntime(hostAddress);
}
public async Task StartAsync()
{
// 连接到主机
await _runtime.ConnectAsync();
// 订阅数据处理任务主题
await _runtime.SubscribeAsync("data_processing_tasks", HandleProcessingTask);
Console.WriteLine("数据处理智能体已启动");
}
private async Task HandleProcessingTask(Message message)
{
Console.WriteLine($"收到处理任务: {message.Content}");
// 处理数据...
var result = ProcessData(message.Content);
// 发布处理结果
await _runtime.PublishAsync(new Message(
content: result,
topic: "processing_results",
metadata: new Dictionary<string, string> {
{ "agent", "data_processor" },
{ "timestamp", DateTime.UtcNow.ToString() }
}
));
}
private string ProcessData(string input)
{
// 数据处理逻辑...
return $"Processed: {input}";
}
}
三、实战应用:分布式数据分析系统
3.1 场景设计:实时销售数据分析平台
我们将构建一个分布式销售数据分析系统,包含以下智能体:
- 数据采集智能体:从多个数据源收集销售数据
- 数据分析智能体:对原始数据进行统计和趋势分析
- 报告生成智能体:将分析结果转换为可视化报告
- 任务调度智能体:协调各智能体工作流程
系统架构如下:
graph TD
subgraph "中心节点"
HOST[GrpcWorkerAgentRuntimeHost]
TASK_TOPIC[任务调度主题]
DATA_TOPIC[原始数据主题]
ANALYSIS_TOPIC[分析结果主题]
REPORT_TOPIC[报告主题]
end
subgraph "数据采集节点"
COLLECTOR[数据采集智能体]
COLLECTOR -->|发布| DATA_TOPIC
COLLECTOR -->|订阅| TASK_TOPIC
end
subgraph "数据分析节点"
ANALYZER[数据分析智能体]
ANALYZER -->|订阅| DATA_TOPIC
ANALYZER -->|发布| ANALYSIS_TOPIC
end
subgraph "报告生成节点"
REPORTER[报告生成智能体]
REPORTER -->|订阅| ANALYSIS_TOPIC
REPORTER -->|发布| REPORT_TOPIC
end
subgraph "任务调度节点"
SCHEDULER[任务调度智能体]
SCHEDULER -->|发布| TASK_TOPIC
SCHEDULER -->|订阅| REPORT_TOPIC
end
HOST --> TASK_TOPIC
HOST --> DATA_TOPIC
HOST --> ANALYSIS_TOPIC
HOST --> REPORT_TOPIC
3.2 核心实现:任务调度智能体
任务调度智能体作为系统协调者,负责任务分配和流程监控:
import asyncio
from datetime import datetime, timedelta
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime
from autogen_core.messaging import Message
class SchedulerAgent:
def __init__(self, runtime):
self.runtime = runtime
self.task_queue = []
self.completed_tasks = 0
self.total_tasks = 0
async def start(self):
"""启动调度智能体"""
# 订阅报告主题以接收完成通知
await self.runtime.subscribe("report_topic", self.handle_report_completion)
# 启动任务调度循环
asyncio.create_task(self.scheduling_loop())
print("任务调度智能体已启动")
async def scheduling_loop(self):
"""定期调度数据分析任务"""
while True:
# 每天9:00和15:00调度任务
now = datetime.now()
if now.hour in [9, 15] and now.minute == 0:
await self.schedule_daily_analysis()
# 每分钟检查一次
await asyncio.sleep(60)
async def schedule_daily_analysis(self):
"""调度每日数据分析任务"""
task_id = f"analysis_task_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
self.total_tasks += 1
# 创建任务消息
task_message = Message(
content={
"task_id": task_id,
"type": "daily_analysis",
"parameters": {
"time_range": "24h",
"metrics": ["sales", "conversion_rate", "average_order_value"],
"regions": ["north", "south", "east", "west"]
},
"priority": "normal"
},
topic="task_topic",
metadata={"task_id": task_id, "scheduled_time": datetime.now().isoformat()}
)
# 发布任务到任务主题
await self.runtime.publish(task_message)
print(f"已调度任务: {task_id}")
# 记录任务到队列
self.task_queue.append({
"task_id": task_id,
"status": "scheduled",
"scheduled_time": datetime.now()
})
async def handle_report_completion(self, message: Message):
"""处理报告完成通知"""
task_id = message.metadata.get("task_id")
if not task_id:
print("收到无效报告消息: 缺少task_id")
return
# 更新任务状态
for task in self.task_queue:
if task["task_id"] == task_id:
task["status"] = "completed"
task["completed_time"] = datetime.now()
self.completed_tasks += 1
break
print(f"任务 {task_id} 已完成,总完成率: {self.completed_tasks}/{self.total_tasks}")
# 发布系统日志
log_message = Message(
content=f"任务 {task_id} 分析报告已生成",
topic="system_log",
metadata={
"level": "info",
"source": "scheduler_agent",
"task_id": task_id
}
)
await self.runtime.publish(log_message)
async def main():
# 连接到分布式运行时主机
runtime = GrpcWorkerAgentRuntime(host_address="localhost:50051")
await runtime.connect()
# 创建并启动调度智能体
scheduler = SchedulerAgent(runtime)
await scheduler.start()
# 保持运行
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
3.3 部署与运行指南
环境准备
- 克隆项目代码库:
git clone https://gitcode.com/GitHub_Trending/au/autogen
cd autogen
- 安装必要依赖:
# 对于Python智能体
pip install -r python/requirements.txt
# 对于.NET智能体
cd dotnet
dotnet restore
启动步骤
- 启动中心主机服务:
# 在终端1中运行
cd python/samples/core_distributed-group-chat
python run_host.py
- 启动数据采集智能体:
# 在终端2中运行
cd python/samples/core_distributed-group-chat
python run_collector_agent.py
- 启动数据分析智能体:
# 在终端3中运行
cd python/samples/core_distributed-group-chat
python run_analyzer_agent.py
- 启动报告生成智能体:
# 在终端4中运行
cd python/samples/core_distributed-group-chat
python run_reporter_agent.py
- 启动任务调度智能体:
# 在终端5中运行
cd python/samples/core_distributed-group-chat
python run_scheduler_agent.py
一键启动脚本
创建start_analysis_system.sh:
#!/bin/bash
# 分布式数据分析系统启动脚本
# 确保工作目录正确
cd /path/to/autogen/python/samples/core_distributed-group-chat
echo "启动分布式数据分析系统..."
# 启动中心主机服务
echo "启动中心主机服务..."
python run_host.py &
HOST_PID=$!
# 等待主机启动
sleep 5
# 启动数据采集智能体
echo "启动数据采集智能体..."
python run_collector_agent.py &
COLLECTOR_PID=$!
# 启动数据分析智能体
echo "启动数据分析智能体..."
python run_analyzer_agent.py &
ANALYZER_PID=$!
# 启动报告生成智能体
echo "启动报告生成智能体..."
python run_reporter_agent.py &
REPORTER_PID=$!
# 启动任务调度智能体
echo "启动任务调度智能体..."
python run_scheduler_agent.py &
SCHEDULER_PID=$!
echo "所有服务已启动,按Ctrl+C停止所有服务"
# 等待中断信号
trap "kill $HOST_PID $COLLECTOR_PID $ANALYZER_PID $REPORTER_PID $SCHEDULER_PID" SIGINT
wait
赋予执行权限并运行:
chmod +x start_analysis_system.sh
./start_analysis_system.sh
四、扩展优化:提升系统性能与可靠性
4.1 连接池与资源管理
在高并发场景下,频繁创建和销毁gRPC连接会导致性能开销。通过连接池管理可以显著提升系统效率:
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntimePool
import asyncio
class AnalysisService:
def __init__(self):
# 创建连接池
self.runtime_pool = GrpcWorkerAgentRuntimePool(
host_address="localhost:50051",
pool_size=10, # 连接池大小
max_idle_time=300, # 连接最大空闲时间(秒)
reconnect_interval=5 # 重连间隔(秒)
)
async def analyze_data_batch(self, data_batch):
"""批量分析数据"""
results = []
# 从池中获取运行时实例
async with self.runtime_pool.get_runtime() as runtime:
# 为每个数据项创建消息
for data in data_batch:
message = Message(
content=data,
topic="data_analysis_requests"
)
# 发送消息并等待响应
response = await runtime.publish_and_await_response(
message,
response_topic="data_analysis_responses",
timeout=60 # 超时时间(秒)
)
results.append(response.content)
return results
连接池优化策略:
- 根据系统负载动态调整池大小
- 设置合理的连接超时和空闲超时时间
- 实现连接健康检查机制
- 采用异步池管理减少阻塞
4.2 消息可靠性保障
分布式系统必须处理网络不稳定、节点故障等问题。AutoGen提供了多种机制保障消息可靠传递:
1. 消息重试机制
async def publish_with_retry(runtime, message, max_retries=3, backoff_factor=0.3):
"""带重试机制的消息发布"""
for attempt in range(max_retries):
try:
await runtime.publish(message)
return True
except Exception as e:
if attempt == max_retries - 1:
print(f"消息发布失败,已达最大重试次数: {str(e)}")
return False
# 指数退避重试
delay = backoff_factor * (2 ** attempt)
print(f"消息发布失败,将在{delay:.2f}秒后重试: {str(e)}")
await asyncio.sleep(delay)
2. 消息持久化
对于关键业务消息,可实现持久化存储:
import json
import aiofiles
class PersistentMessagePublisher:
def __init__(self, runtime, storage_path="message_store"):
self.runtime = runtime
self.storage_path = storage_path
os.makedirs(storage_path, exist_ok=True)
async def publish_persistent(self, message):
"""发布消息并持久化"""
# 生成唯一消息ID
message_id = str(uuid.uuid4())
# 存储消息到文件
storage_file = os.path.join(self.storage_path, f"{message_id}.json")
async with aiofiles.open(storage_file, 'w') as f:
await f.write(json.dumps({
"id": message_id,
"content": message.content,
"topic": message.topic,
"metadata": message.metadata,
"timestamp": datetime.now().isoformat(),
"status": "pending"
}))
# 发布消息
try:
await self.runtime.publish(message)
# 更新消息状态为已发送
async with aiofiles.open(storage_file, 'r+') as f:
data = json.loads(await f.read())
data["status"] = "sent"
await f.seek(0)
await f.write(json.dumps(data))
return True
except Exception as e:
# 记录错误状态
async with aiofiles.open(storage_file, 'r+') as f:
data = json.loads(await f.read())
data["status"] = "failed"
data["error"] = str(e)
await f.seek(0)
await f.write(json.dumps(data))
return False
4.3 监控与可观测性
构建完善的监控体系对于分布式系统至关重要:
1. 性能指标收集
from prometheus_client import Counter, Gauge, Histogram, start_http_server
import time
# 定义监控指标
MESSAGES_PUBLISHED = Counter('autogen_messages_published_total', 'Total published messages', ['topic'])
MESSAGES_RECEIVED = Counter('autogen_messages_received_total', 'Total received messages', ['topic'])
MESSAGE_LATENCY = Histogram('autogen_message_latency_seconds', 'Message processing latency', ['topic'])
ACTIVE_CONNECTIONS = Gauge('autogen_active_connections', 'Number of active connections')
class MonitoredRuntime:
def __init__(self, runtime):
self.runtime = runtime
self.start_time = time.time()
async def publish(self, message):
"""监控发布消息"""
MESSAGES_PUBLISHED.labels(topic=message.topic).inc()
start_time = time.time()
try:
result = await self.runtime.publish(message)
latency = time.time() - start_time
MESSAGE_LATENCY.labels(topic=message.topic).observe(latency)
return result
except Exception as e:
# 记录错误指标
return None
async def subscribe(self, topic, callback):
"""监控订阅"""
async def monitored_callback(message):
MESSAGES_RECEIVED.labels(topic=topic).inc()
start_time = time.time()
try:
result = await callback(message)
latency = time.time() - start_time
MESSAGE_LATENCY.labels(topic=topic).observe(latency)
return result
except Exception as e:
# 记录错误指标
return None
return await self.runtime.subscribe(topic, monitored_callback)
# 启动指标暴露服务
start_http_server(8000)
2. 分布式追踪
集成OpenTelemetry实现分布式追踪:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
# 配置追踪
def configure_tracing(service_name):
# 设置追踪提供者
trace.set_tracer_provider(TracerProvider())
# 配置Jaeger导出器
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831,
)
# 添加批处理处理器
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
# instrumentation gRPC客户端
GrpcInstrumentorClient().instrument()
return trace.get_tracer(service_name)
# 在智能体中使用追踪
tracer = configure_tracing("data_analyzer_agent")
async def process_data(message):
with tracer.start_as_current_span("process_data") as span:
span.set_attribute("task_id", message.metadata.get("task_id", "unknown"))
span.set_attribute("data_size", len(message.content))
# 处理数据...
result = analyze_data(message.content)
span.add_event("data_analyzed", attributes={"result_size": len(result)})
return result
五、常见问题排查指南
5.1 连接问题:智能体无法连接到主机
症状:智能体启动后提示"无法连接到gRPC主机"
排查步骤:
- 检查主机服务状态:
# 查看主机进程是否运行
ps aux | grep run_host.py
# 检查端口是否监听
netstat -tuln | grep 50051
- 网络连通性测试:
# 测试主机端口可达性
telnet localhost 50051
# 使用gRPCurl测试服务
grpcurl -plaintext localhost:50051 list
- 防火墙设置检查:
# 检查防火墙规则
sudo ufw status
# 如需要,开放端口
sudo ufw allow 50051/tcp
解决方案:
- 确保主机服务先于智能体启动
- 验证主机地址和端口配置正确
- 检查网络策略和防火墙设置
- 对于跨主机部署,确保路由可达
5.2 消息传递问题:消息丢失或延迟
症状:智能体发送的消息未被接收,或接收严重延迟
排查步骤:
- 检查主题订阅情况:
# 在主机上添加主题订阅监控
async def monitor_subscriptions(service):
while True:
subscriptions = await service.get_subscriptions()
print("当前订阅情况:")
for topic, agents in subscriptions.items():
print(f" 主题 {topic}: {len(agents)} 个订阅者")
await asyncio.sleep(10)
- 启用gRPC调试日志:
# 启用详细日志
import logging
logging.basicConfig(level=logging.DEBUG)
- 检查消息处理性能:
# 添加消息处理计时
async def timed_callback(message):
start_time = time.time()
result = await actual_callback(message)
latency = time.time() - start_time
print(f"消息处理延迟: {latency:.2f}秒")
return result
解决方案:
- 确保消息处理回调函数不阻塞事件循环
- 对于耗时处理,使用任务池异步执行
- 实现消息确认机制确保可靠传递
- 考虑使用消息优先级机制处理关键消息
5.3 跨语言协作问题:Python与.NET智能体通信异常
症状:Python智能体发送的消息,.NET智能体无法正确解析,反之亦然
排查步骤:
-
验证消息协议定义:
- 确保双方使用相同版本的.proto文件
- 检查字段类型和默认值是否一致
-
检查消息序列化/反序列化:
# Python端测试消息序列化
from autogen_core.messaging import Message
import json
message = Message(content={"key": "value"}, topic="test")
serialized = message.to_json()
print("序列化消息:", serialized)
# 尝试反序列化
deserialized = Message.from_json(serialized)
print("反序列化结果:", deserialized.content)
- 检查元数据处理:
- 确保元数据值都是字符串类型
- 避免使用特殊字符或过长的值
解决方案:
- 使用最新版本的AutoGen库确保兼容性
- 严格遵循Protocol Buffers类型系统
- 实现跨语言兼容的自定义类型转换器
- 在消息中添加版本字段便于兼容性处理
六、技术发展趋势与应用扩展
6.1 下一代分布式智能体系统的演进方向
AutoGen分布式框架正朝着以下方向发展:
1. 自适应负载均衡
未来版本将引入基于AI的动态负载均衡机制,能够根据节点性能、网络状况和任务特性,自动优化消息路由和任务分配。这将大幅提升系统在异构环境下的整体性能。
2. 智能合约驱动的协作
引入区块链和智能合约技术,实现智能体间协作规则的自动执行和信任建立。这将使AutoGen能够支持需要高度信任和透明性的商业场景。
3. 边缘计算集成
优化轻量级运行时,支持在边缘设备上部署智能体节点。这将拓展AutoGen在物联网、工业控制等领域的应用,实现"云-边-端"一体化的智能体网络。
4. 强化学习优化的任务调度
利用强化学习算法优化任务调度策略,使系统能够根据历史性能数据和实时状况,动态调整资源分配和任务优先级。
6.2 应用领域扩展建议
AutoGen分布式智能体框架可在多个领域拓展应用:
1. 智能物联网系统
部署在边缘设备和云端的智能体协同工作,实现实时数据处理、异常检测和自主决策。例如,智能家居中的环境监控、能源管理和安全系统。
2. 金融风控平台
多个专业智能体协作进行市场分析、风险评估和欺诈检测。不同智能体专注于不同方面,如交易监控、信用评估和合规检查。
3. 医疗诊断辅助系统
放射科智能体、病理分析智能体和临床决策智能体协同工作,提供更全面的诊断建议。分布式架构确保患者数据隐私和系统可靠性。
4. 智能制造协同
生产线上的质量检测智能体、设备维护智能体和生产规划智能体实时协作,优化生产流程并预测潜在问题。
6.3 实施建议与最佳实践
1. 系统设计阶段
- 基于业务领域合理划分主题和智能体职责
- 设计松耦合的智能体接口,便于独立升级
- 制定明确的消息协议和错误处理机制
2. 开发实现阶段
- 采用测试驱动开发,为每个智能体编写单元测试
- 实现全面的监控和日志记录
- 建立完善的版本控制和部署流程
3. 运维管理阶段
- 实施自动化部署和回滚机制
- 建立性能基准和定期评估
- 制定扩容策略应对负载增长
4. 持续优化阶段
- 收集和分析系统运行数据
- 定期审查智能体协作效率
- 关注AutoGen新版本特性并适时升级
结语
AutoGen分布式运行时框架为构建大规模、跨语言的智能体协作系统提供了强大的基础设施。通过gRPC协议实现的异步消息传递和主题订阅机制,开发者可以轻松构建弹性扩展、高可靠性的分布式AI应用。随着技术的不断演进,AutoGen将在更多领域展现其价值,推动人工智能从单模型应用向多智能体协同系统发展。
掌握分布式智能体协作技术,将成为未来AI系统架构师的核心竞争力。
GLM-5智谱 AI 正式发布 GLM-5,旨在应对复杂系统工程和长时域智能体任务。Jinja00
GLM-5.1GLM-5.1是智谱迄今最智能的旗舰模型,也是目前全球最强的开源模型。GLM-5.1大大提高了代码能力,在完成长程任务方面提升尤为显著。和此前分钟级交互的模型不同,它能够在一次任务中独立、持续工作超过8小时,期间自主规划、执行、自我进化,最终交付完整的工程级成果。Jinja00
LongCat-AudioDiT-1BLongCat-AudioDiT 是一款基于扩散模型的文本转语音(TTS)模型,代表了当前该领域的最高水平(SOTA),它直接在波形潜空间中进行操作。00- QQwen3.5-397B-A17BQwen3.5 实现了重大飞跃,整合了多模态学习、架构效率、强化学习规模以及全球可访问性等方面的突破性进展,旨在为开发者和企业赋予前所未有的能力与效率。Jinja00
HY-Embodied-0.5这是一套专为现实世界具身智能打造的基础模型。该系列模型采用创新的混合Transformer(Mixture-of-Transformers, MoT) 架构,通过潜在令牌实现模态特异性计算,显著提升了细粒度感知能力。Jinja00
FreeSql功能强大的对象关系映射(O/RM)组件,支持 .NET Core 2.1+、.NET Framework 4.0+、Xamarin 以及 AOT。C#00