DocNest System Architecture Design

This article walks through how to design and implement a production-grade architecture for DocNest, a document intelligence management system.

System Architecture Overview

graph TB
    A[Document Upload] --> B[API Gateway]
    B --> C[Document Processing Service]
    C --> D[Parsing Engine]
    C --> E[Feature Extraction]
    E --> F[Classification Service]
    E --> G[Relation Service]
    F --> H[Document Index]
    G --> I[Knowledge Graph]
    H --> J[Retrieval Service]
    I --> J
    J --> K[User Interface]

1. Core Service Architecture

Document Processing Service

from datetime import datetime
import asyncio
import logging

from fastapi import FastAPI, File, UploadFile, BackgroundTasks, Depends

app = FastAPI()
logger = logging.getLogger(__name__)

class DocumentProcessingService:
    def __init__(self):
        self.parser = DocumentParser()
        self.extractor = ContentExtractor()
        self.classifier = DocumentClassifier()
        self.indexer = DocumentIndexer()
        self.queue = asyncio.Queue()

    async def process_document(
        self,
        file: UploadFile,
        user_id: str
    ):
        """Process a single document."""
        try:
            # 1. Save the file
            file_path = await self.save_file(file)

            # 2. Parse the document
            parsed_doc = await self.parser.parse(file_path)

            # 3. Extract features
            features = await self.extractor.extract(parsed_doc)

            # 4. Classify
            categories = await self.classifier.classify(features)

            # 5. Index
            doc_id = await self.indexer.index({
                'user_id': user_id,
                'file_path': file_path,
                'content': parsed_doc['content'],
                'features': features,
                'categories': categories,
                'created_at': datetime.now()
            })

            # 6. Build relations asynchronously (background task)
            await self.queue.put({
                'task': 'build_relations',
                'doc_id': doc_id
            })

            return {
                'doc_id': doc_id,
                'status': 'processed',
                'categories': categories
            }

        except Exception as e:
            logger.error(f"Document processing failed: {e}")
            raise

@app.post("/api/documents/upload")
async def upload_document(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    user_id: str = Depends(get_current_user_id)
):
    """Upload a document."""
    service = DocumentProcessingService()
    result = await service.process_document(file, user_id)

    # Schedule relation building as a background task
    background_tasks.add_task(
        process_relations,
        result['doc_id']
    )

    return result
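The service above calls self.save_file, which is not shown. A minimal sketch of what that method might look like, assuming uploads are written to a local UPLOAD_DIR (object storage such as S3 would be the more typical choice in production); the directory path and naming scheme are assumptions:

import os
import uuid

UPLOAD_DIR = "/data/docnest/uploads"  # assumed local storage path

async def save_file(self, file: UploadFile) -> str:
    """Persist an uploaded file to disk and return its path (illustrative sketch)."""
    os.makedirs(UPLOAD_DIR, exist_ok=True)
    # Prefix with a UUID so identical filenames from different users do not collide
    file_path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}_{file.filename}")
    content = await file.read()  # UploadFile.read() is a coroutine in FastAPI
    with open(file_path, "wb") as f:
        f.write(content)
    return file_path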

Document Parsing Engine

class UnifiedDocumentParser:
    """Unified document parsing engine."""

    def __init__(self):
        self.parsers = self.initialize_parsers()
        self.ocr_engine = OCREngine()

    def initialize_parsers(self):
        """Initialize the per-format parsers."""
        return {
            'pdf': PDFParser(),
            'docx': DocxParser(),
            'xlsx': ExcelParser(),
            'pptx': PowerPointParser(),
            'md': MarkdownParser(),
            'html': HTMLParser(),
            'txt': TextParser(),
            'image': ImageParser(self.ocr_engine)
        }

    async def parse(self, file_path):
        """Parse a document with the appropriate parser."""
        # Detect the file type
        file_type = self.detect_file_type(file_path)

        # Select a parser
        parser = self.parsers.get(file_type)

        if not parser:
            raise UnsupportedFileTypeError(
                f"Unsupported file type: {file_type}"
            )

        # Parse the document
        document = await parser.parse(file_path)

        # Post-process
        processed = self.post_process(document)

        return processed

    def post_process(self, document):
        """Post-process the parsed document."""
        # Clean the text
        cleaned_text = self.clean_text(document['content'])

        # Extract the document structure
        structure = self.extract_structure(document)

        # Extract tables
        tables = self.extract_tables(document)

        # Extract images
        images = self.extract_images(document)

        return {
            'content': cleaned_text,
            'structure': structure,
            'tables': tables,
            'images': images,
            'metadata': document.get('metadata', {})
        }
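detect_file_type is referenced but not defined here. A simple extension-based sketch follows; a production version would also sniff magic bytes (e.g. with python-magic), and the extension set below is an assumption:

import os

IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.tiff', '.bmp'}  # assumed set

def detect_file_type(self, file_path: str) -> str:
    """Map a file extension to one of the parser keys above (illustrative)."""
    ext = os.path.splitext(file_path)[1].lower()
    if ext in IMAGE_EXTENSIONS:
        return 'image'
    # '.pdf' -> 'pdf', '.docx' -> 'docx', etc.; unknown extensions fall through
    return ext.lstrip('.')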

PDF Parsing Optimization

class AdvancedPDFParser:
    def __init__(self):
        self.layout_analyzer = LayoutAnalyzer()
        self.table_extractor = TableExtractor()
        self.ocr_engine = OCREngine()

    async def parse(self, pdf_path):
        """Advanced PDF parsing."""
        # 1. Layout analysis
        layout = await self.layout_analyzer.analyze(pdf_path)

        # 2. Text extraction
        text_blocks = []
        for page in layout.pages:
            # Extract text blocks
            blocks = self.extract_text_blocks(page)

            # Run OCR over image regions
            for img_region in page.image_regions:
                ocr_text = await self.ocr_engine.recognize(
                    img_region
                )
                blocks.append({
                    'type': 'ocr',
                    'text': ocr_text,
                    'bbox': img_region.bbox
                })

            text_blocks.extend(blocks)

        # 3. Table extraction
        tables = await self.table_extractor.extract(pdf_path)

        # 4. Rebuild the document structure
        structured_content = self.reconstruct_structure(
            text_blocks, tables
        )

        return structured_content
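LayoutAnalyzer and TableExtractor are internal components. For comparison, here is a much smaller sketch of the same idea (page text plus table extraction) built on the open-source pdfplumber library; this is not the implementation above, just one possible backend:

import pdfplumber

def extract_text_and_tables(pdf_path: str):
    """Extract plain text and tables page by page with pdfplumber (sketch)."""
    pages_text, tables = [], []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            pages_text.append(page.extract_text() or "")
            # extract_tables() returns each table as a list of rows
            tables.extend(page.extract_tables())
    return {"text": "\n".join(pages_text), "tables": tables}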

2. Feature Extraction Architecture

Multi-Level Feature Extraction

class MultiLevelFeatureExtractor:
    def __init__(self, llm, embedding_model):
        self.llm = llm
        self.embedding_model = embedding_model
        self.ner_model = NERModel()
        self.keyword_extractor = KeywordExtractor()

    async def extract(self, document):
        """Extract features at multiple levels."""
        features = {}

        # 1. Text-level features
        features['text'] = await self.extract_text_features(
            document
        )

        # 2. Semantic-level features
        features['semantic'] = await self.extract_semantic_features(
            document
        )

        # 3. Structure-level features
        features['structure'] = self.extract_structure_features(
            document
        )

        # 4. Entity-level features
        features['entities'] = await self.extract_entity_features(
            document
        )

        return features

    async def extract_semantic_features(self, document):
        """Extract semantic features."""
        # Document embedding
        doc_embedding = await self.embedding_model.encode(
            document['content']
        )

        # Paragraph embeddings
        paragraph_embeddings = []
        for para in document.get('paragraphs', []):
            para_emb = await self.embedding_model.encode(para)
            paragraph_embeddings.append(para_emb)

        # Topic extraction
        topics = await self.extract_topics(document)

        return {
            'document_embedding': doc_embedding,
            'paragraph_embeddings': paragraph_embeddings,
            'topics': topics
        }
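extract_topics is left abstract above. As a rough illustration of the idea, here is a keyword-based stand-in using scikit-learn's TfidfVectorizer; a real deployment would more likely use an LLM or a dedicated topic model, and the max_features/top_k values are assumptions:

from sklearn.feature_extraction.text import TfidfVectorizer

def extract_keyword_topics(paragraphs, top_k=10):
    """Approximate document topics with the highest-TF-IDF terms (sketch)."""
    vectorizer = TfidfVectorizer(max_features=5000)
    tfidf = vectorizer.fit_transform(paragraphs)   # one row per paragraph
    scores = tfidf.sum(axis=0).A1                  # aggregate each term's score
    terms = vectorizer.get_feature_names_out()
    ranked = sorted(zip(terms, scores), key=lambda x: x[1], reverse=True)
    return [term for term, _ in ranked[:top_k]]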

Entity Recognition and Linking

class EntityRecognitionAndLinking:
    def __init__(self):
        self.ner = NERModel()
        self.entity_linker = EntityLinker()
        self.knowledge_base = KnowledgeBase()

    async def extract_entities(self, text):
        """Extract entities and link them to the knowledge base."""
        # 1. Named entity recognition
        entities = self.ner.extract(text)

        # 2. Entity disambiguation and linking
        linked_entities = []
        for entity in entities:
            # Look up candidate entities
            candidates = await self.knowledge_base.search(
                entity.text
            )

            # Link to the knowledge base
            if candidates:
                linked = await self.entity_linker.link(
                    entity, candidates, context=text
                )
                linked_entities.append(linked)

        # 3. Entity relation extraction
        relations = self.extract_entity_relations(
            linked_entities, text
        )

        return {
            'entities': linked_entities,
            'relations': relations
        }
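NERModel is an internal wrapper. A minimal equivalent of just the recognition step using spaCy, assuming the en_core_web_sm model is installed; linking would still go through EntityLinker as above:

import spacy

nlp = spacy.load("en_core_web_sm")  # assumed pre-installed model

def recognize_entities(text: str):
    """Run spaCy NER and return entity text, label, and character span (sketch)."""
    doc = nlp(text)
    return [
        {"text": ent.text, "label": ent.label_,
         "start": ent.start_char, "end": ent.end_char}
        for ent in doc.ents
    ]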

3. Classification and Tagging Services

Multi-Label Classifier

import json

class MultiLabelClassifier:
    def __init__(self, llm, taxonomy):
        self.llm = llm
        self.taxonomy = taxonomy
        self.embedding_model = EmbeddingModel()

    async def classify(self, document_features):
        """Multi-label classification."""
        # 1. Rule-based preliminary classification
        rule_based_cats = self.rule_based_classify(
            document_features
        )

        # 2. Fine-grained LLM-based classification
        llm_cats = await self.llm_based_classify(
            document_features
        )

        # 3. Vector-similarity-based classification
        vector_cats = await self.vector_based_classify(
            document_features
        )

        # 4. Merge the results
        final_categories = self.merge_classifications(
            rule_based_cats,
            llm_cats,
            vector_cats
        )

        return final_categories

    async def llm_based_classify(self, features):
        """LLM-based classification."""
        prompt = f"""
Based on the following document features, pick the 3-5 most suitable categories from the taxonomy:

Title: {features['title']}
Summary: {features['summary']}
Keywords: {', '.join(features['keywords'])}
Entities: {', '.join([e['text'] for e in features['entities']])}

Taxonomy:
{self.taxonomy.to_string()}

Requirements:
1. Multiple categories may be selected
2. Include both top-level categories and subcategories
3. Give a confidence score (0-1) for each category

Return JSON:
[
    {{"category": "category path", "confidence": 0.9}},
    ...
]
"""

        response = await self.llm.generate(prompt)
        categories = json.loads(response)

        return categories
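merge_classifications is not shown above. One straightforward fusion strategy is a weighted vote over the three sources; the weights and threshold below are placeholder values for illustration, not tuned numbers from the system:

def merge_classifications(rule_cats, llm_cats, vector_cats,
                          weights=(0.2, 0.5, 0.3), min_score=0.4):
    """Fuse per-source category confidences with a weighted sum (sketch)."""
    fused = {}
    for weight, cats in zip(weights, (rule_cats, llm_cats, vector_cats)):
        for cat in cats:
            fused[cat['category']] = fused.get(cat['category'], 0.0) \
                + weight * cat['confidence']
    # Keep categories whose fused score clears the (assumed) threshold
    return sorted(
        [{'category': c, 'confidence': s} for c, s in fused.items() if s >= min_score],
        key=lambda x: x['confidence'],
        reverse=True
    )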

Automatic Tagging System

class AutoTaggingSystem:
    def __init__(self):
        self.tag_extractor = TagExtractor()
        self.tag_recommender = TagRecommender()
        self.tag_graph = TagGraph()

    async def auto_tag(self, document, existing_tags=None):
        """Tag a document automatically."""
        # 1. Extract candidate tags
        candidate_tags = await self.extract_candidate_tags(
            document
        )

        # 2. Recommend tags based on tagging history
        if existing_tags:
            recommended = await self.tag_recommender.recommend(
                document, existing_tags
            )
            candidate_tags.extend(recommended)

        # 3. Deduplicate and rank tags
        unique_tags = self.deduplicate_tags(candidate_tags)
        ranked_tags = self.rank_tags(unique_tags, document)

        # 4. Organize tags into a hierarchy
        hierarchical_tags = self.build_tag_hierarchy(
            ranked_tags
        )

        return hierarchical_tags

    def build_tag_hierarchy(self, tags):
        """Build the tag hierarchy."""
        hierarchy = {}

        for tag in tags:
            # Look up parent tags
            parent_tags = self.tag_graph.get_parents(tag)

            # Build the nested structure
            current_level = hierarchy
            for parent in parent_tags:
                if parent not in current_level:
                    current_level[parent] = {}
                current_level = current_level[parent]

            current_level[tag['name']] = {
                'confidence': tag['confidence'],
                'source': tag['source']
            }

        return hierarchy

4. Relationship Graph Architecture

Graph Database Design

from neo4j import GraphDatabase

class DocumentGraphDB:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def create_document_node(self, doc_id, properties):
        """Create a document node."""
        with self.driver.session() as session:
            session.run("""
                CREATE (d:Document {id: $doc_id})
                SET d += $properties
            """, doc_id=doc_id, properties=properties)

    def create_relation(
        self,
        source_id,
        target_id,
        relation_type,
        properties
    ):
        """Create a relationship between two documents."""
        with self.driver.session() as session:
            session.run(f"""
                MATCH (s:Document {{id: $source_id}})
                MATCH (t:Document {{id: $target_id}})
                CREATE (s)-[r:{relation_type}]->(t)
                SET r += $properties
            """,
            source_id=source_id,
            target_id=target_id,
            properties=properties
            )

    def find_related_documents(
        self,
        doc_id,
        relation_types=None,
        max_depth=2
    ):
        """Find related documents."""
        relation_filter = ""
        if relation_types:
            relation_filter = ":" + "|".join(relation_types)

        with self.driver.session() as session:
            # In a variable-length pattern, r is a list of relationships,
            # so we order by the score on the final hop
            result = session.run(f"""
                MATCH path = (d:Document {{id: $doc_id}})
                    -[r{relation_filter}*1..{max_depth}]-(related:Document)
                RETURN related, r, length(path) as depth
                ORDER BY depth, last(r).score DESC
                LIMIT 50
            """, doc_id=doc_id)

            return [
                {
                    'document': dict(record['related']),
                    'relations': [dict(r) for r in record['r']],
                    'depth': record['depth']
                }
                for record in result
            ]
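Typical usage of the graph layer might look like this; the connection details, titles, and relation type are placeholders:

# Assumed local Neo4j instance and credentials
graph = DocumentGraphDB("bolt://localhost:7687", "neo4j", "password")

graph.create_document_node("doc-1", {"title": "Q3 Report", "category": "finance"})
graph.create_document_node("doc-2", {"title": "Q2 Report", "category": "finance"})

# Link the two reports with a similarity relation carrying a score property
graph.create_relation("doc-1", "doc-2", "SIMILAR_TO", {"score": 0.87})

related = graph.find_related_documents("doc-1", relation_types=["SIMILAR_TO"])
print(related[0]["document"]["title"], related[0]["depth"])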

Relation Discovery Algorithm

class RelationDiscovery:
    def __init__(self):
        self.similarity_calculator = SimilarityCalculator()
        self.citation_analyzer = CitationAnalyzer()
        self.co_occurrence_analyzer = CoOccurrenceAnalyzer()

    async def discover_relations(self, document, corpus):
        """Discover relationships between documents."""
        relations = []

        # 1. Content-similarity relations
        similar_docs = await self.find_similar_documents(
            document, corpus
        )
        relations.extend(similar_docs)

        # 2. Citation relations
        citations = self.citation_analyzer.analyze(
            document, corpus
        )
        relations.extend(citations)

        # 3. Co-occurrence relations
        co_occurrences = self.co_occurrence_analyzer.analyze(
            document, corpus
        )
        relations.extend(co_occurrences)

        # 4. Temporal relations
        temporal_relations = self.find_temporal_relations(
            document, corpus
        )
        relations.extend(temporal_relations)

        return self.filter_and_rank_relations(relations)

    def filter_and_rank_relations(self, relations):
        """Filter and rank relations."""
        # Deduplicate
        unique_relations = self.deduplicate_relations(relations)

        # Drop low-confidence relations
        filtered = [
            r for r in unique_relations
            if r['confidence'] >= 0.5
        ]

        # Sort by confidence
        sorted_relations = sorted(
            filtered,
            key=lambda x: x['confidence'],
            reverse=True
        )

        return sorted_relations[:50]  # Keep the top 50
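find_similar_documents is left abstract. A simple embedding-based version using cosine similarity over precomputed document embeddings could look like the sketch below, assuming each corpus entry exposes a doc_id and the document embedding produced by the feature extractor; the 0.75 cutoff is an assumption:

import numpy as np

def find_similar_by_embedding(document, corpus, threshold=0.75, top_k=20):
    """Rank corpus documents by cosine similarity to the given document (sketch)."""
    query = np.asarray(document['features']['semantic']['document_embedding'])
    results = []
    for other in corpus:
        if other['doc_id'] == document['doc_id']:
            continue
        emb = np.asarray(other['features']['semantic']['document_embedding'])
        sim = float(np.dot(query, emb) / (np.linalg.norm(query) * np.linalg.norm(emb)))
        if sim >= threshold:
            results.append({
                'source': document['doc_id'],
                'target': other['doc_id'],
                'type': 'SIMILAR_TO',
                'confidence': sim
            })
    return sorted(results, key=lambda r: r['confidence'], reverse=True)[:top_k]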

5. Retrieval Service Architecture

Hybrid Search Engine

import asyncio

class HybridSearchEngine:
    def __init__(self):
        self.vector_search = VectorSearchEngine()
        self.keyword_search = KeywordSearchEngine()
        self.graph_search = GraphSearchEngine()
        self.reranker = Reranker()

    async def search(self, query, options=None):
        """Hybrid retrieval."""
        # 1. Run the retrieval strategies in parallel
        vector_results, keyword_results, graph_results = \
            await asyncio.gather(
                self.vector_search.search(query),
                self.keyword_search.search(query),
                self.graph_search.search(query)
            )

        # 2. Merge the results
        merged_results = self.merge_results(
            vector_results,
            keyword_results,
            graph_results
        )

        # 3. Rerank
        reranked = await self.reranker.rerank(
            query, merged_results
        )

        # 4. Apply filters and facets
        if options:
            reranked = self.apply_filters(reranked, options)

        return reranked

    def merge_results(self, *result_sets):
        """Merge result lists with Reciprocal Rank Fusion (RRF)."""
        doc_scores = {}

        for results in result_sets:
            for rank, doc in enumerate(results, 1):
                doc_id = doc['id']
                score = 1.0 / (rank + 60)  # RRF formula with k = 60

                if doc_id not in doc_scores:
                    doc_scores[doc_id] = {
                        'doc': doc,
                        'score': 0
                    }

                doc_scores[doc_id]['score'] += score

        # Sort by fused score
        sorted_docs = sorted(
            doc_scores.values(),
            key=lambda x: x['score'],
            reverse=True
        )

        return [item['doc'] for item in sorted_docs]
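To see why Reciprocal Rank Fusion favours documents that appear in several result lists, here is a tiny worked example of the same scoring rule used in merge_results:

def rrf_score(ranks, k=60):
    """Fused RRF score for a document given its rank in each result list."""
    return sum(1.0 / (rank + k) for rank in ranks)

# Ranked 1st by both vector and keyword search
print(round(rrf_score([1, 1]), 4))   # 0.0328
# Ranked 3rd in one list and 2nd in the other
print(round(rrf_score([3, 2]), 4))   # 0.032
# Ranked 1st, but in a single list only
print(round(rrf_score([1]), 4))      # 0.0164

A document found by multiple retrievers therefore outranks one that tops only a single list, which is exactly the behaviour the fusion step relies on.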

Semantic Cache

import time

class SemanticCache:
    def __init__(self, vector_store, threshold=0.95):
        self.vector_store = vector_store
        self.threshold = threshold
        self.embedding_model = EmbeddingModel()

    async def get(self, query):
        """Return cached results for a semantically similar query, if any."""
        # Embed the query
        query_embedding = await self.embedding_model.encode(query)

        # Look for a similar cached query
        similar_queries = await self.vector_store.search(
            vector=query_embedding,
            top_k=1,
            threshold=self.threshold
        )

        if similar_queries:
            # Return the cached results
            return similar_queries[0].metadata['results']

        return None

    async def set(self, query, results):
        """Cache the results for a query."""
        query_embedding = await self.embedding_model.encode(query)

        await self.vector_store.add(
            vector=query_embedding,
            metadata={
                'query': query,
                'results': results,
                'timestamp': time.time()
            }
        )
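The cache is meant to wrap the search engine in a cache-aside pattern; a usage sketch, assuming a vector_store instance is available to construct the cache:

async def cached_search(engine: HybridSearchEngine, cache: SemanticCache, query: str):
    """Serve semantically similar queries from cache, otherwise search and cache (sketch)."""
    cached = await cache.get(query)
    if cached is not None:
        return cached
    results = await engine.search(query)
    await cache.set(query, results)
    return results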

6. Visualization Service

Knowledge Graph Visualization

class GraphVisualization:
    def __init__(self, graph_db):
        self.graph_db = graph_db

    def generate_visualization(
        self,
        doc_id,
        max_depth=2,
        max_nodes=100
    ):
        """Generate graph visualization data."""
        # 1. Fetch the subgraph
        subgraph = self.graph_db.get_subgraph(
            doc_id, max_depth, max_nodes
        )

        # 2. Convert to a visualization-friendly format
        vis_data = {
            'nodes': [],
            'edges': []
        }

        # Nodes
        for node in subgraph.nodes:
            vis_data['nodes'].append({
                'id': node.id,
                'label': node.properties.get('title', ''),
                'category': node.properties.get('category', ''),
                'size': self.calculate_node_size(node),
                'color': self.get_category_color(
                    node.properties.get('category')
                )
            })

        # Edges
        for edge in subgraph.edges:
            vis_data['edges'].append({
                'source': edge.source_id,
                'target': edge.target_id,
                'label': edge.type,
                'weight': edge.properties.get('score', 0.5)
            })

        return vis_data

7. Performance Optimization

Asynchronous Processing Architecture

import asyncio
import logging

from celery import Celery

logger = logging.getLogger(__name__)

celery_app = Celery('docnest', broker='redis://localhost:6379')

@celery_app.task
def process_document_async(doc_id):
    """Process a document asynchronously."""
    # Parse the document
    parsed = parse_document(doc_id)

    # Extract features
    features = extract_features(parsed)

    # Build relations
    build_relations(doc_id, features)

    # Update the index
    update_index(doc_id, features)

class AsyncDocumentProcessor:
    def __init__(self):
        self.task_queue = asyncio.Queue()
        self.workers = []

    async def start_workers(self, num_workers=4):
        """Start the worker tasks."""
        self.workers = [
            asyncio.create_task(self.worker())
            for _ in range(num_workers)
        ]

    async def worker(self):
        """Worker loop."""
        while True:
            task = await self.task_queue.get()

            try:
                await self.process_task(task)
            except Exception as e:
                logger.error(f"Task processing failed: {e}")
            finally:
                self.task_queue.task_done()
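process_task is not shown. A plausible dispatcher keyed on the 'task' field that the processing service enqueues (e.g. 'build_relations'); the handler method names here are assumptions:

async def process_task(self, task: dict):
    """Dispatch a queued task to its handler (illustrative sketch)."""
    handlers = {
        'build_relations': self.build_relations,   # assumed coroutine handlers
        'reindex': self.reindex_document,
    }
    handler = handlers.get(task['task'])
    if handler is None:
        logger.warning(f"Unknown task type: {task['task']}")
        return
    await handler(task['doc_id'])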

Distributed Indexing

class DistributedIndexing:
    def __init__(self, es_cluster):
        self.es = es_cluster
        self.shard_count = 5
        self.replica_count = 2

    def create_index(self, index_name):
        """Create a distributed index."""
        self.es.indices.create(
            index=index_name,
            body={
                'settings': {
                    'number_of_shards': self.shard_count,
                    'number_of_replicas': self.replica_count,
                    'refresh_interval': '30s'
                },
                'mappings': {
                    'properties': {
                        'content': {'type': 'text'},
                        'embedding': {
                            'type': 'dense_vector',
                            'dims': 768
                        },
                        'category': {'type': 'keyword'},
                        'created_at': {'type': 'date'}
                    }
                }
            }
        )
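With the mapping above, writing a processed document into the index is a single call on the same client. The method below is a sketch rather than part of the original class, and the field values are placeholders produced by the processing pipeline:

    def index_document(self, index_name, doc_id, document):
        """Index one processed document into the mapping created above (sketch)."""
        self.es.index(
            index=index_name,
            id=doc_id,
            body={
                'content': document['content'],
                'embedding': document['embedding'],   # 768-dim vector from the extractor
                'category': document.get('category', 'uncategorized'),
                'created_at': document['created_at'],
            }
        )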

Summary

A production-grade DocNest system needs:

Microservice architecture: decoupled modules that scale independently
Asynchronous processing: higher throughput
Distributed storage: support for massive document collections
Intelligent algorithms: accurate classification and relation discovery
High-performance retrieval: a hybrid retrieval strategy
Visualization: an intuitive knowledge graph view

With a well-designed architecture, DocNest can manage and organize large document collections efficiently.