MedResearcher/example/subagent-example/text_analysis_example.py
iomgaa 1b652502d5 docs: 新增SubAgent系统完整示例和说明文档
- 添加详细的SubAgent使用指南(README.md)
- 创建完整的Pydantic模型示例(example_models.py)
- 实现基础使用示例,展示核心功能(basic_example.py)
- 构建复杂文本分析应用示例(text_analysis_example.py)
- 提供数字提取实验运行器作为参考示例
- 包含多Agent协作、批量处理、性能监控等高级功能
- 支持交互式演示和完整的错误处理机制
2025-08-25 17:33:20 +08:00

762 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
文本分析综合示例
基于SubAgent系统的复杂应用示例展示
1. 多Agent协作系统
2. 复杂的数据处理pipeline
3. 结构化输出和错误恢复
4. 性能监控和质量评估
"""
import sys
import os
import time
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
import uuid
# 添加项目根路径到Python路径
project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
sys.path.append(project_root)
from src.agent_system import SubAgent, create_json_agent
from example_models import (
TextAnalysisResult,
DocumentClassificationResult,
StructuredDataExtraction,
ComprehensiveAnalysisResult,
SentimentAnalysis,
KeywordExtraction,
DataExtractionItem,
CategoryClassification,
TaskExecutionResult
)
class TextAnalysisEngine:
"""文本分析引擎 - 多Agent协作系统"""
def __init__(self):
"""初始化文本分析引擎"""
self.agents = {}
self.processing_stats = {
"total_processed": 0,
"successful_analyses": 0,
"failed_analyses": 0,
"average_processing_time": 0.0
}
# 初始化所有Agent
self._initialize_agents()
def _initialize_agents(self):
"""初始化所有分析Agent"""
print("🔧 初始化文本分析引擎...")
try:
# 1. 情感分析Agent
self.agents['sentiment'] = self._create_sentiment_agent()
# 2. 关键词提取Agent
self.agents['keywords'] = self._create_keyword_agent()
# 3. 文本分类Agent
self.agents['classification'] = self._create_classification_agent()
# 4. 数据提取Agent
self.agents['extraction'] = self._create_extraction_agent()
# 5. 综合分析Agent
self.agents['comprehensive'] = self._create_comprehensive_agent()
print(f"✅ 成功初始化 {len(self.agents)} 个Agent")
except Exception as e:
print(f"❌ Agent初始化失败: {e}")
raise
def _create_sentiment_agent(self) -> SubAgent:
"""创建情感分析Agent"""
instructions = [
"你是专业的文本情感分析专家",
"准确识别文本的情感倾向和情感强度",
"提供详细的分析依据和相关关键词",
"对分析结果给出可信度评估"
]
prompt_template = """
请对以下文本进行深入的情感分析:
【文本内容】
{text}
【分析要求】
1. 识别主要情感倾向positive/negative/neutral
2. 评估情感强度和置信度0-1
3. 提供分析说明和判断依据
4. 提取影响情感判断的关键词和短语
5. 考虑语言的细微差别和上下文含义
请提供准确、专业的分析结果。
"""
return SubAgent(
provider="aliyun",
model_name="qwen-max",
name="sentiment_analyzer",
description="专业的情感分析系统",
instructions=instructions,
prompt_template=prompt_template,
response_model=SentimentAnalysis
)
def _create_keyword_agent(self) -> SubAgent:
"""创建关键词提取Agent"""
instructions = [
"你是专业的关键词提取专家",
"从文本中识别最重要和最相关的关键词",
"评估关键词的重要性和频率",
"对关键词进行合理的分类"
]
prompt_template = """
请从以下文本中提取关键词:
【文本内容】
{text}
【提取要求】
1. 识别最重要的关键词和短语
2. 统计关键词出现频率
3. 评估每个关键词的重要性0-1
4. 对关键词进行分类(如:人物、地点、概念、技术等)
5. 排除停用词和无意义词汇
请提供结构化的关键词提取结果。
"""
# 创建专门处理关键词列表的响应模型
from pydantic import BaseModel, Field
from typing import List
class KeywordExtractionResult(BaseModel):
keywords: List[KeywordExtraction] = Field(description="提取的关键词列表")
total_count: int = Field(description="关键词总数", ge=0)
text_complexity: float = Field(description="文本复杂度", ge=0.0, le=1.0)
return SubAgent(
provider="aliyun",
model_name="qwen-max",
name="keyword_extractor",
description="智能关键词提取系统",
instructions=instructions,
prompt_template=prompt_template,
response_model=KeywordExtractionResult
)
def _create_classification_agent(self) -> SubAgent:
"""创建文档分类Agent"""
instructions = [
"你是专业的文档分类专家",
"准确识别文档的类型和主题",
"提供多级分类和置信度评估",
"考虑文档的内容、风格和用途"
]
prompt_template = """
请对以下文档进行分类:
【文档内容】
{text}
【分类体系】
主要分类:技术文档、商业文档、学术论文、新闻报道、个人写作、法律文档、医学文档等
详细分类:根据具体内容进一步细分
【分类要求】
1. 确定主要分类和置信度
2. 提供所有可能分类的概率分布
3. 识别用于分类判断的关键特征
4. 评估分类的可信度
请提供准确的分类结果。
"""
return SubAgent(
provider="aliyun",
model_name="qwen-max",
name="document_classifier",
description="智能文档分类系统",
instructions=instructions,
prompt_template=prompt_template,
response_model=DocumentClassificationResult
)
def _create_extraction_agent(self) -> SubAgent:
"""创建数据提取Agent"""
instructions = [
"你是专业的结构化数据提取专家",
"从非结构化文本中提取有价值的信息",
"确保提取的数据准确性和完整性",
"评估提取质量和可靠性"
]
prompt_template = """
请从以下文本中提取结构化数据:
【文本内容】
{text}
【提取目标】
根据文本内容自动识别可提取的数据类型,可能包括:
- 人名、地名、机构名
- 日期、时间、数量
- 联系方式、地址
- 专业术语、概念
- 关键指标、统计数据
【提取要求】
1. 自动识别文本中的结构化信息
2. 为每个提取项提供置信度评估
3. 记录提取依据和来源文本片段
4. 评估整体提取质量和完整性
请提供详细的数据提取结果。
"""
return SubAgent(
provider="aliyun",
model_name="qwen-max",
name="data_extractor",
description="智能数据提取系统",
instructions=instructions,
prompt_template=prompt_template,
response_model=StructuredDataExtraction
)
def _create_comprehensive_agent(self) -> SubAgent:
"""创建综合分析Agent"""
instructions = [
"你是文本综合分析专家",
"整合多种分析结果提供整体评估",
"识别分析中的一致性和矛盾之处",
"提供改进建议和深度见解"
]
prompt_template = """
基于以下多维度分析结果,请提供综合评估:
【原始文本】
{original_text}
【分析结果】
情感分析: {sentiment_result}
关键词提取: {keyword_result}
文档分类: {classification_result}
数据提取: {extraction_result}
【综合评估要求】
1. 评估各项分析结果的一致性
2. 识别潜在的分析矛盾或问题
3. 提供整体质量评估
4. 给出置信度评估
5. 提出改进建议
请提供专业的综合分析报告。
"""
return SubAgent(
provider="aliyun",
model_name="qwen-max",
name="comprehensive_analyzer",
description="文本综合分析系统",
instructions=instructions,
prompt_template=prompt_template,
response_model=ComprehensiveAnalysisResult
)
def analyze_text(self, text: str, analysis_id: Optional[str] = None) -> ComprehensiveAnalysisResult:
"""执行完整的文本分析"""
if analysis_id is None:
analysis_id = f"analysis_{uuid.uuid4().hex[:8]}"
start_time = time.time()
self.processing_stats["total_processed"] += 1
print(f"\n🔍 开始分析 [{analysis_id}]")
print(f"文本长度: {len(text)} 字符")
try:
# 阶段1: 基础分析
print("📊 执行基础分析...")
sentiment_result = self._analyze_sentiment(text)
keyword_result = self._extract_keywords(text)
# 阶段2: 高级分析
print("🧠 执行高级分析...")
classification_result = self._classify_document(text)
extraction_result = self._extract_data(text)
# 阶段3: 综合分析
print("🎯 执行综合分析...")
comprehensive_result = self._comprehensive_analysis(
text, sentiment_result, keyword_result,
classification_result, extraction_result, analysis_id
)
# 更新统计信息
processing_time = time.time() - start_time
self.processing_stats["successful_analyses"] += 1
self._update_processing_stats(processing_time)
print(f"✅ 分析完成 [{analysis_id}] - 耗时: {processing_time:.2f}")
return comprehensive_result
except Exception as e:
self.processing_stats["failed_analyses"] += 1
print(f"❌ 分析失败 [{analysis_id}]: {e}")
raise
def _analyze_sentiment(self, text: str) -> SentimentAnalysis:
"""执行情感分析"""
try:
result = self.agents['sentiment'].run(template_vars={"text": text})
print(f" 情感: {result.sentiment} (置信度: {result.confidence:.3f})")
return result
except Exception as e:
print(f" ⚠️ 情感分析失败: {e}")
# 返回默认结果
return SentimentAnalysis(
sentiment="neutral",
confidence=0.0,
explanation=f"分析失败: {e}",
keywords=[]
)
def _extract_keywords(self, text: str):
"""提取关键词"""
try:
result = self.agents['keywords'].run(template_vars={"text": text})
print(f" 关键词: {result.total_count}")
return result
except Exception as e:
print(f" ⚠️ 关键词提取失败: {e}")
# 返回空结果
from pydantic import BaseModel, Field
from typing import List
class KeywordExtractionResult(BaseModel):
keywords: List[KeywordExtraction] = Field(default_factory=list)
total_count: int = Field(default=0)
text_complexity: float = Field(default=0.5)
return KeywordExtractionResult()
def _classify_document(self, text: str) -> DocumentClassificationResult:
"""执行文档分类"""
try:
result = self.agents['classification'].run(template_vars={"text": text})
print(f" 分类: {result.primary_category} (置信度: {result.confidence:.3f})")
return result
except Exception as e:
print(f" ⚠️ 文档分类失败: {e}")
# 返回默认结果
return DocumentClassificationResult(
primary_category="未知",
confidence=0.0,
all_categories=[
CategoryClassification(
category="未知",
confidence=0.0,
probability=0.0
)
]
)
def _extract_data(self, text: str) -> StructuredDataExtraction:
"""提取结构化数据"""
try:
result = self.agents['extraction'].run(template_vars={"text": text})
print(f" 数据提取: {result.extracted_fields}/{result.total_fields} 字段")
return result
except Exception as e:
print(f" ⚠️ 数据提取失败: {e}")
# 返回空结果
return StructuredDataExtraction(
extracted_data={},
extraction_items=[],
extraction_quality="poor",
completeness=0.0,
accuracy=0.0,
total_fields=0,
extracted_fields=0,
failed_fields=0
)
def _comprehensive_analysis(
self,
original_text: str,
sentiment_result: SentimentAnalysis,
keyword_result,
classification_result: DocumentClassificationResult,
extraction_result: StructuredDataExtraction,
analysis_id: str
) -> ComprehensiveAnalysisResult:
"""执行综合分析"""
try:
# 准备模板变量
template_vars = {
"original_text": original_text[:500] + ("..." if len(original_text) > 500 else ""),
"sentiment_result": f"情感:{sentiment_result.sentiment}, 置信度:{sentiment_result.confidence}",
"keyword_result": f"关键词数量:{getattr(keyword_result, 'total_count', 0)}",
"classification_result": f"分类:{classification_result.primary_category}, 置信度:{classification_result.confidence}",
"extraction_result": f"提取质量:{extraction_result.extraction_quality}"
}
result = self.agents['comprehensive'].run(template_vars=template_vars)
# 补充一些字段
result.analysis_id = analysis_id
result.input_summary = f"长度:{len(original_text)}字符, 类型:{classification_result.primary_category}"
result.text_analysis = self._build_text_analysis_result(
original_text, sentiment_result, keyword_result
)
result.classification = classification_result
result.data_extraction = extraction_result
print(f" 综合评估: {result.overall_quality}")
return result
except Exception as e:
print(f" ⚠️ 综合分析失败: {e}")
# 构建基本的综合结果
return ComprehensiveAnalysisResult(
analysis_id=analysis_id,
input_summary=f"长度:{len(original_text)}字符",
overall_quality="poor",
confidence_level=0.0,
total_processing_time=0.0,
components_completed=0,
components_failed=4,
recommendations=["分析失败,请检查输入文本和系统配置"]
)
def _build_text_analysis_result(
self,
text: str,
sentiment: SentimentAnalysis,
keyword_result
) -> TextAnalysisResult:
"""构建文本分析结果"""
# 获取关键词列表
keywords = getattr(keyword_result, 'keywords', [])
return TextAnalysisResult(
text_length=len(text),
word_count=len(text.split()),
language="zh",
summary=f"文本分析摘要: 情感倾向为{sentiment.sentiment}",
sentiment=sentiment,
keywords=keywords,
readability="medium",
complexity=getattr(keyword_result, 'text_complexity', 0.5)
)
def _update_processing_stats(self, processing_time: float):
"""更新处理统计信息"""
total = self.processing_stats["total_processed"]
current_avg = self.processing_stats["average_processing_time"]
# 计算新的平均处理时间
new_avg = ((current_avg * (total - 1)) + processing_time) / total
self.processing_stats["average_processing_time"] = new_avg
def get_processing_stats(self) -> Dict[str, Any]:
"""获取处理统计信息"""
return self.processing_stats.copy()
def display_stats(self):
"""显示处理统计"""
stats = self.get_processing_stats()
print("\n📈 处理统计信息:")
print(f" 总处理数: {stats['total_processed']}")
print(f" 成功数: {stats['successful_analyses']}")
print(f" 失败数: {stats['failed_analyses']}")
if stats['total_processed'] > 0:
success_rate = stats['successful_analyses'] / stats['total_processed'] * 100
print(f" 成功率: {success_rate:.1f}%")
print(f" 平均处理时间: {stats['average_processing_time']:.2f}")
def demo_single_text_analysis():
"""演示单个文本分析"""
print("🔍 单文本分析演示")
print("="*50)
# 创建分析引擎
engine = TextAnalysisEngine()
# 测试文本
test_text = """
人工智能技术正在快速发展,深度学习和机器学习算法在各个领域都取得了显著的进展。
从自然语言处理到计算机视觉从推荐系统到自动驾驶AI技术正在改变我们的生活方式。
然而我们也需要关注AI发展带来的挑战包括隐私保护、算法偏见、就业影响等问题。
只有在技术发展和社会责任之间找到平衡AI才能真正造福人类社会。
总的来说,人工智能的未来充满希望,但也需要我们谨慎对待,确保技术发展的方向符合人类的长远利益。
"""
try:
# 执行分析
result = engine.analyze_text(test_text)
# 显示详细结果
display_analysis_result(result)
# 显示统计信息
engine.display_stats()
return True
except Exception as e:
print(f"❌ 演示失败: {e}")
return False
def demo_batch_analysis():
"""演示批量文本分析"""
print("\n🔄 批量文本分析演示")
print("="*50)
# 创建分析引擎
engine = TextAnalysisEngine()
# 测试文本集合
test_texts = [
"今天天气真好,阳光明媚,心情特别愉快!",
"公司最新发布的季度财报显示营收同比增长15%净利润达到2.3亿元。董事会决定向股东分红每股0.5元。",
"机器学习是人工智能的一个重要分支,通过算法让计算机能够从数据中学习模式。常见的机器学习算法包括线性回归、决策树、神经网络等。",
"服务态度恶劣,产品质量很差,完全不值这个价格。强烈不推荐大家购买!",
"根据《合同法》第一百二十一条规定,当事人一方因第三人的原因造成违约的,应当向对方承担违约责任。"
]
results = []
start_time = time.time()
for i, text in enumerate(test_texts, 1):
print(f"\n处理第 {i}/{len(test_texts)} 个文本...")
try:
result = engine.analyze_text(text, f"batch_{i}")
results.append(result)
# 显示简要结果
print(f" 结果: {result.overall_quality} | 置信度: {result.confidence_level:.3f}")
except Exception as e:
print(f" ❌ 处理失败: {e}")
results.append(None)
total_time = time.time() - start_time
# 显示批量处理总结
print(f"\n📊 批量处理完成 - 总耗时: {total_time:.2f}")
engine.display_stats()
# 显示成功处理的结果统计
successful_results = [r for r in results if r is not None]
if successful_results:
print(f"\n🎯 成功处理 {len(successful_results)} 个文本:")
quality_stats = {}
for result in successful_results:
quality = result.overall_quality
quality_stats[quality] = quality_stats.get(quality, 0) + 1
for quality, count in quality_stats.items():
print(f" {quality}: {count}")
return len(successful_results) > 0
def display_analysis_result(result: ComprehensiveAnalysisResult):
"""显示详细的分析结果"""
print(f"\n📋 详细分析结果 [{result.analysis_id}]")
print("="*60)
print(f"输入摘要: {result.input_summary}")
print(f"分析时间: {result.analysis_timestamp}")
print(f"整体质量: {result.overall_quality}")
print(f"置信度: {result.confidence_level:.3f}")
print(f"处理时间: {result.total_processing_time:.2f}")
# 文本分析结果
if result.text_analysis:
ta = result.text_analysis
print(f"\n📝 文本分析:")
print(f" 长度: {ta.text_length} 字符")
print(f" 词数: {ta.word_count}")
print(f" 摘要: {ta.summary}")
print(f" 情感: {ta.sentiment.sentiment} (置信度: {ta.sentiment.confidence:.3f})")
print(f" 可读性: {ta.readability}")
if ta.keywords:
top_keywords = ta.keywords[:5]
print(f" 关键词: {[k.keyword for k in top_keywords]}")
# 分类结果
if result.classification:
cls = result.classification
print(f"\n🏷️ 文档分类:")
print(f" 主分类: {cls.primary_category}")
print(f" 置信度: {cls.confidence:.3f}")
if len(cls.all_categories) > 1:
other_cats = cls.all_categories[1:3]
print(f" 其他可能: {[c.category for c in other_cats]}")
# 数据提取结果
if result.data_extraction:
de = result.data_extraction
print(f"\n🔍 数据提取:")
print(f" 质量: {de.extraction_quality}")
print(f" 完整性: {de.completeness:.3f}")
print(f" 准确性: {de.accuracy:.3f}")
print(f" 字段统计: {de.extracted_fields}/{de.total_fields}")
if de.extraction_items:
print(" 提取项目:")
for item in de.extraction_items[:3]: # 显示前3个
print(f" {item.field_name}: {item.field_value} (置信度: {item.confidence:.3f})")
# 改进建议
if result.recommendations:
print(f"\n💡 改进建议:")
for i, rec in enumerate(result.recommendations[:3], 1):
print(f" {i}. {rec}")
def interactive_analysis():
"""交互式分析功能"""
print("\n💬 交互式文本分析")
print("="*50)
print("输入文本进行综合分析,输入'quit'退出")
try:
engine = TextAnalysisEngine()
while True:
print("\n" + "-"*30)
user_input = input("请输入要分析的文本: ").strip()
if user_input.lower() == 'quit':
print("分析结束,再见!")
break
if not user_input:
continue
if len(user_input) < 10:
print("⚠️ 文本太短请输入至少10个字符")
continue
try:
result = engine.analyze_text(user_input)
display_analysis_result(result)
except Exception as e:
print(f"❌ 分析失败: {e}")
# 显示最终统计
engine.display_stats()
except KeyboardInterrupt:
print("\n程序已中断")
except Exception as e:
print(f"❌ 交互式分析失败: {e}")
def test_engine_initialization():
"""测试引擎初始化"""
print("正在测试文本分析引擎初始化...")
try:
engine = TextAnalysisEngine()
print(f"✅ 引擎初始化成功,包含 {len(engine.agents)} 个Agent")
# 显示Agent信息
for name, agent in engine.agents.items():
info = agent.get_model_info()
print(f" {name}: {info['model_name']} ({info['provider']})")
return True
except Exception as e:
print(f"❌ 引擎初始化失败: {e}")
return False
def main():
"""主函数"""
print("🚀 文本分析综合示例")
print("="*60)
# 运行各种演示
demos = [
("引擎初始化测试", test_engine_initialization),
("单文本分析", demo_single_text_analysis),
("批量文本分析", demo_batch_analysis),
]
results = {}
for name, demo_func in demos:
print(f"\n开始: {name}")
try:
success = demo_func()
results[name] = success
print(f"{'' if success else ''} {name} {'成功' if success else '失败'}")
except Exception as e:
print(f"{name} 异常: {e}")
results[name] = False
# 显示总结
print(f"\n📊 演示总结")
print("="*60)
successful_demos = sum(results.values())
total_demos = len(results)
for name, success in results.items():
status = "✅ 成功" if success else "❌ 失败"
print(f" {name}: {status}")
print(f"\n🎯 总计: {successful_demos}/{total_demos} 个演示成功")
# 询问是否运行交互式演示
if successful_demos > 0:
try:
choice = input("\n是否运行交互式分析?(y/n): ").strip().lower()
if choice in ['y', 'yes', '']:
interactive_analysis()
except (KeyboardInterrupt, EOFError):
print("\n程序结束")
return successful_demos == total_demos
if __name__ == "__main__":
# 可以选择运行测试或完整演示
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--test":
# 仅运行初始化测试
success = test_engine_initialization()
exit(0 if success else 1)
else:
# 运行完整演示
main()