重构:优化BaseAgent核心逻辑和JSON解析机制

主要改进:
- 修复JSON解析逻辑:支持嵌套结构和更强的容错性
- 优化agno集成:移除自动response_model以获取原始JSON
- 增强调试模式:默认启用调试输出便于问题排查
- 完善错误处理:改进异步任务管理和异常处理
- 添加详细日志:提供更好的解析过程可观察性

技术细节:
- 实现_extract_complete_json方法处理复杂嵌套JSON
- 重构_parse_json_response支持多层级数据结构
- 优化异步任务处理避免未使用变量警告
- 统一调试输出格式提升开发体验

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
iomgaa 2025-08-11 00:01:49 +08:00
parent 5233399151
commit bc471256f4

View File

@ -43,7 +43,7 @@ class BaseAgent:
storage: Optional[SqliteAgentStorage] = None, storage: Optional[SqliteAgentStorage] = None,
use_cache: bool = False, use_cache: bool = False,
markdown: bool = True, markdown: bool = True,
debug_mode: bool = False, debug_mode: bool = True,
num_requests: int = 1, num_requests: int = 1,
llm_config: Dict[str, Any] = None, llm_config: Dict[str, Any] = None,
**kwargs **kwargs
@ -141,13 +141,12 @@ class BaseAgent:
# 初始化模型 # 初始化模型
model = self._create_model_instance(model_class, model_kwargs) model = self._create_model_instance(model_class, model_kwargs)
# 创建代理 # 创建代理 - 不传入response_model以获取原始JSON字符串
self.agent = Agent( self.agent = Agent(
model=model, model=model,
description=description, description=description,
instructions=instructions, instructions=instructions,
markdown=markdown, markdown=markdown,
response_model=response_model,
debug_mode=debug_mode, debug_mode=debug_mode,
storage=storage, storage=storage,
**kwargs **kwargs
@ -272,8 +271,9 @@ class BaseAgent:
第一个有效的结构化响应如果全部失败则返回 None 第一个有效的结构化响应如果全部失败则返回 None
""" """
with concurrent.futures.ThreadPoolExecutor(max_workers=self.num_requests) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=self.num_requests) as executor:
# 不传入output_class让agno返回原始字符串
futures = [ futures = [
executor.submit(self.agent.run, prompt, output_class=self.response_model, **kwargs) executor.submit(self.agent.run, prompt, **kwargs)
for _ in range(self.num_requests) for _ in range(self.num_requests)
] ]
@ -303,13 +303,12 @@ class BaseAgent:
response: RunResponse = future.result() response: RunResponse = future.result()
potential_result = response.content potential_result = response.content
# 直接模型实例 # 强制进行手动JSON解析绕过agno自动解析
if isinstance(potential_result, self.response_model):
return potential_result
# 需要解析的字符串响应
if isinstance(potential_result, str): if isinstance(potential_result, str):
return self._parse_json_response(potential_result) return self._parse_json_response(potential_result)
elif isinstance(potential_result, self.response_model):
# 如果agno已经解析过直接返回
return potential_result
except Exception as e: except Exception as e:
print(f"代理运行失败: {e}") print(f"代理运行失败: {e}")
@ -325,30 +324,97 @@ class BaseAgent:
Returns: Returns:
解析后的模型实例如果解析失败则返回 None 解析后的模型实例如果解析失败则返回 None
""" """
if not response_str or not response_str.strip():
print(f"空响应字符串无法解析JSON")
return None
# 清理响应字符串 # 清理响应字符串
cleaned_str = response_str.strip() cleaned_str = response_str.strip()
print(f"调试: 原始响应 = {repr(response_str[:200])}...")
# 移除代码块标记 # 移除可能的代码块标记
if cleaned_str.startswith('```json'): if cleaned_str.startswith('```json'):
cleaned_str = cleaned_str[7:] cleaned_str = cleaned_str[7:]
if cleaned_str.endswith('```'): if cleaned_str.endswith('```'):
cleaned_str = cleaned_str[:-3] cleaned_str = cleaned_str[:-3]
# 尝试提取JSON内容 - 支持嵌套JSON结构
import re
# 首先尝试查找完整的JSON对象考虑嵌套结构
json_str = self._extract_complete_json(cleaned_str)
if json_str:
print(f"调试: 提取的完整JSON = {repr(json_str[:200])}...")
else:
json_str = cleaned_str.strip()
print(f"调试: 未找到JSON结构使用原始内容 = {repr(json_str[:200])}...")
try: try:
data_dict = json.loads(cleaned_str.strip()) data_dict = json.loads(json_str)
print(f"调试: 解析成功的字典 = {data_dict}")
if self.response_model: if self.response_model:
try: try:
return self.response_model(**data_dict) result = self.response_model(**data_dict)
print(f"调试: 成功创建模型实例 = {result}")
return result
except Exception as e: except Exception as e:
print(f"无法从字典创建模型实例: {e}") print(f"无法从字典创建模型实例: {e}")
return data_dict return None
else: else:
return data_dict return data_dict
except json.JSONDecodeError: except json.JSONDecodeError as e:
print(f"JSON解析失败: {e}")
print(f"尝试解析的内容: {repr(json_str)}")
return None return None
def _extract_complete_json(self, text: str) -> str:
"""
从文本中提取完整的JSON对象支持嵌套结构
Args:
text: 包含JSON的文本
Returns:
提取的完整JSON字符串如果未找到则返回None
"""
# 查找第一个左大括号
start_idx = text.find('{')
if start_idx == -1:
return None
# 使用计数器匹配完整的JSON对象
brace_count = 0
in_string = False
escape_next = False
for i, char in enumerate(text[start_idx:], start_idx):
if escape_next:
escape_next = False
continue
if char == '\\' and in_string:
escape_next = True
continue
if char == '"' and not escape_next:
in_string = not in_string
continue
if not in_string:
if char == '{':
brace_count += 1
elif char == '}':
brace_count -= 1
# 找到匹配的右大括号
if brace_count == 0:
return text[start_idx:i+1]
# 如果没有找到匹配的大括号,返回从第一个{到末尾
return text[start_idx:] if brace_count > 0 else None
def _run_unstructured(self, prompt: str, **kwargs) -> str: def _run_unstructured(self, prompt: str, **kwargs) -> str:
"""执行非结构化输出运行。 """执行非结构化输出运行。
@ -449,7 +515,7 @@ class BaseAgent:
RuntimeError: 如果无法获得有效的结构化响应 RuntimeError: 如果无法获得有效的结构化响应
""" """
tasks = { tasks = {
asyncio.create_task(self.agent.arun(prompt, output_class=self.response_model, **kwargs)) asyncio.create_task(self.agent.arun(prompt, **kwargs))
for _ in range(self.num_requests) for _ in range(self.num_requests)
} }
@ -477,7 +543,7 @@ class BaseAgent:
break break
# 等待下一个任务完成 # 等待下一个任务完成
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
raise RuntimeError(f"{self.num_requests} 次并行尝试后无法获得有效的结构化响应") raise RuntimeError(f"{self.num_requests} 次并行尝试后无法获得有效的结构化响应")
@ -504,7 +570,7 @@ class BaseAgent:
try: try:
# 等待第一个完成的任务 # 等待第一个完成的任务
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
first_task = done.pop() first_task = done.pop()
try: try: