From bc471256f43a8c1e7ffbc8131337b238c40d97ca Mon Sep 17 00:00:00 2001 From: iomgaa Date: Mon, 11 Aug 2025 00:01:49 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=EF=BC=9A=E4=BC=98=E5=8C=96Ba?= =?UTF-8?q?seAgent=E6=A0=B8=E5=BF=83=E9=80=BB=E8=BE=91=E5=92=8CJSON?= =?UTF-8?q?=E8=A7=A3=E6=9E=90=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 主要改进: - 修复JSON解析逻辑:支持嵌套结构和更强的容错性 - 优化agno集成:移除自动response_model以获取原始JSON - 增强调试模式:默认启用调试输出便于问题排查 - 完善错误处理:改进异步任务管理和异常处理 - 添加详细日志:提供更好的解析过程可观察性 技术细节: - 实现_extract_complete_json方法处理复杂嵌套JSON - 重构_parse_json_response支持多层级数据结构 - 优化异步任务处理避免未使用变量警告 - 统一调试输出格式提升开发体验 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- agent_system/base/agent.py | 100 ++++++++++++++++++++++++++++++------- 1 file changed, 83 insertions(+), 17 deletions(-) diff --git a/agent_system/base/agent.py b/agent_system/base/agent.py index d7393de..da93f38 100644 --- a/agent_system/base/agent.py +++ b/agent_system/base/agent.py @@ -43,7 +43,7 @@ class BaseAgent: storage: Optional[SqliteAgentStorage] = None, use_cache: bool = False, markdown: bool = True, - debug_mode: bool = False, + debug_mode: bool = True, num_requests: int = 1, llm_config: Dict[str, Any] = None, **kwargs @@ -141,13 +141,12 @@ class BaseAgent: # 初始化模型 model = self._create_model_instance(model_class, model_kwargs) - # 创建代理 + # 创建代理 - 不传入response_model以获取原始JSON字符串 self.agent = Agent( model=model, description=description, instructions=instructions, markdown=markdown, - response_model=response_model, debug_mode=debug_mode, storage=storage, **kwargs @@ -272,8 +271,9 @@ class BaseAgent: 第一个有效的结构化响应,如果全部失败则返回 None """ with concurrent.futures.ThreadPoolExecutor(max_workers=self.num_requests) as executor: + # 不传入output_class,让agno返回原始字符串 futures = [ - executor.submit(self.agent.run, prompt, output_class=self.response_model, **kwargs) + executor.submit(self.agent.run, prompt, **kwargs) for _ in range(self.num_requests) ] @@ -303,13 +303,12 @@ class BaseAgent: response: RunResponse = future.result() potential_result = response.content - # 直接模型实例 - if isinstance(potential_result, self.response_model): - return potential_result - - # 需要解析的字符串响应 + # 强制进行手动JSON解析(绕过agno自动解析) if isinstance(potential_result, str): return self._parse_json_response(potential_result) + elif isinstance(potential_result, self.response_model): + # 如果agno已经解析过,直接返回 + return potential_result except Exception as e: print(f"代理运行失败: {e}") @@ -325,30 +324,97 @@ class BaseAgent: Returns: 解析后的模型实例,如果解析失败则返回 None """ + if not response_str or not response_str.strip(): + print(f"空响应字符串,无法解析JSON") + return None + # 清理响应字符串 cleaned_str = response_str.strip() + print(f"调试: 原始响应 = {repr(response_str[:200])}...") - # 移除代码块标记 + # 移除可能的代码块标记 if cleaned_str.startswith('```json'): cleaned_str = cleaned_str[7:] if cleaned_str.endswith('```'): cleaned_str = cleaned_str[:-3] + + # 尝试提取JSON内容 - 支持嵌套JSON结构 + import re + + # 首先尝试查找完整的JSON对象(考虑嵌套结构) + json_str = self._extract_complete_json(cleaned_str) + if json_str: + print(f"调试: 提取的完整JSON = {repr(json_str[:200])}...") + else: + json_str = cleaned_str.strip() + print(f"调试: 未找到JSON结构,使用原始内容 = {repr(json_str[:200])}...") try: - data_dict = json.loads(cleaned_str.strip()) + data_dict = json.loads(json_str) + print(f"调试: 解析成功的字典 = {data_dict}") if self.response_model: try: - return self.response_model(**data_dict) + result = self.response_model(**data_dict) + print(f"调试: 成功创建模型实例 = {result}") + return result except Exception as e: print(f"无法从字典创建模型实例: {e}") - return data_dict + return None else: return data_dict - except json.JSONDecodeError: + except json.JSONDecodeError as e: + print(f"JSON解析失败: {e}") + print(f"尝试解析的内容: {repr(json_str)}") return None + def _extract_complete_json(self, text: str) -> str: + """ + 从文本中提取完整的JSON对象,支持嵌套结构 + + Args: + text: 包含JSON的文本 + + Returns: + 提取的完整JSON字符串,如果未找到则返回None + """ + # 查找第一个左大括号 + start_idx = text.find('{') + if start_idx == -1: + return None + + # 使用计数器匹配完整的JSON对象 + brace_count = 0 + in_string = False + escape_next = False + + for i, char in enumerate(text[start_idx:], start_idx): + if escape_next: + escape_next = False + continue + + if char == '\\' and in_string: + escape_next = True + continue + + if char == '"' and not escape_next: + in_string = not in_string + continue + + if not in_string: + if char == '{': + brace_count += 1 + elif char == '}': + brace_count -= 1 + + # 找到匹配的右大括号 + if brace_count == 0: + return text[start_idx:i+1] + + # 如果没有找到匹配的大括号,返回从第一个{到末尾 + return text[start_idx:] if brace_count > 0 else None + def _run_unstructured(self, prompt: str, **kwargs) -> str: """执行非结构化输出运行。 @@ -449,7 +515,7 @@ class BaseAgent: RuntimeError: 如果无法获得有效的结构化响应 """ tasks = { - asyncio.create_task(self.agent.arun(prompt, output_class=self.response_model, **kwargs)) + asyncio.create_task(self.agent.arun(prompt, **kwargs)) for _ in range(self.num_requests) } @@ -477,7 +543,7 @@ class BaseAgent: break # 等待下一个任务完成 - done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) raise RuntimeError(f"在 {self.num_requests} 次并行尝试后无法获得有效的结构化响应") @@ -504,7 +570,7 @@ class BaseAgent: try: # 等待第一个完成的任务 - done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) first_task = done.pop() try: