🤖 MAC-SQL: 多代理协作的Text-to-SQL框架
📚 内容概览
MAC-SQL (Multi-Agent Collaboration for Text-to-SQL) 是一种创新的Text-to-SQL框架,通过多个专业化智能体的协作,将自然语言查询准确转换为SQL语句。该框架充分利用大语言模型(LLM)的能力,通过分工协作、迭代优化的方式,显著提升了SQL生成的准确性和鲁棒性。
🎯 一、为什么需要MAC-SQL?
1.1 传统Text-to-SQL的挑战
- 🔍 Schema理解困难:数据库表结构复杂,难以准确映射字段
- 🧩 语义歧义:自然语言表达方式多样,存在歧义
- 🔗 跨表JOIN复杂:多表关联逻辑难以推断
- 🐛 错误难以修正:生成的SQL可能存在语法或逻辑错误
- 📊 复杂查询挑战:嵌套查询、聚合函数、窗口函数等高级特性
1.2 MAC-SQL的创新思路
采用分治策略,将复杂的Text-to-SQL任务分解为多个子任务,由专业化代理协作完成:
自然语言查询 → Decomposer → Selector → Refiner → Checker → Integrator → 最终SQL
🏗️ 二、系统架构
2.1 核心代理
| 代理 | 职责 | 输入 | 输出 |
|---|---|---|---|
| Decomposer | 任务分解 | 自然语言查询 | 结构化查询意图 |
| Selector | Schema选择 | 查询意图 + DB Schema | 相关表和字段列表 |
| Refiner | SQL生成 | 查询意图 + 选中Schema | 候选SQL语句 |
| Checker | 错误检查 | 候选SQL + Schema | 错误报告/修正建议 |
| Integrator | 结果集成 | 所有代理输出 | 最终SQL语句 |
2.2 工作流程
class MACSQLWorkflow:
def process(self, question: str, db_schema: Dict):
# 1. 任务分解
intent = self.decomposer.decompose(question)
# 2. Schema选择
relevant_schema = self.selector.select(intent, db_schema)
# 3. SQL生成(多候选)
sql_candidates = self.refiner.generate(intent, relevant_schema, num_candidates=3)
# 4. 验证与修正
validated_sqls = []
for sql in sql_candidates:
errors = self.checker.validate(sql, relevant_schema)
if errors:
sql = self.refiner.fix(sql, errors)
validated_sqls.append(sql)
# 5. 集成最佳结果
final_sql = self.integrator.select_best(validated_sqls)
return final_sql
🔧 三、核心组件实现
3.1 Decomposer - 任务分解器
class Decomposer:
def decompose(self, question: str) -> QueryIntent:
prompt = f"""
分析以下查询,提取结构化信息:
查询:{question}
输出JSON:
{{
"query_type": "SELECT/UPDATE/INSERT/DELETE",
"target_entities": ["实体1", "实体2"],
"conditions": ["条件1"],
"aggregations": ["聚合函数"],
"sorting": {{"field": "字段", "order": "ASC/DESC"}},
"limit": 数量
}}
"""
response = self.llm.generate(prompt)
return QueryIntent.from_json(response)
3.2 Selector - Schema选择器
class SchemaSelector:
def select(self, intent: QueryIntent, db_schema: DBSchema) -> SelectedSchema:
# 1. 实体识别 - 匹配表名
relevant_tables = self._match_tables(intent.target_entities, db_schema.tables)
# 2. 字段选择
relevant_columns = self._match_columns(intent, relevant_tables)
# 3. 关系推断
join_paths = self._infer_joins(relevant_tables, db_schema)
return SelectedSchema(tables=relevant_tables, columns=relevant_columns, joins=join_paths)
def _match_tables(self, entities: List[str], tables: List[Table]) -> List[Table]:
"""基于语义相似度匹配表"""
entity_embeddings = self.embedder.embed(entities)
table_embeddings = self.embedder.embed([t.name for t in tables])
similarities = cosine_similarity(entity_embeddings, table_embeddings)
matched_indices = np.where(similarities > 0.7)[1]
return [tables[i] for i in matched_indices]
3.3 Refiner - SQL精炼器
class SQLRefiner:
def generate(self, intent: QueryIntent, schema: SelectedSchema, num_candidates: int = 3):
prompt = f"""
基于以下信息生成SQL:
Schema:{schema.tables}, {schema.columns}
需求:{intent}
要求:
1. 使用标准SQL语法
2. 添加适当别名
3. 处理NULL值
4. 优化查询性能
"""
candidates = []
for i in range(num_candidates):
sql = self.llm.generate(prompt, temperature=0.3 + i * 0.2)
candidates.append(sql)
return candidates
def fix(self, sql: str, errors: List[str]) -> str:
prompt = f"修正以下SQL错误:\nSQL: {sql}\n错误: {errors}"
return self.llm.generate(prompt)
3.4 Checker - 验证器
class SQLChecker:
def validate(self, sql: str, schema: SelectedSchema) -> List[str]:
errors = []
# 1. 语法检查
try:
sqlparse.parse(sql)
except Exception as e:
errors.append(f"语法错误: {e}")
# 2. Schema一致性
errors.extend(self._check_schema_consistency(sql, schema))
# 3. 执行测试
try:
self.db.execute(f"EXPLAIN {sql}")
except Exception as e:
errors.append(f"执行错误: {e}")
return errors
3.5 Integrator - 集成器
class SQLIntegrator:
def select_best(self, sql_candidates: List[str]) -> str:
scores = [self._evaluate(sql) for sql in sql_candidates]
best_idx = np.argmax(scores)
return sql_candidates[best_idx]
def _evaluate(self, sql: str) -> float:
score = 0.0
# 1. 执行计划成本
try:
plan = self.db.execute(f"EXPLAIN {sql}").fetchone()
cost = self._parse_cost(plan)
score += 1.0 / (1.0 + cost)
except:
return 0.0
# 2. 查询复杂度
complexity = len(sqlparse.parse(sql)[0].tokens)
score += 0.5 if 3 <= complexity <= 10 else 0.2
return score
🚀 四、完整实现示例
class MACSQLSystem:
def __init__(self, config: Config):
self.llm = OpenAI(config.llm_model)
self.embedder = OpenAIEmbeddings()
self.db = Database(config.db_uri)
self.decomposer = Decomposer(self.llm)
self.selector = SchemaSelector(self.llm, self.embedder)
self.refiner = SQLRefiner(self.llm)
self.checker = SQLChecker(self.db)
self.integrator = SQLIntegrator(self.llm, self.db)
def query(self, question: str) -> SQLResult:
db_schema = self.db.get_schema()
workflow = MACSQLWorkflow(
self.decomposer, self.selector, self.refiner,
self.checker, self.integrator
)
sql = workflow.process(question, db_schema)
result = self.db.execute(sql)
return SQLResult(sql=sql, data=result.fetchall(), columns=result.keys())
# 使用示例
config = Config(llm_model="gpt-4", db_uri="postgresql://localhost/mydb")
mac_sql = MACSQLSystem(config)
result = mac_sql.query("统计每个部门的员工数量")
print(f"SQL: {result.sql}")
print(f"结果: {result.data}")
📊 五、性能优化
5.1 缓存策略
class MACSQLCache:
def __init__(self, redis_client):
self.cache = redis_client
def get_sql(self, question: str, schema_hash: str) -> Optional[str]:
key = f"sql:{schema_hash}:{hashlib.md5(question.encode()).hexdigest()}"
return self.cache.get(key)
def cache_sql(self, question: str, schema_hash: str, sql: str, ttl=3600):
key = f"sql:{schema_hash}:{hashlib.md5(question.encode()).hexdigest()}"
self.cache.setex(key, ttl, sql)
5.2 并行化处理
async def generate_parallel(intent, schema, num_candidates=3):
with ThreadPoolExecutor(max_workers=num_candidates) as executor:
tasks = [executor.submit(refiner.generate, intent, schema) for _ in range(num_candidates)]
return [task.result() for task in tasks]
🎯 六、应用场景
6.1 商业智能(BI)
class BIDashboard:
def get_kpi(self, metric_name: str, filters: Dict):
question = f"计算{metric_name},条件:{filters}"
result = self.mac_sql.query(question)
return result.data[0][0]
6.2 数据分析助手
class DataAnalysisAssistant:
def analyze(self, question: str):
sql_result = self.mac_sql.query(question)
df = pd.DataFrame(sql_result.data, columns=sql_result.columns)
return {'sql': sql_result.sql, 'data': df}
🛠️ 七、工具与框架
| 组件 | 推荐选择 | 说明 |
|---|---|---|
| LLM | GPT-4, Claude | 核心推理引擎 |
| Embedding | OpenAI Ada-002 | Schema匹配 |
| 数据库 | PostgreSQL, MySQL | 支持EXPLAIN |
| SQL解析 | sqlparse, sqlglot | 语法分析 |
| 缓存 | Redis | 结果缓存 |
📈 八、评估指标
| 指标 | 定义 | 计算方式 |
|---|---|---|
| Execution Accuracy (EX) | 执行准确率 | 生成SQL返回正确结果的比例 |
| Exact Match (EM) | 精确匹配 | 生成SQL与标准答案完全一致 |
| Component Match | 组件匹配 | SELECT/WHERE/JOIN等子句正确率 |
🎓 九、最佳实践
9.1 提示工程
✅ 推荐做法
- 提供清晰的Schema描述和示例
- 使用Few-Shot示例提升准确性
- 明确指定SQL方言
- 添加安全约束(禁止DROP/DELETE)
❌ 避免陷阱
- Schema信息过载
- 缺少类型信息
- 忽略NULL值处理
- 未考虑查询性能
9.2 安全性
def validate_sql_security(sql: str) -> bool:
dangerous_keywords = ['DROP', 'DELETE', 'TRUNCATE', 'ALTER']
sql_upper = sql.upper()
for keyword in dangerous_keywords:
if keyword in sql_upper:
raise SecurityError(f"检测到危险操作: {keyword}")
return True
❓ 十、常见问题解答
Q1: MAC-SQL与Seq2Seq模型相比有什么优势?
A: 主要优势:
- ✅ 可解释性强,决策过程可追踪
- ✅ 模块化设计,易于调试优化
- ✅ 错误修正能力,迭代提升准确率
- ✅ 适应性强,易扩展到新Schema
- ✅ Few-Shot学习,无需大量训练数据
Q2: 如何处理复杂嵌套查询?
A: 采用递归分解策略:
- Decomposer识别嵌套结构
- Refiner先生成内层子查询
- 基于子查询构建外层查询
- Checker验证各层逻辑
Q3: 支持哪些数据库?
A: 支持所有主流关系型数据库:
- ✅ PostgreSQL(推荐)
- ✅ MySQL/MariaDB
- ✅ SQLite
- ✅ SQL Server
- ✅ Oracle
Q4: 如何优化大规模数据库性能?
A: 关键策略:
- 🔍 Schema索引加速Selector
- 💾 结果缓存高频查询
- ⚡ 并行生成候选SQL
- 📊 利用数据库统计信息
- 🎯 Schema剪枝减少计算
📚 参考资源
🔗 延伸阅读
🎯 总结
MAC-SQL通过多代理协作机制,将复杂的Text-to-SQL任务分解为可管理的子任务,显著提升了SQL生成的准确性和可靠性。
核心要点:
- ✅ 多代理协作实现专业化分工
- ✅ 迭代优化提升准确率
- ✅ 模块化设计易于扩展维护
- ✅ 适用于复杂查询和大规模数据库
- ✅ 可与现有BI系统无缝集成
开始使用MAC-SQL,让数据查询更智能!🚀
关键词: MAC-SQL, Text-to-SQL, NL2SQL, 多代理协作, Multi-Agent, LLM, 数据库查询, SQL生成, Schema Linking, 错误修正