Agent间通信(A2A)
构建多Agent协作系统的通信机制
📋 概述
Agent间通信(Agent-to-Agent Communication, A2A)是多Agent系统的核心。本文介绍A2A的通信模式、协议设计和实现方法。
🎯 A2A核心概念
为什么需要A2A?
单个Agent的能力有限,多个Agent协作可以:
- 分工合作,提高效率
- 组合专长,解决复杂问题
- 并行处理,加快速度
- 容错和冗余
🏗️ 通信架构模式
1. 点对点通信
from datetime import datetime


class DirectCommunication:
    """Point-to-point router: hands a message directly to one named agent."""

    def __init__(self):
        # agent name -> agent object (needs `.name` and `receive_message`)
        self.agents = {}

    def register_agent(self, agent):
        """Register *agent* under ``agent.name``; re-registering a name replaces it."""
        self.agents[agent.name] = agent

    def send_message(self, sender: str, receiver: str, message: dict) -> bool:
        """Deliver *message* from *sender* to the agent registered as *receiver*.

        Returns:
            True if the receiver is registered and the message was handed to it,
            False if no such agent exists (the message is not delivered).
        """
        target = self.agents.get(receiver)
        if target is None:
            return False
        target.receive_message(sender, message)
        return True


class Agent:
    """Minimal agent that sends through a DirectCommunication and keeps an inbox.

    Note: constructing an Agent does not register it; call
    ``comm.register_agent(agent)`` so others can reach it.
    """

    def __init__(self, name: str, comm: DirectCommunication):
        self.name = name
        self.comm = comm
        # received messages, oldest first: {"sender", "message", "timestamp"}
        self.inbox = []

    def send_to(self, receiver: str, message: dict) -> bool:
        """Send *message* to *receiver*; returns whether it was delivered."""
        return self.comm.send_message(self.name, receiver, message)

    def receive_message(self, sender: str, message: dict):
        """Append an incoming message to the inbox, stamped with the local receive time."""
        self.inbox.append({
            "sender": sender,
            "message": message,
            "timestamp": datetime.now(),
        })
2. 消息总线
from typing import Callable, List
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Message:
    """Envelope routed by MessageBus.

    Attributes:
        sender: name of the sending agent.
        receiver: name of the target agent, or "broadcast" for every subscriber.
        content: the payload.
        message_type: free-form type tag such as "request", "response" or "info".
        timestamp: creation time; when left as None it is set to ``datetime.now()``.
    """

    sender: str
    receiver: str
    content: dict
    message_type: str
    timestamp: datetime = None  # None -> stamped in __post_init__

    def __post_init__(self):
        # Keep a caller-supplied time; only stamp messages created without one.
        if self.timestamp is not None:
            return
        self.timestamp = datetime.now()
class MessageBus:
    """In-process publish/subscribe bus that routes messages by receiver name."""

    def __init__(self):
        # agent name -> list of callbacks, each called with the message
        self.subscribers = {}
        # every published message, in publish order
        self.message_history = []

    def subscribe(self, agent_name: str, callback: Callable):
        """Register *callback* for messages addressed to *agent_name*.

        An agent may register several callbacks; all of them are invoked.
        """
        self.subscribers.setdefault(agent_name, []).append(callback)

    def publish(self, message: "Message"):
        """Record *message* in the history and deliver it synchronously.

        ``receiver == "broadcast"`` delivers to every subscriber (including the
        sender, if it is subscribed); otherwise only the receiver's callbacks
        run. Messages for an unknown receiver are recorded but not delivered.
        """
        self.message_history.append(message)

        # Snapshot the targets before calling anything: a callback that
        # subscribes a new agent (or adds a callback) must not mutate the
        # containers being iterated ("dictionary changed size during iteration").
        if message.receiver == "broadcast":
            targets = [cb for callbacks in self.subscribers.values() for cb in callbacks]
        else:
            targets = list(self.subscribers.get(message.receiver, ()))

        for callback in targets:
            callback(message)

    def get_history(self, agent_name: str = None) -> List["Message"]:
        """Return a copy of the message history, optionally filtered by agent.

        With *agent_name*, only messages the agent sent or that were addressed
        to it by name are returned; broadcasts it merely received are not.
        """
        if agent_name:
            return [m for m in self.message_history
                    if m.sender == agent_name or m.receiver == agent_name]
        # A copy, so callers cannot accidentally mutate the bus's own record.
        return list(self.message_history)
class BusAgent:
    """Base agent that talks through a MessageBus and dispatches by message type.

    Subclasses override ``handle_request``, ``handle_response`` and
    ``handle_info``; the base implementations do nothing.
    """

    def __init__(self, name: str, bus: "MessageBus"):
        self.name = name
        self.bus = bus
        # every message delivered to this agent, including unhandled types
        self.inbox = []
        # Subscribe under our own name so directed and broadcast messages arrive.
        self.bus.subscribe(self.name, self.on_message)

    def send(self, receiver: str, content: dict, msg_type: str = "info"):
        """Publish a Message from this agent to *receiver*."""
        message = Message(
            sender=self.name,
            receiver=receiver,
            content=content,
            message_type=msg_type
        )
        self.bus.publish(message)

    def broadcast(self, content: dict, msg_type: str = "info"):
        """Send to every subscriber (the bus treats receiver "broadcast" specially)."""
        self.send("broadcast", content, msg_type)

    def on_message(self, message: "Message"):
        """Store *message* in the inbox, then route it to the matching handler.

        ``message_type`` may be a plain string (Message) or an Enum member such
        as MessageType (StandardMessage); Enum members are matched by their
        ``.value``. Types without a handler are only stored.
        """
        self.inbox.append(message)

        # A plain Enum member never equals a str, so without this normalization
        # StandardMessage traffic (e.g. request/response) would never be handled.
        kind = getattr(message.message_type, "value", message.message_type)
        if kind == "request":
            self.handle_request(message)
        elif kind == "response":
            self.handle_response(message)
        elif kind == "info":
            self.handle_info(message)

    def handle_request(self, message: "Message"):
        """Handle a request message; override in subclasses."""
        pass

    def handle_response(self, message: "Message"):
        """Handle a response message; override in subclasses."""
        pass

    def handle_info(self, message: "Message"):
        """Handle an informational message; override in subclasses."""
        pass
3. 协调者模式
import uuid
from typing import List, Optional


class Coordinator:
    """Central dispatcher that hands tasks to idle agents with a matching capability."""

    def __init__(self, bus: "MessageBus"):
        self.bus = bus
        # agent name -> {"agent": ..., "capabilities": [...], "busy": bool}
        self.agents = {}
        # tasks waiting for a capable idle agent, oldest first
        self.task_queue = []
        # task id -> result dict reported through handle_result
        self.results = {}

    def register_agent(self, agent: "BusAgent", capabilities: List[str]):
        """Register *agent* (keyed by ``agent.name``) as idle with *capabilities*."""
        self.agents[agent.name] = {
            "agent": agent,
            "capabilities": capabilities,
            "busy": False
        }

    def assign_task(self, task: dict) -> str:
        """Dispatch *task* to a capable idle agent, or queue it; return its new id.

        ``task["capability"]`` names the required capability. *task* is
        modified in place: an ``"id"`` key is added.
        """
        task_id = str(uuid.uuid4())
        task["id"] = task_id

        suitable_agent = self.find_agent_for_task(task)
        if suitable_agent:
            self.send_task(suitable_agent, task)
        else:
            self.task_queue.append(task)
        return task_id

    def find_agent_for_task(self, task: dict) -> Optional[str]:
        """Return the first idle agent advertising ``task["capability"]``, else None."""
        required_capability = task.get("capability")
        for agent_name, info in self.agents.items():
            if required_capability in info["capabilities"] and not info["busy"]:
                return agent_name
        return None

    def send_task(self, agent_name: str, task: dict):
        """Mark *agent_name* busy and publish *task* to it as a "task" message."""
        message = Message(
            sender="coordinator",
            receiver=agent_name,
            content=task,
            message_type="task"
        )
        self.agents[agent_name]["busy"] = True
        self.bus.publish(message)

    def handle_result(self, agent_name: str, result: dict):
        """Store *result* under its "task_id", free *agent_name*, and drain the queue.

        Queued tasks that no idle agent can take yet stay queued instead of
        being discarded.
        """
        task_id = result.get("task_id")
        self.results[task_id] = result
        self.agents[agent_name]["busy"] = False
        self._dispatch_queued_tasks()

    def _dispatch_queued_tasks(self):
        """Send every queued task that has a capable idle agent; keep the rest queued."""
        while True:
            for index, queued in enumerate(self.task_queue):
                agent_name = self.find_agent_for_task(queued)
                if agent_name:
                    break
            else:
                return  # nothing in the queue can be dispatched right now
            # Remove before sending, so a re-entrant handle_result (e.g. an agent
            # replying synchronously during publish) never dispatches it twice.
            del self.task_queue[index]
            self.send_task(agent_name, queued)
