Agent系统架构设计
本文详细介绍如何设计和实现一个生产级的 AI Agent 系统架构。
架构概览
完整的 Agent 系统架构包含多个层次:
```mermaid
graph TB
A[用户交互层] --> B[Agent编排层]
B --> C[推理引擎]
B --> D[工具调用层]
B --> E[记忆管理]
C --> F[LLM服务]
D --> G[工具库]
E --> H[向量存储]
E --> I[关系数据库]
B --> J[监控层]
```
1. 核心架构模式
单Agent架构
最基础的 Agent 架构:
class SingleAgent:
    """Single agent that alternates between thinking, acting and observing.

    The class calls ``self.execute_tool(action)`` and
    ``self.parse_response(response)`` but does not define them; a subclass
    (or mixin) has to provide both.

    Args:
        llm: Object exposing ``generate(prompt)``.
        tools: Tools made available to the agent (stored as-is).
        memory: Object exposing ``load_context(task)`` and ``save(context)``.
        max_iterations: Upper bound on think/act cycles per ``run`` call.
    """

    def __init__(self, llm, tools, memory, max_iterations=10):
        self.llm = llm
        self.tools = tools
        self.memory = memory
        self.prompt_template = PromptTemplate()
        # Previously a constant inside run(); configurable so callers can
        # trade latency/cost against task depth.
        self.max_iterations = max_iterations

    def run(self, task):
        """Run the think/act loop for ``task``.

        Returns:
            The ``result`` of the first step flagged ``is_final``, or a
            fixed "not finished" message once ``max_iterations`` is reached.
        """
        context = self.memory.load_context(task)

        for _ in range(self.max_iterations):
            step = self.think_and_act(context)

            # A final step carries the answer; nothing more to execute.
            if step.is_final:
                return step.result

            observation = self.execute_tool(step.action)

            context.append({
                'thought': step.thought,
                'action': step.action,
                'observation': observation
            })
            # Persist after every cycle so progress survives an interruption.
            self.memory.save(context)

        return "任务未完成,达到最大迭代次数"

    def think_and_act(self, context):
        """Render the prompt from ``context``, query the LLM and parse the reply."""
        prompt = self.prompt_template.format(context)
        response = self.llm.generate(prompt)
        return self.parse_response(response)
多Agent架构
多个 Agent 协作完成复杂任务:
class MultiAgentSystem:
    """Registry of named agents plus a coordinator that splits and merges work."""

    def __init__(self):
        self.agents = {}
        self.coordinator = Coordinator()
        self.message_bus = MessageBus()

    def register_agent(self, name, agent):
        """Store ``agent`` under ``name`` and hand it the shared message bus."""
        self.agents[name] = agent
        agent.set_message_bus(self.message_bus)

    def solve(self, task):
        """Decompose ``task``, run each assigned subtask and aggregate the results.

        Returns:
            Whatever ``coordinator.aggregate`` returns for the mapping
            ``{agent_name: result}``.
        """
        pieces = self.coordinator.decompose(task)
        plan = self.coordinator.assign(pieces, self.agents)

        # Subtasks are executed one after another, in assignment order.
        outcomes = {
            agent_name: self.agents[agent_name].execute(subtask)
            for agent_name, subtask in plan.items()
        }

        return self.coordinator.aggregate(outcomes)
2. 推理引擎设计
ReAct推理引擎
结合推理和行动的引擎:
class ReActEngine:
    """Reason-and-act loop driven by a Thought/Action/Observation prompt.

    The class calls ``self.format_tools``, ``self.extract_final_answer``,
    ``self.parse_step`` and ``self.execute_action`` but does not define
    them; a subclass has to provide all four.

    Args:
        llm: Object exposing ``generate(prompt)``.
        max_steps: Maximum number of LLM round-trips per ``solve`` call.
    """

    def __init__(self, llm, max_steps=10):
        self.llm = llm
        self.max_steps = max_steps
        # The template body is kept flush-left so the text sent to the model
        # carries no source-code indentation.
        self.prompt = """
你是一个解决问题的AI助手。你需要通过思考和行动来解决问题。
可用工具:
{tools}
使用以下格式:
Thought: 你应该思考要做什么
Action: 要执行的动作
Action Input: 动作的输入
Observation: 动作的结果
... (重复 Thought/Action/Observation)
Thought: 我现在知道最终答案了
Final Answer: 最终答案
开始!
Question: {question}
{scratchpad}
"""

    def solve(self, question, tools, context=None):
        """Answer ``question`` with the ReAct pattern.

        Args:
            question: The question to put to the model.
            tools: Tools forwarded to ``format_tools`` and ``execute_action``.
            context: Optional background information (for example recalled
                memories). When truthy it is rendered at the top of the
                scratchpad. Accepting it keeps callers that pass
                ``context=...`` from failing with ``TypeError``.

        Returns:
            The extracted final answer, or a fixed "could not finish"
            message once ``max_steps`` is exhausted.
        """
        # The context is passed to str.format as an argument, so braces
        # inside it are not interpreted as placeholders.
        scratchpad = f"Context: {context}\n" if context else ""
        # The tool listing does not change between steps; render it once.
        tool_listing = self.format_tools(tools)

        for _ in range(self.max_steps):
            prompt = self.prompt.format(
                tools=tool_listing,
                question=question,
                scratchpad=scratchpad
            )
            response = self.llm.generate(prompt)

            if "Final Answer:" in response:
                return self.extract_final_answer(response)

            thought, action, action_input = self.parse_step(response)
            observation = self.execute_action(action, action_input, tools)

            # Append the completed step so the next prompt shows the history.
            scratchpad += f"""
Thought: {thought}
Action: {action}
Action Input: {action_input}
Observation: {observation}
"""

        return "无法在规定步数内完成任务"
Plan-and-Execute引擎
先规划再执行的引擎:
class PlanExecuteEngine:
    """Two-phase engine: create a plan first, then execute it step by step.

    The class calls ``self.formulate_remaining`` and
    ``self.synthesize_results`` but does not define them; a subclass has to
    provide both.

    Args:
        planner_llm: Model handed to ``Planner``.
        executor_llm: Model handed to ``Executor``.
    """

    def __init__(self, planner_llm, executor_llm):
        self.planner = Planner(planner_llm)
        self.executor = Executor(executor_llm)

    def solve(self, objective, tools, max_replans=3):
        """Plan, execute, and replan on failure.

        Args:
            objective: The goal handed to the planner.
            tools: Tools forwarded to the planner and executor.
            max_replans: How many times a failed step may trigger a new
                plan before execution stops, so a step that keeps failing
                cannot loop forever.

        Returns:
            The value of ``synthesize_results`` over every step result
            collected, including failed ones.
        """
        from collections import deque

        plan = self.planner.create_plan(objective, tools)
        print(f"计划: {plan}")

        # Steps are consumed from a queue rather than by iterating
        # ``plan.steps`` directly: rebinding ``plan`` inside a ``for`` loop
        # does not change what the loop iterates, so a replacement plan
        # would never be executed.
        pending = deque(plan.steps)
        results = []
        replans = 0

        while pending:
            step = pending.popleft()
            print(f"\n执行步骤: {step.description}")

            result = self.executor.execute_step(step, tools, results)
            results.append(result)

            if result.success:
                continue

            if replans >= max_replans:
                # Replan budget exhausted: stop and report what we have.
                break
            replans += 1

            print("步骤失败,重新规划...")
            remaining_objective = self.formulate_remaining(objective, results)
            plan = self.planner.create_plan(remaining_objective, tools)
            # Drop the rest of the stale plan and switch to the new one.
            pending = deque(plan.steps)

        return self.synthesize_results(results)
3. 工具系统设计
工具定义
from pydantic import BaseModel, Field
class ToolDefinition(BaseModel):
    """Declarative description of a tool: name, description and parameter schema."""

    name: str = Field(description="工具名称")
    description: str = Field(description="工具描述")
    parameters: dict = Field(description="参数schema")

    def to_prompt(self):
        """Render the definition as two lines of prompt text."""
        summary_line = f"{self.name}: {self.description}"
        parameters_line = f"参数: {self.parameters}"
        return "\n".join((summary_line, parameters_line))
class Tool:
    """Base wrapper that turns a callable into a named, described tool."""

    def __init__(self, name, description, func):
        self.name = name
        self.description = description
        self.func = func

    def run(self, *args, **kwargs):
        """Call the wrapped function and report the outcome as a dict.

        Returns:
            ``{'success': True, 'result': ...}`` when the call returns, or
            ``{'success': False, 'error': str(exc)}`` when it raises any
            ``Exception``; the exception itself is not propagated.
        """
        try:
            outcome = self.func(*args, **kwargs)
        except Exception as exc:
            return {'success': False, 'error': str(exc)}
        return {'success': True, 'result': outcome}
工具注册表
class ToolRegistry:
    """Name-indexed collection of tools."""

    def __init__(self):
        self.tools = {}

    def register(self, tool: "Tool"):
        """Store ``tool`` under its ``name``, replacing any previous entry."""
        self.tools[tool.name] = tool

    def get(self, name: str) -> "Tool":
        """Return the tool registered as ``name``.

        Raises:
            ValueError: If no tool with that name has been registered.
        """
        if name in self.tools:
            return self.tools[name]
        raise ValueError(f"工具 {name} 不存在")

    def list_tools(self) -> list:
        """Return one ``{'name', 'description'}`` dict per registered tool."""
        summaries = []
        for entry in self.tools.values():
            summaries.append(dict(name=entry.name, description=entry.description))
        return summaries
常用工具实现
# 搜索工具
class SearchTool(Tool):
    """Tool named ``search`` that delegates to ``search_engine.search``."""

    def __init__(self, search_engine):
        delegate = search_engine.search
        super().__init__("search", "搜索互联网信息", delegate)
# 计算器工具
class CalculatorTool(Tool):
    """Tool named ``calculator`` that evaluates arithmetic without ``eval``."""

    # Integer powers whose result would need more bits than this are
    # rejected, so inputs such as "9**9**9**9" cannot exhaust CPU or memory.
    _MAX_POWER_BITS = 4096

    def __init__(self):
        super().__init__(
            name="calculator",
            description="执行数学计算",
            func=self.calculate
        )

    def calculate(self, expression: str):
        """Evaluate an arithmetic expression and return the number.

        Supported syntax: int/float literals, parentheses, unary ``+``/``-``
        and the binary operators ``+ - * / // **``. Anything else that
        passes the character filter (for example a comma-separated tuple)
        is rejected.

        Raises:
            ValueError: If the expression contains a disallowed character,
                is not valid arithmetic, or fails to evaluate (for example
                division by zero).
        """
        import ast
        import operator

        # Cheap pre-filter: digits, arithmetic symbols and spaces only.
        allowed_chars = set("0123456789+-*/()., ")
        if not all(c in allowed_chars for c in expression):
            raise ValueError("非法的数学表达式")

        binary_ops = {
            ast.Add: operator.add,
            ast.Sub: operator.sub,
            ast.Mult: operator.mul,
            ast.Div: operator.truediv,
            ast.FloorDiv: operator.floordiv,
        }
        unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}

        def evaluate(node):
            # Walk the syntax tree and compute only whitelisted node types;
            # no Python code from the input is ever executed.
            if isinstance(node, ast.Expression):
                return evaluate(node.body)
            if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
                return node.value
            if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
                return unary_ops[type(node.op)](evaluate(node.operand))
            if isinstance(node, ast.BinOp):
                left = evaluate(node.left)
                right = evaluate(node.right)
                if isinstance(node.op, ast.Pow):
                    if (
                        isinstance(left, int)
                        and isinstance(right, int)
                        and right > 0
                        and left.bit_length() * right > self._MAX_POWER_BITS
                    ):
                        raise ValueError("exponent too large")
                    return left ** right
                if type(node.op) in binary_ops:
                    return binary_ops[type(node.op)](left, right)
            raise ValueError("unsupported expression")

        try:
            return evaluate(ast.parse(expression.strip(), mode="eval"))
        except Exception as e:
            raise ValueError(f"计算错误: {e}") from e
# 数据库查询工具
class DatabaseTool(Tool):
    """Tool named ``database_query`` that runs read-only SQL on a connection.

    Args:
        db_connection: Object exposing ``execute(sql)`` whose result offers
            ``fetchall()``.
    """

    def __init__(self, db_connection):
        super().__init__(
            name="database_query",
            description="查询数据库",
            func=self.query
        )
        self.db = db_connection

    def query(self, sql: str):
        """Execute a single read-only statement and return all rows.

        The statement must start with ``SELECT`` or ``WITH``, must not
        contain a second statement, and must not contain a data-modifying
        or DDL keyword as a whole word. Whole-word matching means column
        names such as ``updated_at`` are accepted, while ``INSERT``,
        ``ALTER``, ``TRUNCATE`` and similar are rejected.

        The check is textual, so a keyword or ``;`` inside a string literal
        is rejected as well.
        NOTE(review): this is defence in depth only — presumably the
        connection should also use a read-only database role; confirm.

        Raises:
            ValueError: If the statement is not a plain read-only query.
        """
        import re

        # Tolerate surrounding whitespace and one trailing semicolon.
        statement = sql.strip().rstrip(";").strip()

        starts_as_read = re.match(r"(?i)(SELECT|WITH)\b", statement)
        has_write_keyword = re.search(
            r"(?i)\b(INSERT|UPDATE|DELETE|DROP|ALTER|TRUNCATE|CREATE|"
            r"REPLACE|MERGE|GRANT|REVOKE)\b",
            statement,
        )
        # Any remaining ';' would separate stacked statements.
        if not starts_as_read or has_write_keyword or ";" in statement:
            raise ValueError("只允许查询操作")

        results = self.db.execute(statement)
        return results.fetchall()
4. 记忆系统设计
短期记忆
class ShortTermMemory:
    """Short-term memory: the running conversation, capped by a token budget.

    Args:
        max_tokens: Token budget for all stored messages together.
        token_counter: Optional callable ``text -> int``. When omitted, the
            module-level ``count_tokens`` function is looked up at call time.
    """

    def __init__(self, max_tokens=4000, token_counter=None):
        self.messages = []
        self.max_tokens = max_tokens
        self.token_counter = token_counter

    def add(self, role, content):
        """Append a timestamped message, then trim the history to budget."""
        import time

        self.messages.append({
            'role': role,
            'content': content,
            'timestamp': time.time()
        })
        self.trim_if_needed()

    def _count(self, text):
        """Count tokens with the injected counter, else ``count_tokens``."""
        counter = self.token_counter if self.token_counter is not None else count_tokens
        return counter(text)

    def trim_if_needed(self):
        """Evict the oldest non-system messages until the budget is met.

        System messages are never evicted. Trimming stops when the history
        fits, when only one message is left, or when only system messages
        remain; the last case previously caused an endless loop.
        """
        total_tokens = sum(self._count(msg['content']) for msg in self.messages)

        while total_tokens > self.max_tokens and len(self.messages) > 1:
            oldest = next(
                (i for i, msg in enumerate(self.messages) if msg['role'] != 'system'),
                None,
            )
            if oldest is None:
                # Nothing evictable is left; accept being over budget.
                break
            removed = self.messages.pop(oldest)
            # Subtract instead of re-summing the whole history each pass.
            total_tokens -= self._count(removed['content'])

    def get_messages(self):
        """Return the stored message list itself (not a copy)."""
        return self.messages
长期记忆
class LongTermMemory:
    """Long-term memory backed by a vector store and an embedding model."""

    def __init__(self, vector_store, embedding_model):
        self.vector_store = vector_store
        self.embedding_model = embedding_model

    def store(self, text, metadata=None):
        """Embed ``text`` and add it to the vector store.

        Returns:
            The identifier returned by ``vector_store.add``.
        """
        vector = self.embedding_model.encode(text)
        # A missing (or otherwise falsy) metadata value becomes an empty dict.
        return self.vector_store.add(
            vector=vector,
            text=text,
            metadata=metadata or {}
        )

    def retrieve(self, query, top_k=5):
        """Embed ``query`` and return the ``top_k`` matches as plain dicts.

        Returns:
            A list of ``{'text', 'metadata', 'score'}`` dicts, in the order
            the vector store returned the hits.
        """
        probe = self.embedding_model.encode(query)
        hits = self.vector_store.search(vector=probe, top_k=top_k)

        memories = []
        for hit in hits:
            memories.append({
                'text': hit.text,
                'metadata': hit.metadata,
                'score': hit.score
            })
        return memories
混合记忆系统
class HybridMemory:
    """Facade over short-term, long-term and episodic memory."""

    def __init__(self):
        self.short_term = ShortTermMemory()
        vector_store = VectorStore()
        embedding_model = EmbeddingModel()
        self.long_term = LongTermMemory(
            vector_store=vector_store,
            embedding_model=embedding_model
        )
        self.episodic = EpisodicMemory()  # event/episode memory

    def remember(self, content, memory_type='short'):
        """Store ``content`` in the memory selected by ``memory_type``.

        ``'short'`` stores it as an assistant message, ``'long'`` in the
        long-term store and ``'episodic'`` as an episode. Any other value
        stores nothing.
        """
        if memory_type == 'short':
            self.short_term.add('assistant', content)
            return
        if memory_type == 'long':
            self.long_term.store(content)
            return
        if memory_type == 'episodic':
            self.episodic.store_episode(content)

    def recall(self, query):
        """Gather recent messages plus long-term and episodic matches.

        Returns:
            A dict with the keys ``'recent'``, ``'relevant'`` and
            ``'episodes'``.
        """
        return {
            'recent': self.short_term.get_messages(),
            'relevant': self.long_term.retrieve(query),
            'episodes': self.episodic.retrieve(query)
        }
5. Agent编排层
顺序编排
class SequentialOrchestrator:
    """Pipeline that feeds each agent the previous agent's output."""

    def __init__(self, agents):
        self.agents = agents

    def run(self, input_data):
        """Pass ``input_data`` through every agent in order.

        Returns:
            The output of the last agent, or ``input_data`` unchanged when
            there are no agents.
        """
        payload = input_data
        for stage in self.agents:
            print(f"执行 Agent: {stage.name}")
            payload = stage.process(payload)
        return payload
并行编排
import asyncio
class ParallelOrchestrator:
    """Fan the same input out to every agent concurrently, then merge.

    The class calls ``self.merge_results`` but does not define it; a
    subclass has to provide it.
    """

    def __init__(self, agents):
        self.agents = agents

    async def run(self, input_data):
        """Await every agent's ``process_async(input_data)`` and merge the results.

        Returns:
            Whatever ``merge_results`` returns for the gathered results,
            which are in the same order as ``self.agents``.
        """
        pending = (worker.process_async(input_data) for worker in self.agents)
        gathered = await asyncio.gather(*pending)
        return self.merge_results(gathered)
条件编排
class ConditionalOrchestrator:
    """Router that dispatches input to the first agent whose condition holds."""

    def __init__(self):
        self.routes = {}

    def add_route(self, condition, agent):
        """Map the predicate ``condition`` to ``agent``."""
        self.routes[condition] = agent

    def run(self, input_data):
        """Process ``input_data`` with the first matching route.

        Routes are tried in insertion order.

        Raises:
            ValueError: If no condition accepts ``input_data``.
        """
        # A private sentinel distinguishes "no match" from any stored agent.
        no_match = object()
        chosen = next(
            (
                agent
                for condition, agent in self.routes.items()
                if condition(input_data)
            ),
            no_match,
        )
        if chosen is no_match:
            raise ValueError("没有匹配的路由")
        return chosen.process(input_data)
6. 完整Agent实现
class ProductionAgent:
    """Agent wiring together an LLM, tools, memory, a ReAct engine, logging and metrics.

    The class calls ``self._init_llm`` and ``self._init_tools`` but does not
    define them; a subclass has to provide both.
    """

    def __init__(self, config):
        # Core components.
        self.llm = self._init_llm(config.llm_config)
        self.tools = self._init_tools(config.tools_config)
        self.memory = HybridMemory()
        self.engine = ReActEngine(self.llm)

        # Observability.
        self.logger = Logger()
        self.metrics = MetricsCollector()

    def run(self, task, user_id=None):
        """Solve ``task`` and report the outcome as a dict.

        Memory is recalled only when ``user_id`` is truthy; otherwise an
        empty context is used. ``question``, ``tools`` and ``context`` are
        passed to ``engine.solve`` as keyword arguments.

        Returns:
            ``{'success': True, 'result': ..., 'latency': seconds}`` on
            success, or ``{'success': False, 'error': str(exc)}`` when any
            ``Exception`` is raised along the way.
        """
        started_at = time.time()

        try:
            self.logger.info(f"开始任务: {task}")

            context = self.memory.recall(task) if user_id else {}

            answer = self.engine.solve(
                question=task,
                tools=self.tools,
                context=context
            )

            # Keep the question/answer pair in long-term memory.
            self.memory.remember(f"Q: {task}\nA: {answer}", memory_type='long')

            elapsed = time.time() - started_at
            self.metrics.record('task_latency', elapsed)
            self.metrics.record('task_success', 1)

            report = {'success': True, 'result': answer, 'latency': elapsed}
            return report

        except Exception as exc:
            self.logger.error(f"任务失败: {exc}")
            self.metrics.record('task_success', 0)
            failure = {'success': False, 'error': str(exc)}
            return failure
7. 监控与评估
性能监控
class AgentMonitor:
    """Collects per-execution records and derives summary statistics."""

    def __init__(self):
        from collections import defaultdict

        self.metrics = defaultdict(list)

    def track_execution(self, agent_name, task, result, latency):
        """Record one execution.

        Args:
            agent_name: Name of the agent that ran.
            task: The task that was executed.
            result: Object whose ``success`` attribute is stored.
            latency: Duration of the execution.
        """
        import time

        self.metrics['executions'].append({
            'agent': agent_name,
            'task': task,
            'success': result.success,
            'latency': latency,
            'timestamp': time.time()
        })

    def get_stats(self):
        """Summarize all recorded executions.

        Returns:
            A dict with ``total``, ``success_rate``, ``avg_latency`` and
            ``p95_latency``. With no executions recorded, every value is
            zero rather than raising ``ZeroDivisionError``.
        """
        import numpy as np

        executions = self.metrics['executions']
        total = len(executions)
        if total == 0:
            return {
                'total': 0,
                'success_rate': 0.0,
                'avg_latency': 0.0,
                'p95_latency': 0.0
            }

        latencies = [e['latency'] for e in executions]
        return {
            'total': total,
            'success_rate': sum(e['success'] for e in executions) / total,
            'avg_latency': sum(latencies) / total,
            'p95_latency': np.percentile(latencies, 95)
        }
8. 安全性设计
输入验证
class InputValidator:
    """Rejects user input that is too long or matches a forbidden pattern."""

    def __init__(self):
        self.max_length = 10000
        self.forbidden_patterns = [
            r'<script>',
            r'DROP TABLE',
            r'exec\(',
        ]

    def validate(self, user_input):
        """Check ``user_input`` and return ``True`` when it passes.

        Raises:
            ValueError: If the input is longer than ``max_length`` or
                matches any forbidden pattern (case-insensitively).
        """
        too_long = len(user_input) > self.max_length
        if too_long:
            raise ValueError("输入过长")

        suspicious = any(
            re.search(pattern, user_input, re.IGNORECASE)
            for pattern in self.forbidden_patterns
        )
        if suspicious:
            raise ValueError("检测到潜在恶意内容")

        return True
行为约束
class BehaviorConstraints:
    """Gatekeeper combining a tool allow-list with per-user rate limiting."""

    def __init__(self):
        self.allowed_tools = set()
        self.rate_limiter = RateLimiter()

    def check_tool_access(self, tool_name, user_id):
        """Return ``True`` when ``user_id`` may call ``tool_name`` now.

        The allow-list is checked before the rate limiter is consulted.

        Raises:
            PermissionError: If ``tool_name`` is not in ``allowed_tools``.
            RateLimitError: If the rate limiter refuses ``user_id``.
        """
        permitted = tool_name in self.allowed_tools
        if not permitted:
            raise PermissionError(f"不允许访问工具: {tool_name}")

        if self.rate_limiter.allow(user_id):
            return True
        raise RateLimitError("请求过于频繁")
总结
一个完整的 Agent 系统架构需要考虑:
✅ 推理引擎:ReAct、Plan-Execute等模式
✅ 工具系统:灵活的工具注册和调用机制
✅ 记忆管理:短期、长期、事件记忆的结合
✅ 编排层:支持顺序、并行、条件等编排方式
✅ 监控评估:完善的性能和质量监控
✅ 安全防护:输入验证、行为约束、权限控制
通过合理的架构设计,可以构建出高效、可靠、安全的 AI Agent 系统。