跳到主要内容

Agent系统架构设计

本文详细介绍如何设计和实现一个生产级的 AI Agent 系统架构。

架构概览

完整的 Agent 系统架构包含多个层次:

graph TB
A[用户交互层] --> B[Agent编排层]
B --> C[推理引擎]
B --> D[工具调用层]
B --> E[记忆管理]
C --> F[LLM服务]
D --> G[工具库]
E --> H[向量存储]
E --> I[关系数据库]
B --> J[监控层]

1. 核心架构模式

单Agent架构

最基础的 Agent 架构:

class SingleAgent:
    """Minimal single-agent loop: think, act, observe, repeat.

    Each iteration asks the LLM for the next step, runs the requested tool
    and appends the outcome to the working context, until a step is marked
    final or the iteration budget is spent.

    NOTE(review): ``execute_tool`` and ``parse_response`` are called below
    but are not defined in this snippet; presumably a subclass or a fuller
    listing provides them -- confirm.
    """

    def __init__(self, llm, tools, memory):
        self.llm = llm
        self.tools = tools
        self.memory = memory
        self.prompt_template = PromptTemplate()

    def run(self, task, max_iterations=10):
        """Work on ``task`` until a final step is produced.

        Args:
            task: The task handed to ``memory.load_context``.
            max_iterations: Upper bound on think/act rounds. Defaults to 10,
                the value that used to be hard-coded here.

        Returns:
            ``result`` of the first step whose ``is_final`` is truthy, or a
            fixed failure message when the budget runs out first.
        """
        # The context object is appended to below, so it must be list-like.
        context = self.memory.load_context(task)

        for _ in range(max_iterations):
            step = self.think_and_act(context)

            # A final step carries the answer; no tool call is needed.
            if step.is_final:
                return step.result

            observation = self.execute_tool(step.action)

            context.append({
                'thought': step.thought,
                'action': step.action,
                'observation': observation
            })

            # Persist after every round so partial progress is not lost.
            self.memory.save(context)

        return "任务未完成,达到最大迭代次数"

    def think_and_act(self, context):
        """Render the prompt from ``context``, query the LLM and parse it."""
        prompt = self.prompt_template.format(context)
        response = self.llm.generate(prompt)
        return self.parse_response(response)

多Agent架构

多个 Agent 协作完成复杂任务:

class MultiAgentSystem:
    """Registry of named agents plus a coordinator that splits up a task.

    ``Coordinator`` and ``MessageBus`` are not defined in this snippet.
    """

    def __init__(self):
        self.agents = {}
        self.coordinator = Coordinator()
        self.message_bus = MessageBus()

    def register_agent(self, name, agent):
        """Store ``agent`` under ``name`` and hand it the shared message bus."""
        self.agents[name] = agent
        agent.set_message_bus(self.message_bus)

    def solve(self, task):
        """Decompose ``task``, run each assigned agent and aggregate.

        Returns whatever ``coordinator.aggregate`` returns for the mapping
        of agent name to that agent's result.
        """
        pieces = self.coordinator.decompose(task)
        plan = self.coordinator.assign(pieces, self.agents)

        # NOTE: agents are invoked one after another, in assignment order;
        # nothing here runs concurrently.
        outcomes = {
            who: self.agents[who].execute(piece)
            for who, piece in plan.items()
        }

        return self.coordinator.aggregate(outcomes)

2. 推理引擎设计

ReAct推理引擎

结合推理和行动的引擎:

class ReActEngine:
    """Reason-and-act loop driven by a Thought/Action/Observation prompt.

    NOTE(review): ``format_tools``, ``extract_final_answer``, ``parse_step``
    and ``execute_action`` are called below but are not defined in this
    snippet; presumably a fuller listing provides them -- confirm.
    """

    def __init__(self, llm, max_steps=10):
        self.llm = llm
        self.max_steps = max_steps
        # Template placeholders: {tools}, {question}, {scratchpad}.
        self.prompt = """
你是一个解决问题的AI助手。你需要通过思考和行动来解决问题。

可用工具:
{tools}

使用以下格式:
Thought: 你应该思考要做什么
Action: 要执行的动作
Action Input: 动作的输入
Observation: 动作的结果
... (重复 Thought/Action/Observation)
Thought: 我现在知道最终答案了
Final Answer: 最终答案

开始!

Question: {question}
{scratchpad}
"""

    def solve(self, question, tools, context=None):
        """Answer ``question`` by alternating LLM steps and tool calls.

        Args:
            question: The question placed in the prompt.
            tools: Tools passed to ``format_tools`` and ``execute_action``.
            context: Optional background (for example recalled memory).
                When truthy it is rendered at the top of the scratchpad so
                the model sees it on every step. Callers that pass a
                ``context`` keyword previously failed with ``TypeError``.

        Returns:
            The extracted final answer, or a fixed failure message when
            ``max_steps`` rounds pass without one.
        """
        # Seed the scratchpad with the caller's context, if any.
        scratchpad = f"\nContext: {context}\n" if context else ""

        for _ in range(self.max_steps):
            prompt = self.prompt.format(
                tools=self.format_tools(tools),
                question=question,
                scratchpad=scratchpad
            )

            response = self.llm.generate(prompt)

            # A response containing the final-answer marker ends the loop.
            if "Final Answer:" in response:
                return self.extract_final_answer(response)

            thought, action, action_input = self.parse_step(response)

            observation = self.execute_action(
                action, action_input, tools
            )

            # Append this round so the next prompt shows the full history.
            scratchpad += f"""
Thought: {thought}
Action: {action}
Action Input: {action_input}
Observation: {observation}
"""

        return "无法在规定步数内完成任务"

Plan-and-Execute引擎

先规划再执行的引擎:

class PlanExecuteEngine:
    """Two-phase engine: build a plan first, then execute it step by step.

    ``Planner`` and ``Executor`` are not defined in this snippet.

    NOTE(review): ``formulate_remaining`` and ``synthesize_results`` are
    called below but are not defined here; presumably a fuller listing
    provides them -- confirm.
    """

    def __init__(self, planner_llm, executor_llm):
        self.planner = Planner(planner_llm)
        self.executor = Executor(executor_llm)

    def solve(self, objective, tools, max_replans=3):
        """Plan for ``objective`` and execute the plan, replanning on failure.

        Args:
            objective: The goal passed to the planner.
            tools: Tools passed to both the planner and the executor.
            max_replans: How many times a failed step may trigger a new
                plan before execution stops. Bounds the loop when every
                plan keeps failing.

        Returns:
            Whatever ``synthesize_results`` returns for the collected step
            results (successful and failed ones, in execution order).
        """
        # Phase 1: create the initial plan.
        plan = self.planner.create_plan(objective, tools)

        print(f"计划: {plan}")

        # Phase 2: execute; a failed step discards the rest of the current
        # plan and restarts on a freshly created one. Previously the loop
        # kept iterating the old plan, so the new plan was never executed.
        results = []
        replans = 0
        while True:
            for step in plan.steps:
                print(f"\n执行步骤: {step.description}")

                result = self.executor.execute_step(
                    step, tools, results
                )
                results.append(result)

                if not result.success:
                    break
            else:
                # Every step of the current plan succeeded.
                break

            if replans >= max_replans:
                # Replan budget exhausted; report what has been gathered.
                break

            print("步骤失败,重新规划...")
            replans += 1
            remaining_objective = self.formulate_remaining(
                objective, results
            )
            plan = self.planner.create_plan(
                remaining_objective, tools
            )

        return self.synthesize_results(results)

3. 工具系统设计

工具定义

from pydantic import BaseModel, Field

class ToolDefinition(BaseModel):
    """Declarative description of a tool: name, description, parameters."""
    name: str = Field(description="工具名称")
    description: str = Field(description="工具描述")
    parameters: dict = Field(description="参数schema")

    def to_prompt(self):
        """Render the definition as two lines of prompt text."""
        headline = f"{self.name}: {self.description}"
        return f"{headline}\n参数: {self.parameters}"

class Tool:
    """Base class wrapping a plain callable as a named, described tool."""

    def __init__(self, name, description, func):
        self.name, self.description, self.func = name, description, func

    def run(self, *args, **kwargs):
        """Call the wrapped function and report the outcome as a dict.

        Returns ``{'success': True, 'result': ...}`` on success. Any
        ``Exception`` raised by the function is captured and returned as
        ``{'success': False, 'error': <message>}`` instead of propagating.
        """
        try:
            outcome = self.func(*args, **kwargs)
        except Exception as exc:
            return {'success': False, 'error': str(exc)}
        return {'success': True, 'result': outcome}

工具注册表

class ToolRegistry:
    """Name-indexed collection of ``Tool`` objects."""

    def __init__(self):
        self.tools = {}

    def register(self, tool: Tool):
        """Add ``tool`` under its ``name``, replacing any previous entry."""
        self.tools[tool.name] = tool

    def get(self, name: str) -> Tool:
        """Return the tool registered as ``name``.

        Raises:
            ValueError: If no tool with that name is registered.
        """
        if name in self.tools:
            return self.tools[name]
        raise ValueError(f"工具 {name} 不存在")

    def list_tools(self) -> list:
        """Return one ``{'name', 'description'}`` dict per registered tool."""
        summaries = []
        for registered in self.tools.values():
            summaries.append({
                'name': registered.name,
                'description': registered.description
            })
        return summaries

常用工具实现

# Search tool
class SearchTool(Tool):
    """Tool named ``search`` that forwards calls to ``search_engine.search``."""

    def __init__(self, search_engine):
        search_callable = search_engine.search
        super().__init__(
            name="search",
            description="搜索互联网信息",
            func=search_callable
        )

# Calculator tool
class CalculatorTool(Tool):
    """Tool named ``calculator`` that evaluates an arithmetic expression."""

    # Only digits, the four operators, parentheses, dot, comma and space.
    _ALLOWED_CHARS = frozenset("0123456789+-*/()., ")

    def __init__(self):
        super().__init__(
            name="calculator",
            description="执行数学计算",
            func=self.calculate
        )

    def calculate(self, expression: str):
        """Evaluate ``expression`` and return its value.

        Args:
            expression: Text made up only of the allowed characters.

        Returns:
            The value produced by evaluating the expression.

        Raises:
            ValueError: If a character outside the allowed set is present,
                or if evaluation fails; in the latter case the original
                exception is attached as ``__cause__``.
        """
        if not all(c in self._ALLOWED_CHARS for c in expression):
            raise ValueError("非法的数学表达式")

        try:
            # SECURITY NOTE(review): this still relies on eval(). The
            # character whitelist keeps names out and builtins are removed
            # as defense in depth, but "**" chains such as 9**9**9 remain
            # accepted and can exhaust CPU/memory. Consider an ast-based
            # evaluator with bounded operands before exposing this to
            # model- or user-supplied text.
            return eval(expression, {"__builtins__": {}}, {})
        except Exception as e:
            raise ValueError(f"计算错误: {e}") from e

# Database query tool
class DatabaseTool(Tool):
    """Tool named ``database_query`` that runs read-only SQL."""

    # Statement keywords that change data, schema or permissions.
    _WRITE_KEYWORDS = (
        'DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER',
        'TRUNCATE', 'CREATE', 'GRANT', 'REVOKE',
    )

    def __init__(self, db_connection):
        super().__init__(
            name="database_query",
            description="查询数据库",
            func=self.query
        )
        self.db = db_connection

    def query(self, sql: str):
        """Execute ``sql`` and return all rows.

        Args:
            sql: The statement to run; it is passed to ``db.execute``
                unchanged once it passes the keyword check.

        Returns:
            The result of ``fetchall()`` on the executed statement.

        Raises:
            ValueError: If the statement contains a write keyword.
        """
        import re  # local import: this snippet has no module-level imports

        # Match whole words only, so identifiers such as "updated_at" are
        # not rejected, and cover INSERT/ALTER/TRUNCATE/CREATE as well.
        # NOTE(review): a keyword denylist is only a guard rail; it does
        # not replace a read-only database role for untrusted SQL.
        write_pattern = r'\b(?:' + '|'.join(self._WRITE_KEYWORDS) + r')\b'
        if re.search(write_pattern, sql, re.IGNORECASE):
            raise ValueError("只允许查询操作")

        results = self.db.execute(sql)
        return results.fetchall()

4. 记忆系统设计

短期记忆

import time


class ShortTermMemory:
    """Short-term memory: the running conversation, capped by token count."""

    def __init__(self, max_tokens=4000, token_counter=None):
        """
        Args:
            max_tokens: Token budget enforced after every ``add``.
            token_counter: Optional callable mapping message content to a
                token count. When omitted, the module-level
                ``count_tokens`` is used (not defined in this snippet).
        """
        self.messages = []
        self.max_tokens = max_tokens
        self._token_counter = token_counter

    def add(self, role, content):
        """Append a timestamped message, then trim to the token budget."""
        self.messages.append({
            'role': role,
            'content': content,
            'timestamp': time.time()
        })

        self.trim_if_needed()

    def trim_if_needed(self):
        """Drop the oldest non-system messages while over budget.

        The message at index 0 is never removed, and system messages are
        skipped. Trimming stops once nothing removable is left; previously
        a system message at index 1 made this loop spin forever.
        """
        total_tokens = sum(
            self._count(msg['content'])
            for msg in self.messages
        )

        while total_tokens > self.max_tokens and len(self.messages) > 1:
            victim = self._oldest_removable_index()
            if victim is None:
                # Only protected messages remain; give up trimming.
                break
            removed = self.messages.pop(victim)
            total_tokens -= self._count(removed['content'])

    def get_messages(self):
        """Return the stored message list (the live list, not a copy)."""
        return self.messages

    def _oldest_removable_index(self):
        """Return the lowest index >= 1 holding a non-system message."""
        for index in range(1, len(self.messages)):
            if self.messages[index]['role'] != 'system':
                return index
        return None

    def _count(self, text):
        """Count tokens with the injected counter or ``count_tokens``."""
        counter = self._token_counter
        if counter is None:
            counter = count_tokens
        return counter(text)

长期记忆

class LongTermMemory:
    """Long-term memory backed by a vector store and an embedding model."""

    def __init__(self, vector_store, embedding_model):
        self.vector_store = vector_store
        self.embedding_model = embedding_model

    def store(self, text, metadata=None):
        """Embed ``text``, add it to the store and return the store's id.

        A missing or falsy ``metadata`` is stored as an empty dict.
        """
        vector = self.embedding_model.encode(text)
        payload = metadata or {}
        return self.vector_store.add(
            vector=vector,
            text=text,
            metadata=payload
        )

    def retrieve(self, query, top_k=5):
        """Return up to ``top_k`` hits as ``text``/``metadata``/``score`` dicts."""
        query_vector = self.embedding_model.encode(query)
        hits = self.vector_store.search(
            vector=query_vector,
            top_k=top_k
        )

        memories = []
        for hit in hits:
            memories.append({
                'text': hit.text,
                'metadata': hit.metadata,
                'score': hit.score
            })
        return memories

混合记忆系统

class HybridMemory:
    """Facade over short-term, long-term and episodic memory.

    ``VectorStore``, ``EmbeddingModel`` and ``EpisodicMemory`` are not
    defined in this snippet.
    """

    def __init__(self):
        self.short_term = ShortTermMemory()
        self.long_term = LongTermMemory(
            vector_store=VectorStore(),
            embedding_model=EmbeddingModel()
        )
        self.episodic = EpisodicMemory()  # event (episode) memory

    def remember(self, content, memory_type='short'):
        """Store ``content`` in the memory selected by ``memory_type``.

        Args:
            content: The content to store.
            memory_type: ``'short'`` (stored as an assistant message),
                ``'long'`` or ``'episodic'``.

        Raises:
            ValueError: If ``memory_type`` is not one of the three known
                values. Previously such content was silently dropped.
        """
        if memory_type == 'short':
            self.short_term.add('assistant', content)
        elif memory_type == 'long':
            self.long_term.store(content)
        elif memory_type == 'episodic':
            self.episodic.store_episode(content)
        else:
            raise ValueError(f"unknown memory_type: {memory_type!r}")

    def recall(self, query):
        """Collect memories for ``query`` from all three stores.

        Returns:
            A dict with ``'recent'`` (all short-term messages),
            ``'relevant'`` (long-term hits) and ``'episodes'`` (episodic
            hits).
        """
        return {
            'recent': self.short_term.get_messages(),
            'relevant': self.long_term.retrieve(query),
            'episodes': self.episodic.retrieve(query)
        }

5. Agent编排层

顺序编排

class SequentialOrchestrator:
    """Pipeline that feeds each agent the previous agent's output."""

    def __init__(self, agents):
        self.agents = agents

    def run(self, input_data):
        """Pass ``input_data`` through every agent in order.

        Returns the last agent's output, or ``input_data`` unchanged when
        there are no agents.
        """
        payload = input_data
        for stage in self.agents:
            payload = self._run_stage(stage, payload)
        return payload

    @staticmethod
    def _run_stage(stage, payload):
        """Announce one agent and return what it makes of ``payload``."""
        print(f"执行 Agent: {stage.name}")
        return stage.process(payload)

并行编排

import asyncio

class ParallelOrchestrator:
    """Fan the same input out to every agent concurrently.

    NOTE(review): ``merge_results`` is called below but is not defined in
    this snippet -- confirm where it comes from.
    """

    def __init__(self, agents):
        self.agents = agents

    async def run(self, input_data):
        """Await every agent's ``process_async`` and merge the outputs.

        The gathered results are handed to ``merge_results`` in the same
        order as ``self.agents``.
        """
        pending = []
        for worker in self.agents:
            pending.append(worker.process_async(input_data))

        gathered = await asyncio.gather(*pending)

        return self.merge_results(gathered)

条件编排

class ConditionalOrchestrator:
    """Router that picks an agent by the first matching predicate."""

    def __init__(self):
        self.routes = {}

    def add_route(self, condition, agent):
        """Map the predicate ``condition`` to ``agent``."""
        self.routes[condition] = agent

    def run(self, input_data):
        """Dispatch ``input_data`` to the first agent whose predicate holds.

        Predicates are tried in the order they were added.

        Raises:
            ValueError: If no predicate accepts ``input_data``.
        """
        for predicate in self.routes:
            if not predicate(input_data):
                continue
            return self.routes[predicate].process(input_data)

        raise ValueError("没有匹配的路由")

6. 完整Agent实现

import time


class ProductionAgent:
    """Agent wiring together LLM, tools, memory, engine and monitoring.

    ``Logger`` and ``MetricsCollector`` are not defined in this snippet.

    NOTE(review): ``_init_llm`` and ``_init_tools`` are called in
    ``__init__`` but are not defined here -- confirm where they come from.
    """

    def __init__(self, config):
        # Core components.
        self.llm = self._init_llm(config.llm_config)
        self.tools = self._init_tools(config.tools_config)
        self.memory = HybridMemory()
        self.engine = ReActEngine(self.llm)

        # Monitoring and logging.
        self.logger = Logger()
        self.metrics = MetricsCollector()

    def run(self, task, user_id=None):
        """Run ``task`` and report the outcome without raising.

        Args:
            task: The task text, used as the engine's question.
            user_id: When truthy, memories are recalled for ``task`` and
                passed to the engine as context; otherwise the context is
                an empty dict.

        Returns:
            ``{'success': True, 'result': ..., 'latency': ...}`` on
            success, or ``{'success': False, 'error': ..., 'latency': ...}``
            when any step raises. ``latency`` is in seconds.
        """
        # Monotonic clock: elapsed time is unaffected by wall-clock changes.
        start_time = time.monotonic()

        try:
            self.logger.info(f"开始任务: {task}")

            if user_id:
                context = self.memory.recall(task)
            else:
                context = {}

            result = self.engine.solve(
                question=task,
                tools=self.tools,
                context=context
            )

            # Keep the exchange in long-term memory for later recall.
            self.memory.remember(
                f"Q: {task}\nA: {result}",
                memory_type='long'
            )

            latency = time.monotonic() - start_time
            self.metrics.record('task_latency', latency)
            self.metrics.record('task_success', 1)

            return {
                'success': True,
                'result': result,
                'latency': latency
            }

        except Exception as e:
            self.logger.error(f"任务失败: {e}")
            self.metrics.record('task_success', 0)

            # Report how long the failed attempt took as well, so both
            # result shapes carry a latency.
            return {
                'success': False,
                'error': str(e),
                'latency': time.monotonic() - start_time
            }

7. 监控与评估

性能监控

from collections import defaultdict
import time

import numpy as np


class AgentMonitor:
    """Collects per-execution records and derives summary statistics."""

    def __init__(self):
        self.metrics = defaultdict(list)

    def track_execution(self, agent_name, task, result, latency):
        """Record one execution.

        Args:
            agent_name: Name of the agent that ran.
            task: The task that was run.
            result: Object whose ``success`` attribute is stored.
            latency: Duration of the execution.
        """
        self.metrics['executions'].append({
            'agent': agent_name,
            'task': task,
            'success': result.success,
            'latency': latency,
            'timestamp': time.time()
        })

    def get_stats(self):
        """Summarize the recorded executions.

        Returns:
            A dict with ``total``, ``success_rate``, ``avg_latency`` and
            ``p95_latency``. With no executions recorded, all values are
            zero instead of raising ``ZeroDivisionError``.
        """
        executions = self.metrics['executions']
        total = len(executions)

        if total == 0:
            return {
                'total': 0,
                'success_rate': 0.0,
                'avg_latency': 0.0,
                'p95_latency': 0.0
            }

        # Extract the latencies once; they feed both mean and percentile.
        latencies = [e['latency'] for e in executions]

        return {
            'total': total,
            'success_rate': sum(e['success'] for e in executions) / total,
            'avg_latency': sum(latencies) / total,
            'p95_latency': np.percentile(latencies, 95)
        }

8. 安全性设计

输入验证

import re


class InputValidator:
    """Rejects over-long input and input matching known-bad patterns."""

    def __init__(self):
        self.max_length = 10000
        # Regular expressions, matched case-insensitively in ``validate``.
        self.forbidden_patterns = [
            r'<script\b',      # also catches <script src=...> and <script >
            r'DROP\s+TABLE',   # any whitespace between the two keywords
            r'exec\s*\(',      # optional whitespace before the parenthesis
        ]

    def validate(self, user_input):
        """Check ``user_input`` against the length and pattern rules.

        Returns:
            ``True`` when the input passes every check.

        Raises:
            ValueError: If the input is longer than ``max_length`` or
                matches one of ``forbidden_patterns``.
        """
        # Length check first: cheap, and bounds the regex work below.
        if len(user_input) > self.max_length:
            raise ValueError("输入过长")

        # NOTE(review): a pattern denylist is a coarse filter only; it is
        # not a substitute for output escaping or parameterized queries.
        for pattern in self.forbidden_patterns:
            if re.search(pattern, user_input, re.IGNORECASE):
                raise ValueError("检测到潜在恶意内容")

        return True

行为约束

class BehaviorConstraints:
    """Gatekeeper combining a tool allow-list with per-user rate limiting.

    ``RateLimiter`` and ``RateLimitError`` are not defined in this snippet.
    """

    def __init__(self):
        self.allowed_tools = set()
        self.rate_limiter = RateLimiter()

    def check_tool_access(self, tool_name, user_id):
        """Return ``True`` when ``user_id`` may call ``tool_name`` now.

        Raises:
            PermissionError: If ``tool_name`` is not in ``allowed_tools``.
            RateLimitError: If the rate limiter refuses ``user_id``.
        """
        permitted = tool_name in self.allowed_tools
        if not permitted:
            raise PermissionError(f"不允许访问工具: {tool_name}")

        # The limiter is consulted only for tools that are allowed.
        if self.rate_limiter.allow(user_id):
            return True
        raise RateLimitError("请求过于频繁")

总结

一个完整的 Agent 系统架构需要考虑:

- 推理引擎:ReAct、Plan-Execute 等模式
- 工具系统:灵活的工具注册和调用机制
- 记忆管理:短期、长期、事件记忆的结合
- 编排层:支持顺序、并行、条件等编排方式
- 监控评估:完善的性能和质量监控
- 安全防护:输入验证、行为约束、权限控制

通过合理的架构设计,可以构建出高效、可靠、安全的 AI Agent 系统。