💬 通信协议设计
消息格式
from enum import Enum
from typing import Any, Optional
class MessageType(str, Enum):
    """Closed set of message kinds carried by StandardMessage.

    Mixing in ``str`` makes every member compare equal to its value
    (``MessageType.REQUEST == "request"``), so code that compares
    ``message_type`` against plain strings, such as BusAgent.on_message,
    also recognizes these members.
    """

    REQUEST = "request"  # request
    RESPONSE = "response"  # response
    INFO = "info"  # informational notice
    ERROR = "error"  # error report
    TASK = "task"  # task assignment
    RESULT = "result"  # task result
    QUERY = "query"  # query
    ANSWER = "answer"  # answer to a query
@dataclass
class StandardMessage:
    """Full message envelope with routing metadata and dict round-tripping.

    ``to_dict`` turns datetimes into ISO-8601 strings and the message type into
    its ``.value``; ``from_dict`` reverses that conversion.
    """

    # --- routing metadata ---
    id: str
    sender: str
    receiver: str
    timestamp: datetime
    message_type: MessageType

    # --- payload ---
    content: Any

    # --- optional fields ---
    priority: int = 1  # priority (1-10); not validated here
    expires_at: Optional[datetime] = None  # expiry time; not enforced by this class
    reply_to: Optional[str] = None  # id of the message this one answers
    requires_ack: bool = False  # whether an acknowledgement is requested
    metadata: dict = None  # extra metadata; None becomes a fresh {} per instance

    def __post_init__(self):
        # None is the default so instances never share one metadata dict.
        self.metadata = {} if self.metadata is None else self.metadata

    def to_dict(self) -> dict:
        """Serialize to a plain dict (datetimes as ISO strings, type as its value)."""
        expiry = self.expires_at
        serialized = {
            "id": self.id,
            "sender": self.sender,
            "receiver": self.receiver,
            "timestamp": self.timestamp.isoformat(),
            "message_type": self.message_type.value,
            "content": self.content,
            "priority": self.priority,
            "expires_at": expiry.isoformat() if expiry else None,
            "reply_to": self.reply_to,
            "requires_ack": self.requires_ack,
            "metadata": self.metadata,
        }
        return serialized

    @classmethod
    def from_dict(cls, data: dict) -> 'StandardMessage':
        """Rebuild a message from a dict shaped like ``to_dict`` output.

        Required keys: id, sender, receiver, timestamp, message_type, content.
        Missing optional keys fall back to the same values as the field defaults.
        """
        kwargs = {
            "id": data["id"],
            "sender": data["sender"],
            "receiver": data["receiver"],
            "timestamp": datetime.fromisoformat(data["timestamp"]),
            "message_type": MessageType(data["message_type"]),
            "content": data["content"],
            "priority": data.get("priority", 1),
            "expires_at": (
                datetime.fromisoformat(data["expires_at"])
                if data.get("expires_at") else None
            ),
            "reply_to": data.get("reply_to"),
            "requires_ack": data.get("requires_ack", False),
            "metadata": data.get("metadata", {}),
        }
        return cls(**kwargs)
请求-响应模式
import asyncio
import uuid


