# 星枢 Agent 任务解耦技术方案 > 基于 RabbitMQ 的分布式任务队列架构 --- ## 一、概述 ### 背景 当前星枢(主 Agent)与其他 Agent 的通信方式: | 方式 | 命令 | 局限 | |------|------|------| | 本地 | `openclaw agent --agent xingyao --message "..." --deliver` | 同步等待 | | 远程 | `ssh ubuntu2 "openclaw agent --agent yunce --message ..."` | 串行阻塞 | ### 目标 - **异步执行**:任务下发不等待结果 - **任务持久化**:重启不丢失 - **可监控**:实时查看任务状态 - **可扩展**:支持多 Agent 并行 --- ## 二、技术选型 ### RabbitMQ vs 其他 | 特性 | RabbitMQ | Redis Streams | Kafka | |------|----------|---------------|-------| | 消息确认 | ✅ ACK | ✅ ACK | ✅ ACK | | 优先级队列 | ✅ | ❌ | ❌ | | 延迟队列 | ✅ (插件) | ✅ | ❌ | | 持久化 | ✅ | ✅ | ✅ | | 集群 | ✅ | 有限 | ✅ | | 生态成熟度 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | | 轻量级 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐ | **推荐:RabbitMQ** 理由: - 消息确认机制完善 - 支持复杂路由规则 - 管理界面友好 - 适合中低并发场景 --- ## 三、架构设计 ### 3.1 整体架构 ``` ┌─────────────────────────────────────────────────────────────────────────┐ │ 用户 │ │ (Telegram/Discord) │ └─────────────────────────────────┬───────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ 星枢 (主 Agent) │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ 意图理解 │ │ 任务分解 │ │ 队列管理 │ │ 结果聚合 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────┬───────────────────────────────────────┘ │ ┌─────────────┴─────────────┐ │ RabbitMQ 集群 │ │ (task_exchange) │ └─────────────┬─────────────┘ │ ┌───────────────────────┼───────────────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ Yunce (Agent) │ │ Atlas (Agent) │ │ Prometheus │ │ 队列: tasks │ │ 队列: tasks │ │ 队列: tasks │ │ 状态: running │ │ 状态: idle │ │ 状态: idle │ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │ │ │ │ ┌──────────────────┴──────────────────┐ │ │ │ 结果收集 (result_exchange) │ │ │ └──────────────────┬──────────────────┘ │ │ │ │ └──────────────────────┼──────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ 星枢 (结果处理) │ │ - 任务状态更新 │ │ - 用户反馈 │ │ - 后续任务触发 │ └─────────────────────────────────────────────────────────────────────────┘ ``` ### 3.2 消息流设计 ``` ┌──────────────────────────────────────────────────────────────────────────┐ │ 消息生命周期 │ └──────────────────────────────────────────────────────────────────────────┘ [1] 任务下发 [5] 结果处理 │ ▲ ▼ │ ┌────────┐ ┌────────────┐ ┌───────────┐ ┌───────────┐ │ │ 星枢 │───▶│ RabbitMQ │───▶│ Agent N │───▶│ RabbitMQ │──────┘ │创建任务 │ │ (持久化) │ │ 执行任务 │ │ (结果队列) │ └────────┘ └────────────┘ └───────────┘ └───────────┘ │ │ │ [4] ACK 确认 │ │ [2] 任务入队 │ (可选: 延迟队列) ▼ │ ┌───────────┐ └─────────────▶│ 状态变更 │ │ (处理中→完成) └───────────┘ [3] Agent 消费任务 ``` ### 3.3 Exchange & Queue 设计 ``` ┌─────────────────┐ │ task_exchange │ (Topic Exchange) │ (星枢下发) │ └────────┬────────┘ │ ┌───────────────────┼───────────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ queue.yunce │ │ queue.atlas │ │ queue.prometheus│ │ routing: │ │ routing: │ │ routing: │ │ task.yunce │ │ task.atlas │ │ task.prometheus │ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │ │ │ ▼ ▼ ▼ [Agent: Yunce] [Agent: Atlas] [Agent: Prometheus] ───────────────────────────────────────────────────────────────────────── ┌─────────────────┐ │result_exchange │ (Topic Exchange) │ (结果收集) │ └────────┬────────┘ │ ┌───────────────────┼───────────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │result.yunce │ │result.atlas │ │result.prometheus │ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │ │ │ └────────────────────┼────────────────────┘ │ ▼ ┌─────────────────┐ │ queue.star聚合 │ ← 星枢监听此队列 │ routing: result.#│ └─────────────────┘ ``` --- ## 四、消息格式定义 ### 4.1 任务消息 (Task Message) ```json { "taskId": "task_20260317_001", "type": "task", "source": "xingyao", "target": "yunce", "priority": "high", "content": { "action": "code_review", "params": { "repo": "my-project", "branch": "feature/login" } }, "metadata": { "createdAt": "2026-03-17T10:30:00Z", "expireAt": "2026-03-17T11:30:00Z", "retryCount": 0, "maxRetries": 3 } } ``` ### 4.2 结果消息 (Result Message) ```json { "taskId": "task_20260317_001", "type": "result", "source": "yunce", "target": "xingyao", "status": "success", "content": { "summary": "代码审查完成", "findings": [ {"severity": "warning", "message": "建议添加参数校验"} ], "output": "/path/to/report.md" }, "metadata": { "completedAt": "2026-03-17T10:35:00Z", "duration": 300 } } ``` ### 4.3 心跳消息 (Heartbeat Message) ```json { "type": "heartbeat", "agent": "yunce", "status": "idle", "currentTask": null, "timestamp": "2026-03-17T10:30:00Z" } ``` --- ## 五、实现步骤 ### 5.1 RabbitMQ 部署 ```bash # Docker 部署 docker run -d \ --name rabbitmq \ -p 5672:5672 \ -p 15672:15672 \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=your_password \ rabbitmq:3.12-management # 访问管理界面 # http://your-server:15672 ``` ### 5.2 创建 Exchange 和 Queue (初始化脚本) ```python # setup_rabbitmq.py import pika def setup_rabbitmq(): connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost', port=5672) ) channel = connection.channel() # 1. 创建 Exchange channel.exchange_declare(exchange='task_exchange', exchange_type='topic', durable=True) channel.exchange_declare(exchange='result_exchange', exchange_type='topic', durable=True) # 2. 创建任务队列 (按 Agent) agents = ['yunce', 'atlas', 'prometheus', 'oracle'] for agent in agents: channel.queue_declare(queue=f'queue.{agent}', durable=True) channel.queue_bind( exchange='task_exchange', queue=f'queue.{agent}', routing_key=f'task.{agent}' ) # 3. 创建星枢结果聚合队列 channel.queue_declare(queue='queue.star', durable=True) channel.queue_bind( exchange='result_exchange', queue='queue.star', routing_key='result.#' ) connection.close() print("✅ RabbitMQ 初始化完成") if __name__ == '__main__': setup_rabbitmq() ``` ### 5.3 星枢任务下发模块 ```python # star_sender.py import pika import json import uuid from datetime import datetime class StarTaskSender: def __init__(self, rabbitmq_host='localhost'): self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=rabbitmq_host) ) self.channel = self.connection.channel() def send_task(self, target_agent, action, params, priority='normal'): task_id = f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}" message = { "taskId": task_id, "type": "task", "source": "xingyao", "target": target_agent, "priority": priority, "content": { "action": action, "params": params }, "metadata": { "createdAt": datetime.now().isoformat() + "Z", "retryCount": 0, "maxRetries": 3 } } self.channel.basic_publish( exchange='task_exchange', routing_key=f'task.{target_agent}', body=json.dumps(message), properties=pika.BasicProperties( delivery_mode=2, # 持久化 priority=10 if priority == 'high' else 5 ) ) print(f"✅ 任务已下发: {task_id} -> {target_agent}") return task_id def close(self): self.connection.close() # 使用示例 if __name__ == '__main__': sender = StarTaskSender() # 下发任务给 Yunce task_id = sender.send_task( target_agent='yunce', action='code_review', params={'repo': 'my-project', 'branch': 'main'}, priority='high' ) sender.close() ``` ### 5.4 Agent 任务监听模块 ```python # agent_listener.py import pika import json import subprocess import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class AgentListener: def __init__(self, agent_name, rabbitmq_host='localhost'): self.agent_name = agent_name self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=rabbitmq_host) ) self.channel = self.connection.channel() def execute_task(self, task_content): """执行任务的核心逻辑""" action = task_content['action'] params = task_content['params'] logger.info(f"执行任务: {action}") # 根据 action 调用不同的处理函数 handlers = { 'code_review': self.handle_code_review, 'data_analysis': self.handle_data_analysis, 'file_operation': self.handle_file_operation, } handler = handlers.get(action, self.handle_default) return handler(params) def handle_code_review(self, params): # 调用 OpenClaw agent result = subprocess.run( ['openclaw', 'agent', '--agent', 'yunce', '--message', f"请审查代码仓库 {params.get('repo')}"], capture_output=True, text=True ) return {'output': result.stdout, 'status': 'success'} def handle_default(self, params): return {'message': f'Unknown action: {params}'} def on_message(self, ch, method, properties, body): """消息处理回调""" try: message = json.loads(body) task_id = message['taskId'] logger.info(f"收到任务: {task_id}") # 执行任务 result = self.execute_task(message['content']) # 发送结果 self.send_result(task_id, result) # ACK 确认 ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: logger.error(f"任务执行失败: {e}") ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) def send_result(self, task_id, result): """发送结果到星枢""" result_message = { "taskId": task_id, "type": "result", "source": self.agent_name, "target": "xingyao", "status": "success", "content": result, "metadata": { "completedAt": datetime.now().isoformat() + "Z" } } self.channel.basic_publish( exchange='result_exchange', routing_key=f'result.{self.agent_name}', body=json.dumps(result_message), properties=pika.BasicProperties(delivery_mode=2) ) def start_listening(self): """开始监听任务队列""" self.channel.basic_qos(prefetch_count=1) self.channel.basic_consume( queue=f'queue.{self.agent_name}', on_message_callback=self.on_message ) logger.info(f"🤖 Agent [{self.agent_name}] 开始监听任务队列...") self.channel.start_consuming() # 使用示例 if __name__ == '__main__': import sys agent_name = sys.argv[1] if len(sys.argv) > 1 else 'yunce' listener = AgentListener(agent_name) listener.start_listening() ``` ### 5.5 星枢结果收集模块 ```python # star_receiver.py import pika import json from datetime import datetime class StarResultReceiver: def __init__(self, rabbitmq_host='localhost'): self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=rabbitmq_host) ) self.channel = self.connection.channel() self.pending_tasks = {} # 跟踪待处理任务 def on_message(self, ch, method, properties, body): message = json.loads(body) if message['type'] == 'result': task_id = message['taskId'] status = message['status'] result = message['content'] print(f"📋 任务完成: {task_id}") print(f" 状态: {status}") print(f" 结果: {result}") # 更新任务状态 if task_id in self.pending_tasks: self.pending_tasks[task_id]['status'] = 'completed' self.pending_tasks[task_id]['result'] = result # 可以触发后续任务 self.handle_next_action(message) elif message['type'] == 'heartbeat': print(f"💓 Agent 心跳: {message['agent']} - {message['status']}") ch.basic_ack(delivery_tag=method.delivery_tag) def handle_next_action(self, message): """根据结果触发后续动作""" # 示例:根据结果发送新任务 pass def start_listening(self): self.channel.basic_qos(prefetch_count=1) self.channel.basic_consume( queue='queue.star', on_message_callback=self.on_message ) print("🌟 星枢开始监听任务结果...") self.channel.start_consuming() # 使用示例 if __name__ == '__main__': receiver = StarResultReceiver() receiver.start_listening() ``` --- ## 六、监控界面 ### 6.1 RabbitMQ 管理界面 ``` URL: http://localhost:15672 用户名: admin 密码: your_password 可查看: - 队列状态 (Messages, Ready, Unacked) - 连接数 - 消息流速 - 交换机绑定 ``` ### 6.2 自定义监控面板 (可选) ```python # 简单的任务状态查询 def get_task_status(task_id): # 可以通过 REST API 查询 # 或者维护一个 Redis 状态缓存 pass def list_pending_tasks(): # 列出所有待处理任务 pass def list_agent_status(): # 列出所有 Agent 状态 pass ``` --- ## 七、完整工作流程示例 ``` ┌─────────────────────────────────────────────────────────────────────────┐ │ 完整示例:代码审查任务 │ └─────────────────────────────────────────────────────────────────────────┘ [用户] │ │ "星枢,帮我审查 my-project 的 main 分支" ▼ [星枢 - 意图理解] │ action: code_review │ target: yunce │ params: {repo: "my-project", branch: "main"} ▼ [星枢 - 任务下发] │ RabbitMQ: task.yunce │ taskId: task_20260317_001 ▼ [RabbitMQ] (持久化消息) ▼ [Yunce Agent - 任务监听] │ 收到任务 -> 执行 code_review │ 调用: openclaw agent --agent yunce --message "审查 my-project" ▼ [Yunce Agent - 返回结果] │ RabbitMQ: result.yunce │ status: success, findings: [...] ▼ [RabbitMQ] │ result.# -> queue.star ▼ [星枢 - 结果收集] │ 接收结果 -> 更新状态 │ 格式化输出 -> 推送给用户 ▼ [用户] │ 收到审查报告 ``` --- ## 八、部署建议 ### 8.1 生产环境配置 ```yaml # docker-compose.yml version: '3.8' services: rabbitmq: image: rabbitmq:3.12-management ports: - "5672:5672" - "15672:15672" environment: RABBITMQ_DEFAULT_USER: admin RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD} volumes: - rabbitmq_data:/var/lib/rabbitmq healthcheck: test: ["CMD", "rabbitmq-diagnostics", "check_running"] interval: 30s volumes: rabbitmq_data: ``` ### 8.2 安全建议 1. **认证**:启用 RabbitMQ 用户认证 2. **SSL/TLS**:生产环境启用 amqps 3. **VHost**:不同项目使用不同 vhost 4. **权限**:最小权限原则 --- ## 九、故障处理 | 故障场景 | 解决方案 | |----------|----------| | Agent 宕机 | 任务自动重新入队 (requeue) | | RabbitMQ 宕机 | 消息持久化,重启后恢复 | | 任务超时 | 设置 TTL,自动移到死信队列 | | 消息积压 | 监控队列长度,扩展消费者 | --- ## 十、进阶功能 ### 10.1 延迟任务 ```python # 延迟队列:让任务在指定时间后执行 def send_delayed_task(target, action, delay_seconds): # 使用 RabbitMQ 延迟插件 或 配合 Redis 实现 pass ``` ### 10.2 优先级队列 ```python # 高优先级任务优先处理 channel.queue_declare(queue='queue.yunce', arguments={ 'x-max-priority': 10 }) ``` ### 10.3 任务超时 ```python # 消息 TTL + 死信队列 channel.queue_declare( queue='queue.yunce', arguments={ 'x-message-ttl': 3600000, # 1小时 'x-dead-letter-exchange': 'dlx_exchange' } ) ``` --- ## 附录:文件清单 | 文件 | 说明 | |------|------| | `setup_rabbitmq.py` | RabbitMQ 初始化脚本 | | `star_sender.py` | 星枢任务下发模块 | | `agent_listener.py` | Agent 任务监听模块 | | `star_receiver.py` | 星枢结果收集模块 | | `docker-compose.yml` | 一键部署配置 | --- *文档版本: 1.0* *创建时间: 2026-03-17* *作者: 云策*