Agent系统设计与实现
构建智能AI Agent:从单一Agent到多Agent协作
📋 概述
Agent是能够感知环境、做出决策并执行行动的智能体。本文介绍如何设计和实现生产级的Agent系统,包括ReAct、Plan-and-Execute、多Agent协作等模式。
🎯 Agent核心概念
什么是Agent?
定义:Agent = LLM + 记忆 + 规划 + 工具
Agent = {
"大脑": LLM(推理和决策),
"记忆": 对话历史和知识,
"能力": 可调用的工具集,
"策略": 规划和执行方法
}
Agent vs 传统程序
| 特性 | 传统程序 | Agent |
|---|---|---|
| 逻辑 | 预定义规则 | 动态推理 |
| 任务处理 | 单一流程 | 多步规划 |
| 适应性 | 固定 | 灵活 |
| 工具使用 | 硬编码 | 智能选择 |
🏗️ Agent架构模式
1. ReAct模式
原理:Reasoning(推理) + Acting(行动)交替进行
from langchain.agents import create_react_agent, AgentExecutor
from langchain.tools import Tool
from langchain.llms import OpenAI
class ReActAgent:
def __init__(self, llm, tools):
self.llm = llm
self.tools = tools
self.agent = create_react_agent(llm, tools, self.get_prompt())
self.executor = AgentExecutor(
agent=self.agent,
tools=tools,
verbose=True,
max_iterations=5
)
def get_prompt(self):
return """
你是一个智能助手,可以使用以下工具:
{tools}
工具格式:工具名称[参数]
请使用以下格式思考和行动:
Question: 用户的问题
Thought: 我需要思考如何解决
Action: 工具名称[参数]
Observation: 工具的返回结果
... (重复 Thought/Action/Observation 直到找到答案)
Thought: 我现在知道最终答案了
Final Answer: 最终答案
开始!
Question: {input}
Thought: {agent_scratchpad}
"""
def run(self, query: str):
return self.executor.invoke({"input": query})
# 定义工具
def search_web(query: str) -> str:
"""搜索网络"""
# 实现搜索逻辑
return f"搜索结果for {query}"
def calculate(expression: str) -> str:
"""计算数学表达式"""
try:
result = eval(expression)
return str(result)
except:
return "计算错误"
tools = [
Tool(name="Search", func=search_web, description="搜索网络信息"),
Tool(name="Calculator", func=calculate, description="进行数学计算")
]
# 使用
agent = ReActAgent(OpenAI(), tools)
result = agent.run("北京的人口是多少?将这个数字乘以2")
2. Plan-and-Execute模式
原理:先制定完整计划,再逐步执行
class PlanAndExecuteAgent:
def __init__(self, llm, tools):
self.llm = llm
self.tools = {tool.name: tool for tool in tools}
def run(self, task: str):
# 1. 制定计划
plan = self.create_plan(task)
print(f"计划:\n{plan}\n")
# 2. 执行计划
results = []
for step in plan['steps']:
print(f"执行步骤:{step['description']}")
result = self.execute_step(step)
results.append(result)
print(f"结果:{result}\n")
# 3. 整合结果
final_answer = self.synthesize(task, results)
return final_answer
def create_plan(self, task: str) -> dict:
"""制定计划"""
prompt = f"""
任务:{task}
可用工具:
{self.format_tools()}
请制定详细的执行计划,以JSON格式输出:
{{
"steps": [
{{
"step_number": 1,
"description": "步骤描述",
"tool": "工具名称",
"parameters": "参数"
}}
]
}}
"""
response = self.llm.generate(prompt)
return json.loads(response)
def execute_step(self, step: dict):
"""执行单个步骤"""
tool_name = step['tool']
parameters = step['parameters']
if tool_name in self.tools:
tool = self.tools[tool_name]
return tool.func(parameters)
else:
return f"工具 {tool_name} 不存在"
def synthesize(self, task: str, results: list) -> str:
"""整合结果"""
prompt = f"""
原始任务:{task}
执行结果:
{chr(10).join([f"{i+1}. {r}" for i, r in enumerate(results)])}
请基于这些结果,给出最终答案:
"""
return self.llm.generate(prompt)
3. Reflexion模式
原理:Agent能够反思和自我改进
class ReflexionAgent:
def __init__(self, llm, tools, max_retries=3):
self.llm = llm
self.tools = tools
self.max_retries = max_retries
def run(self, task: str):
for attempt in range(self.max_retries):
print(f"\n尝试 {attempt + 1}/{self.max_retries}")
# 执行任务
result = self.execute(task)
# 反思结果
reflection = self.reflect(task, result)
if reflection['is_correct']:
return result
# 基于反思改进
print(f"反思:{reflection['feedback']}")
task = self.improve_task(task, reflection)
return result
def reflect(self, task: str, result: str) -> dict:
"""反思执行结果"""
prompt = f"""
任务:{task}
执行结果:{result}
请评估这个结果:
1. 是否正确回答了问题?
2. 有什么问题或不足?
3. 如何改进?
以JSON格式输出:
{{
"is_correct": true/false,
"feedback": "反馈内容",
"improvement_suggestions": "改进建议"
}}
"""
response = self.llm.generate(prompt)
return json.loads(response)
🔧 工具集成
1. 工具定义
from typing import Callable, Optional
from pydantic import BaseModel, Field
class ToolParameter(BaseModel):
"""工具参数"""
name: str
type: str
description: str
required: bool = True
class Tool(BaseModel):
"""工具定义"""
name: str = Field(description="工具名称")
description: str = Field(description="工具功能描述")
parameters: list[ToolParameter] = Field(description="参数列表")
function: Callable = Field(description="实际执行的函数")
def execute(self, **kwargs):
"""执行工具"""
try:
return self.function(**kwargs)
except Exception as e:
return f"执行错误:{str(e)}"
def get_schema(self) -> dict:
"""获取工具schema(用于LLM理解)"""
return {
"name": self.name,
"description": self.description,
"parameters": {
"type": "object",
"properties": {
param.name: {
"type": param.type,
"description": param.description
}
for param in self.parameters
},
"required": [p.name for p in self.parameters if p.required]
}
}
2. 常用工具实现
# 搜索工具
def create_search_tool():
def search(query: str) -> str:
"""搜索网络信息"""
# 使用 SerpAPI 或其他搜索API
import requests
response = requests.get(
"https://api.serpapi.com/search",
params={
"q": query,
"api_key": "YOUR_API_KEY"
}
)
results = response.json()
# 格式化结果
return format_search_results(results)
return Tool(
name="search",
description="搜索互联网信息",
parameters=[
ToolParameter(
name="query",
type="string",
description="搜索查询"
)
],
function=search
)
# 计算器工具
def create_calculator_tool():
def calculate(expression: str) -> str:
"""计算数学表达式"""
try:
# 安全的数学计算
import ast
import operator
ops = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow
}
def eval_expr(node):
if isinstance(node, ast.Num):
return node.n
elif isinstance(node, ast.BinOp):
return ops[type(node.op)](
eval_expr(node.left),
eval_expr(node.right)
)
else:
raise TypeError(node)
return str(eval_expr(ast.parse(expression).body[0].value))
except:
return "计算错误"
return Tool(
name="calculator",
description="计算数学表达式",
parameters=[
ToolParameter(
name="expression",
type="string",
description="数学表达式,如:2+3*4"
)
],
function=calculate
)
# 数据库查询工具
def create_database_tool(db_config):
def query_database(sql: str) -> str:
"""查询数据库"""
import sqlite3
conn = sqlite3.connect(db_config['database'])
cursor = conn.cursor()
try:
cursor.execute(sql)
results = cursor.fetchall()
return str(results)
except Exception as e:
return f"查询错误:{str(e)}"
finally:
conn.close()
return Tool(
name="database",
description="查询数据库",
parameters=[
ToolParameter(
name="sql",
type="string",
description="SQL查询语句"
)
],
function=query_database
)
# Python代码执行工具
def create_python_tool():
def execute_python(code: str) -> str:
"""执行Python代码"""
import sys
from io import StringIO
# 捕获输出
old_stdout = sys.stdout
sys.stdout = StringIO()
try:
exec(code)
output = sys.stdout.getvalue()
return output
except Exception as e:
return f"执行错误:{str(e)}"
finally:
sys.stdout = old_stdout
return Tool(
name="python",
description="执行Python代码",
parameters=[
ToolParameter(
name="code",
type="string",
description="要执行的Python代码"
)
],
function=execute_python
)
🤝 多Agent协作
1. 角色分工模式
class MultiAgentSystem:
def __init__(self, llm):
self.llm = llm
self.agents = {
"researcher": ResearchAgent(llm),
"writer": WriterAgent(llm),
"critic": CriticAgent(llm)
}
def solve(self, task: str):
# 1. 研究员收集信息
research_results = self.agents["researcher"].research(task)
# 2. 写作者生成内容
draft = self.agents["writer"].write(task, research_results)
# 3. 评论家review并提出改进
feedback = self.agents["critic"].critique(draft)
# 4. 写作者根据反馈修改
final_version = self.agents["writer"].revise(draft, feedback)
return final_version
class ResearchAgent:
def __init__(self, llm):
self.llm = llm
self.tools = [create_search_tool()]
def research(self, topic: str) -> dict:
"""研究主题"""
prompt = f"""
作为研究员,请收集关于以下主题的信息:
主题:{topic}
使用搜索工具收集:
1. 主要事实和数据
2. 相关案例
3. 专家观点
研究报告:
"""
# 使用工具收集信息
results = {}
# ... 执行搜索
return results
class WriterAgent:
def __init__(self, llm):
self.llm = llm
def write(self, topic: str, research: dict) -> str:
"""撰写内容"""
prompt = f"""
作为内容作者,基于以下研究结果撰写文章:
主题:{topic}
研究结果:{research}
要求:
- 结构清晰
- 论据充分
- 语言流畅
文章:
"""
return self.llm.generate(prompt)
def revise(self, draft: str, feedback: dict) -> str:
"""修改内容"""
prompt = f"""
基于以下反馈修改文章:
原稿:{draft}
反馈:{feedback}
修改后的文章:
"""
return self.llm.generate(prompt)
class CriticAgent:
def __init__(self, llm):
self.llm = llm
def critique(self, content: str) -> dict:
"""评论内容"""
prompt = f"""
作为评论家,请review以下内容:
内容:{content}
从以下维度评分(1-10):
1. 逻辑性
2. 准确性
3. 可读性
4. 完整性
并提出具体的改进建议。
评论(JSON格式):
"""
response = self.llm.generate(prompt)
return json.loads(response)
2. 协作通信模式
class Message:
"""Agent间的消息"""
def __init__(
self,
sender: str,
receiver: str,
content: str,
message_type: str = "info"
):
self.sender = sender
self.receiver = receiver
self.content = content
self.message_type = message_type
self.timestamp = datetime.now()
class MessageBus:
"""消息总线"""
def __init__(self):
self.messages = []
self.subscribers = {}
def subscribe(self, agent_name: str, callback: Callable):
"""订阅消息"""
if agent_name not in self.subscribers:
self.subscribers[agent_name] = []
self.subscribers[agent_name].append(callback)
def publish(self, message: Message):
"""发布消息"""
self.messages.append(message)
# 通知接收者
if message.receiver in self.subscribers:
for callback in self.subscribers[message.receiver]:
callback(message)
# 广播消息
if message.receiver == "all":
for callbacks in self.subscribers.values():
for callback in callbacks:
callback(message)
class CollaborativeAgent:
def __init__(self, name: str, llm, message_bus: MessageBus):
self.name = name
self.llm = llm
self.bus = message_bus
# 订阅消息
self.bus.subscribe(self.name, self.handle_message)
def send_message(self, receiver: str, content: str, msg_type: str = "info"):
"""发送消息"""
message = Message(self.name, receiver, content, msg_type)
self.bus.publish(message)
def handle_message(self, message: Message):
"""处理接收到的消息"""
if message.message_type == "request":
# 处理请求并回复
response = self.process_request(message.content)
self.send_message(message.sender, response, "response")
elif message.message_type == "info":
# 处理信息
self.process_info(message.content)
📊 Agent评估与优化
1. 性能指标
class AgentMetrics:
def __init__(self):
self.metrics = {
'success_rate': [],
'execution_time': [],
'tool_usage': [],
'iterations': []
}
def record(self, result: dict):
"""记录执行结果"""
self.metrics['success_rate'].append(result['success'])
self.metrics['execution_time'].append(result['time'])
self.metrics['tool_usage'].append(result['tools_used'])
self.metrics['iterations'].append(result['iterations'])
def get_stats(self) -> dict:
"""获取统计数据"""
return {
'success_rate': np.mean(self.metrics['success_rate']),
'avg_time': np.mean(self.metrics['execution_time']),
'avg_iterations': np.mean(self.metrics['iterations']),
'most_used_tools': Counter(
[t for tools in self.metrics['tool_usage'] for t in tools]
).most_common(5)
}
2. A/B测试
def compare_agents(agent_a, agent_b, test_cases):
"""对比两个Agent"""
results_a = []
results_b = []
for test_case in test_cases:
# 测试Agent A
result_a = agent_a.run(test_case['query'])
score_a = evaluate_result(result_a, test_case['expected'])
results_a.append(score_a)
# 测试Agent B
result_b = agent_b.run(test_case['query'])
score_b = evaluate_result(result_b, test_case['expected'])
results_b.append(score_b)
return {
'agent_a_avg': np.mean(results_a),
'agent_b_avg': np.mean(results_b),
'winner': 'A' if np.mean(results_a) > np.mean(results_b) else 'B'
}
💡 最佳实践
Do's ✅
- 明确Agent的角色和能力边界
- 设计合理的工具集
- 实现错误处理和重试机制
- 记录执行过程便于调试
- 设置最大迭代次数防止死循环
- 使用结构化输出
- 定期评估和优化
Don'ts ❌
- 工具描述不清晰
- 缺少执行限制
- 忽视安全性
- 过度依赖单一工具
- 没有失败处理
- 缺少监控和日志
🔗 相关文章
最后更新:2024年12月22日