#!/usr/bin/env python3 """ Agent 任务轮询脚本 每个 Agent 定时运行,查询分配给自己的任务并执行 """ import os import sys import time import logging import requests from datetime import datetime # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # 环境变量或配置 NOTION_TOKEN = os.environ.get("NOTION_TOKEN", "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") AGENT_ID = os.environ.get("AGENT_ID", "yunjiang") # 当前 Agent ID POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "180")) # 轮询间隔(秒),默认3分钟 # Database IDs TASKS_DB_ID = "32847fe1-da27-8135-af44-eefdbd3b1640" AGENTS_DB_ID = "32847fe1-da27-8101-8758-d416db87d4de" # Notion API 基础 URL NOTION_API_BASE = "https://api.notion.com/v1" def notion_request(method, endpoint, **kwargs): """Notion API 请求封装""" url = f"{NOTION_API_BASE}{endpoint}" headers = { "Authorization": f"Bearer {NOTION_TOKEN}", "Notion-Version": "2022-06-28", "Content-Type": "application/json" } if method == "GET": response = requests.get(url, headers=headers, **kwargs) elif method == "POST": response = requests.post(url, headers=headers, **kwargs) elif method == "PATCH": response = requests.patch(url, headers=headers, **kwargs) else: raise ValueError(f"Unsupported method: {method}") response.raise_for_status() return response.json() def get_agent_info(agent_id): """获取 Agent 信息""" response = notion_request("POST", f"/databases/{AGENTS_DB_ID}/query") for page in response.get("results", []): props = page.get("properties", {}) if "Agent ID" in props: title = props["Agent ID"]["title"] if title and title[0]["plain_text"] == agent_id: return { "id": page["id"], "name": props["名称"]["rich_text"][0]["plain_text"] if props["名称"]["rich_text"] else agent_id, "status": props["状态"]["select"]["name"] if props["状态"].get("select") else "离线" } return None def query_todo_tasks(agent_page_id=None): """查询 TODO 任务""" logger.info(f"查询 {AGENT_ID} 的 TODO 任务...") # 如果有 agent_page_id,使用 Relation 过滤 filter_dict = { "property": "状态", "select": { "equals": "TODO" } } # 这里简化处理:查询所有 TODO 任务 # 后续应该根据执行者 Relation 过滤 try: response = notion_request( "POST", f"/databases/{TASKS_DB_ID}/query", json={"filter": filter_dict} ) tasks = response.get("results", []) logger.info(f"找到 {len(tasks)} 个 TODO 任务") return tasks except Exception as e: logger.error(f"查询任务失败: {e}") return [] def claim_task(task_id): """领取任务:将状态改为进行中""" try: notion_request( "PATCH", f"/pages/{task_id}", json={ "properties": { "状态": { "select": {"name": "进行中"} } } } ) logger.info(f"✓ 领取任务成功: {task_id}") return True except Exception as e: logger.error(f"领取任务失败: {e}") return False def complete_task(task_id, report_link): """完成任务:将状态改为待验收""" try: notion_request( "PATCH", f"/pages/{task_id}", json={ "properties": { "状态": { "select": {"name": "待验收"} }, "报告链接": { "url": report_link } } } ) logger.info(f"✓ 完成任务: {task_id}") return True except Exception as e: logger.error(f"完成任务失败: {e}") return False def execute_task(task): """执行任务的逻辑(可自定义)""" # 获取任务信息 props = task.get("properties", {}) task_name = props.get("任务名", {}).get("title", [{}])[0].get("plain_text", "未命名任务") task_id = task["id"] logger.info(f"开始执行任务: {task_name}") # 1. 领取任务 if not claim_task(task_id): return False # 2. 执行任务(这里只是示例,实际应根据任务类型执行不同操作) # 模拟执行 time.sleep(2) # 3. 完成任务(生成报告链接) # 这里应该生成实际的 Obsidian 报告 report_link = f"https://example.com/report/{task_id}" return complete_task(task_id, report_link) def polling_loop(): """轮询主循环""" logger.info(f"🚀 Agent {AGENT_ID} 任务轮询启动") logger.info(f"轮询间隔: {POLL_INTERVAL} 秒") # 获取 Agent 信息 agent_info = get_agent_info(AGENT_ID) if agent_info: logger.info(f"Agent 信息: {agent_info['name']} (状态: {agent_info['status']})") else: logger.warning(f"未找到 Agent: {AGENT_ID}") while True: try: # 查询 TODO 任务 tasks = query_todo_tasks(None) if tasks: logger.info(f"发现 {len(tasks)} 个待处理任务") for task in tasks: execute_task(task) else: logger.debug("没有待处理任务") except Exception as e: logger.error(f"轮询异常: {e}") time.sleep(POLL_INTERVAL) def main(): """主入口""" if len(sys.argv) > 1: global AGENT_ID AGENT_ID = sys.argv[1] if len(sys.argv) > 2: global POLL_INTERVAL POLL_INTERVAL = int(sys.argv[2]) polling_loop() if __name__ == "__main__": main()