class RequestResponseAgent(BusAgent):
    """BusAgent with awaitable request/response on top of the bus.

    ``request`` publishes a REQUEST StandardMessage and waits for a reply whose
    ``reply_to`` equals the request id; ``send_response`` builds such a reply.
    NOTE: handle_response only fires if BusAgent.on_message routes
    MessageType.RESPONSE members (not just the string "response") to it.
    """

    def __init__(self, name: str, bus: MessageBus):
        super().__init__(name, bus)
        # request id -> Future resolved by handle_response
        self.pending_requests = {}

    async def request(
        self,
        receiver: str,
        content: dict,
        timeout: float = 30.0
    ) -> dict:
        """Send *content* to *receiver* and return the content of its response.

        Raises:
            TimeoutError: if no response arrives within *timeout* seconds.
        """
        request_id = str(uuid.uuid4())

        # loop.create_future() is the documented way to create a Future.
        future = asyncio.get_running_loop().create_future()
        # Register before publishing: the bus delivers synchronously, so a
        # reply may arrive before publish() returns.
        self.pending_requests[request_id] = future

        message = StandardMessage(
            id=request_id,
            sender=self.name,
            receiver=receiver,
            timestamp=datetime.now(),
            message_type=MessageType.REQUEST,
            content=content,
            requires_ack=True
        )
        try:
            self.bus.publish(message)
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(f"Request {request_id} timed out") from None
        finally:
            # Drop the entry on every exit path (success, timeout, cancellation,
            # publish error) so pending_requests cannot leak.
            self.pending_requests.pop(request_id, None)

    def handle_response(self, message: StandardMessage):
        """Resolve the pending request that *message* replies to, if still waiting."""
        # getattr: plain bus Messages tagged "response" have no reply_to field.
        future = self.pending_requests.pop(getattr(message, "reply_to", None), None)
        # Unknown, late or duplicate replies are ignored; a future already
        # cancelled by a timeout must not be resolved (InvalidStateError).
        if future is not None and not future.done():
            future.set_result(message.content)

    def send_response(self, request_msg: StandardMessage, content: dict):
        """Publish a RESPONSE to *request_msg*'s sender, linked via ``reply_to``."""
        response = StandardMessage(
            id=str(uuid.uuid4()),
            sender=self.name,
            receiver=request_msg.sender,
            timestamp=datetime.now(),
            message_type=MessageType.RESPONSE,
            content=content,
            reply_to=request_msg.id
        )
        self.bus.publish(response)
🤝 协作模式
1. Pipeline模式
class PipelineCoordinator:
    """Runs agents as sequential stages, feeding each stage's output to the next."""

    def __init__(self, bus: MessageBus):
        self.bus = bus
        # stages, in execution order
        self.pipeline = []

    def add_stage(self, agent: BusAgent):
        """Append *agent* as the last stage; it must provide ``async process(data)``."""
        self.pipeline.append(agent)

    async def execute(self, initial_data: dict) -> dict:
        """Run every stage in order and return the final stage's output.

        With no stages, *initial_data* itself is returned. An exception from a
        stage propagates and stops the remaining stages.
        """
        current = initial_data
        for stage in self.pipeline:
            current = await stage.process(current)
        return current
# Example: document-processing pipeline
# load -> chunk -> embed -> store
#
# DocumentLoaderAgent, ChunkerAgent, EmbedderAgent and StorageAgent are
# illustrative BusAgent subclasses not defined in this article; each needs an
# async ``process(data: dict) -> dict`` method for PipelineCoordinator.
# ``await`` is only valid inside a coroutine in a regular module, so the run is
# wrapped in one and started with asyncio.run().
import asyncio


async def run_document_pipeline(file_path: str = "document.pdf") -> dict:
    """Build the load -> chunk -> embed -> store pipeline and run it on *file_path*."""
    bus = MessageBus()
    loader_agent = DocumentLoaderAgent("loader", bus)
    chunker_agent = ChunkerAgent("chunker", bus)
    embedder_agent = EmbedderAgent("embedder", bus)
    storage_agent = StorageAgent("storage", bus)

    pipeline = PipelineCoordinator(bus)
    for stage in (loader_agent, chunker_agent, embedder_agent, storage_agent):
        pipeline.add_stage(stage)
    return await pipeline.execute({"file_path": file_path})


if __name__ == "__main__":
    result = asyncio.run(run_document_pipeline())
2. 投票模式
class VotingCoordinator:
    """Reaches a decision by confidence-weighted voting across agents."""

    def __init__(self, bus: "MessageBus"):
        self.bus = bus
        # voters, polled in insertion order
        self.agents = []

    def add_agent(self, agent: "BusAgent"):
        """Add a voter; it must provide ``async vote(question) -> dict``."""
        self.agents.append(agent)

    async def decide(self, question: dict) -> dict:
        """Poll every agent and pick the decision with the highest summed confidence.

        Each vote dict must contain "decision", "confidence" and "reasoning".
        Ties go to the decision that was seen first (in agent order).

        Returns:
            {"decision", "votes", "vote_counts", "total_confidence"}.

        Raises:
            ValueError: if no agents have been added.
        """
        if not self.agents:
            raise ValueError("VotingCoordinator.decide() needs at least one agent")

        votes = []
        # Agents are polled one at a time, in registration order.
        for agent in self.agents:
            vote = await agent.vote(question)
            votes.append({
                "agent": agent.name,
                "decision": vote["decision"],
                "confidence": vote["confidence"],
                "reasoning": vote["reasoning"]
            })

        decision_counts = {}  # decision -> number of votes
        weighted_votes = {}  # decision -> summed confidence
        for vote in votes:
            decision = vote["decision"]
            decision_counts[decision] = decision_counts.get(decision, 0) + 1
            weighted_votes[decision] = weighted_votes.get(decision, 0) + vote["confidence"]

        # max() keeps the first maximal entry, i.e. the earliest-seen decision on ties.
        winner, total_confidence = max(weighted_votes.items(), key=lambda item: item[1])
        return {
            "decision": winner,
            "votes": votes,
            "vote_counts": decision_counts,
            "total_confidence": total_confidence
        }
3. 拍卖模式
class AuctionCoordinator:
    """Awards a task to the agent whose bid scores best under weighted criteria."""

    def __init__(
        self,
        bus: "MessageBus",
        *,
        quality_weight: float = 0.5,
        price_weight: float = 0.3,
        time_weight: float = 0.2,
    ):
        self.bus = bus
        # bidders, asked in insertion order
        self.agents = []
        # Scoring weights: quality is rewarded, price and time are penalized.
        self.quality_weight = quality_weight
        self.price_weight = price_weight
        self.time_weight = time_weight

    def add_agent(self, agent: "BusAgent"):
        """Add a bidder; it must provide ``async bid(task)`` (falsy result = abstain)."""
        self.agents.append(agent)

    async def auction_task(self, task: dict) -> tuple:
        """Collect bids for *task* and return ``(winning_agent, winning_bid)``.

        Each bid needs "price", "estimated_time" and "quality_score"; agents
        returning a falsy bid abstain. Returns ``(None, None)`` if nobody bids.
        """
        bids = []
        # Bids are requested one agent at a time, in registration order.
        for agent in self.agents:
            bid = await agent.bid(task)
            if bid:
                bids.append({
                    "agent": agent,
                    "price": bid["price"],
                    "estimated_time": bid["estimated_time"],
                    "quality_score": bid["quality_score"]
                })

        if not bids:
            return None, None

        best_bid = max(bids, key=self.evaluate_bid)
        return best_bid["agent"], best_bid

    def evaluate_bid(self, bid: dict) -> float:
        """Score a bid; higher is better.

        score = quality_score * quality_weight
                - price * price_weight
                - estimated_time * time_weight

        The raw values are combined as-is, so they should be on comparable scales.
        """
        return (
            bid["quality_score"] * self.quality_weight -
            bid["price"] * self.price_weight -
            bid["estimated_time"] * self.time_weight
        )
📊 监控和调试
消息追踪
class MessageTracer:
    """Records lifecycle events of StandardMessages for later inspection."""

    def __init__(self):
        # one dict per recorded event, in recording order
        self.traces = []

    def trace(self, message: StandardMessage, event: str):
        """Record *event* (e.g. "sent", "received", "processed", "error") for *message*.

        The entry's timestamp is the time of recording, not the message's own timestamp.
        """
        entry = {
            "message_id": message.id,
            "event": event,
            "timestamp": datetime.now(),
            "sender": message.sender,
            "receiver": message.receiver,
            "message_type": message.message_type.value,
        }
        self.traces.append(entry)

    def get_message_trace(self, message_id: str) -> List[dict]:
        """Return every recorded event for *message_id*, oldest first."""
        return list(filter(lambda entry: entry["message_id"] == message_id, self.traces))

    def get_conversation_trace(self, agent_a: str, agent_b: str) -> List[dict]:
        """Return events for messages between *agent_a* and *agent_b*, in either direction."""
        directions = ((agent_a, agent_b), (agent_b, agent_a))
        return [entry for entry in self.traces
                if (entry["sender"], entry["receiver"]) in directions]
💡 最佳实践
Do's ✅
- 定义清晰的消息格式
- 实现超时和重试机制
- 记录通信日志
- 处理消息丢失和乱序
- 实现优先级队列
- 监控通信性能
Don'ts ❌
- 循环依赖
- 无限递归调用
- 忽略错误处理
- 缺少超时控制
- 过度耦合
🔗 相关文章
最后更新:2024年12月22日