
# AI Cloud-Native System Architecture Design

This article walks through how to design and implement a production-grade, AI-focused cloud-native platform architecture.

## System Architecture Overview

```mermaid
graph TB
    A[Users and Applications] --> B[Ingress / API Gateway]
    B --> C[Authentication and Authorization]
    C --> D[Istio Service Mesh]
    D --> E[Model Serving Layer]
    D --> F[Training Job Layer]
    D --> G[Data Processing Layer]
    E --> H[Model Registry]
    F --> I[GPU Resource Pool]
    G --> J[Storage Layer]
    K[Monitoring and Alerting] --> E
    K --> F
    K --> G
```

## 1. Layered Platform Architecture

### Infrastructure Layer

```yaml
# GPU node pool configuration
apiVersion: v1
kind: Node
metadata:
  name: gpu-node-1
  labels:
    node-type: gpu-worker
    gpu-type: nvidia-a100
    gpu-count: "8"
spec:
  taints:
    - key: nvidia.com/gpu
      value: "true"
      effect: NoSchedule
---
# GPU resource quota
apiVersion: v1
kind: ResourceQuota
metadata:
  name: gpu-quota
  namespace: ai-workloads
spec:
  hard:
    requests.nvidia.com/gpu: "16"
    limits.nvidia.com/gpu: "16"
```
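The taint above keeps ordinary pods off the GPU nodes, so a GPU workload must both tolerate the taint and select the labeled pool. A minimal pod spec showing that pairing, written as a Python dict in the same style this article uses for manifests (the CUDA image and pod name are illustrative):

```python
# Hypothetical smoke-test pod targeting the tainted A100 pool above
gpu_pod = {
    'apiVersion': 'v1',
    'kind': 'Pod',
    'metadata': {'name': 'gpu-smoke-test', 'namespace': 'ai-workloads'},
    'spec': {
        # Land on the labeled GPU pool...
        'nodeSelector': {'node-type': 'gpu-worker', 'gpu-type': 'nvidia-a100'},
        # ...and tolerate the NoSchedule taint that fences it off
        'tolerations': [{
            'key': 'nvidia.com/gpu',
            'operator': 'Equal',
            'value': 'true',
            'effect': 'NoSchedule'
        }],
        'containers': [{
            'name': 'cuda',
            'image': 'nvidia/cuda:12.2.0-base-ubuntu22.04',
            'command': ['nvidia-smi'],
            # Requesting the GPU resource is what the quota meters
            'resources': {'limits': {'nvidia.com/gpu': 1}}
        }]
    }
}
```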

### Platform Service Layer

```python
from datetime import datetime

from fastapi import FastAPI, Depends, HTTPException

# KubernetesClient, ModelRegistry, ResourceManager, JobScheduler,
# AuditLogger, TrainingJobSpec and get_current_user are internal
# platform components, assumed to be importable from the platform SDK.

app = FastAPI()

class AIPlatformService:
    def __init__(self):
        self.k8s_client = KubernetesClient()
        self.model_registry = ModelRegistry()
        self.resource_manager = ResourceManager()
        self.job_scheduler = JobScheduler()
        self.audit_logger = AuditLogger()  # used below, so initialize it here

    async def create_training_job(
        self,
        job_spec: TrainingJobSpec,
        user_id: str
    ):
        """Create a training job."""
        # 1. Validate the user's resource quota
        quota_check = await self.resource_manager.check_quota(
            user_id,
            job_spec.resource_requirements
        )
        if not quota_check.available:
            raise HTTPException(
                status_code=403,
                detail="Insufficient resource quota"
            )

        # 2. Select the optimal nodes for the requested GPUs
        nodes = await self.job_scheduler.select_nodes(
            gpu_type=job_spec.gpu_type,
            gpu_count=job_spec.gpu_count
        )

        # 3. Create the training job on the cluster
        job = await self.k8s_client.create_training_job(
            name=job_spec.name,
            image=job_spec.image,
            command=job_spec.command,
            resources=job_spec.resources,
            node_selector=nodes
        )

        # 4. Write an audit log entry
        await self.audit_logger.log({
            'user_id': user_id,
            'action': 'create_training_job',
            'job_name': job.name,
            'timestamp': datetime.now()
        })

        return {
            'job_id': job.name,
            'status': 'Created',
            'estimated_start_time': job.estimated_start
        }

@app.post("/api/v1/training/jobs")
async def create_job(
    job_spec: TrainingJobSpec,
    user_id: str = Depends(get_current_user)
):
    """API endpoint for creating training jobs."""
    service = AIPlatformService()
    return await service.create_training_job(job_spec, user_id)
```
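FastAPI validates the request body through Pydantic. The exact schema of `TrainingJobSpec` is not shown in this article; a minimal sketch, assuming only the fields the service code above actually reads, could look like this:

```python
from typing import Optional
from pydantic import BaseModel

class TrainingJobSpec(BaseModel):
    """Hypothetical request schema; field names mirror the attributes
    accessed by AIPlatformService above."""
    name: str
    image: str
    command: list[str]
    gpu_type: str = "nvidia-a100"
    gpu_count: int = 1
    resources: dict = {}                # container resource requests/limits
    resource_requirements: dict = {}    # input to the quota check
    queue: Optional[str] = None
```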

## 2. Model Serving Architecture

### Multi-Model Service Orchestration

```python
# ModelStore, LoadBalancer, CacheManager, KubernetesClient and
# PipelineConfig are internal platform abstractions.

class MultiModelOrchestrator:
    def __init__(self):
        self.model_store = ModelStore()
        self.load_balancer = LoadBalancer()
        self.cache_manager = CacheManager()
        self.k8s_client = KubernetesClient()  # used in deploy_single_model

    async def deploy_model_pipeline(
        self,
        pipeline_config: PipelineConfig
    ):
        """Deploy a model pipeline."""
        services = []

        # Deploy the model services stage by stage, in order
        for stage in pipeline_config.stages:
            service = await self.deploy_single_model(
                model_name=stage.model_name,
                version=stage.version,
                resources=stage.resources
            )
            services.append(service)

        # Wire the deployed services together into a pipeline
        pipeline = await self.configure_pipeline(
            services,
            pipeline_config
        )
        return pipeline

    async def deploy_single_model(
        self,
        model_name: str,
        version: str,
        resources: dict
    ):
        """Deploy a single model service."""
        # 1. Pull the model from the model registry
        model_artifact = await self.model_store.get_model(
            model_name, version
        )

        # 2. Build a KServe InferenceService manifest
        inference_service = {
            'apiVersion': 'serving.kserve.io/v1beta1',
            'kind': 'InferenceService',
            'metadata': {
                'name': f'{model_name}-{version}',
                'namespace': 'model-serving'
            },
            'spec': {
                'predictor': {
                    'model': {
                        'modelFormat': {
                            'name': model_artifact.framework
                        },
                        'storageUri': model_artifact.uri,
                        'resources': resources,
                        'runtime': self.select_runtime(
                            model_artifact.framework
                        )
                    }
                },
                'transformer': {
                    'containers': [{
                        'name': 'transformer',
                        'image': 'transformer:latest',
                        'env': [{
                            'name': 'MODEL_NAME',
                            'value': model_name
                        }]
                    }]
                }
            }
        }

        # 3. Apply the manifest to Kubernetes
        await self.k8s_client.create_inference_service(
            inference_service
        )

        # 4. Block until the service reports Ready
        await self.wait_for_ready(
            f'{model_name}-{version}',
            timeout=300
        )

        return inference_service
```
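`wait_for_ready` is not shown above. A minimal polling sketch, assuming KServe's status convention (a `Ready` condition under `.status.conditions`) and a hypothetical `get_inference_service` read call on the platform's Kubernetes client:

```python
import asyncio

async def wait_for_ready(self, name: str, timeout: int = 300,
                         interval: int = 5):
    """Poll the InferenceService until its Ready condition is True."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        # get_inference_service is a hypothetical read on the platform client
        svc = await self.k8s_client.get_inference_service(
            name, namespace='model-serving'
        )
        conditions = svc.get('status', {}).get('conditions', [])
        if any(c.get('type') == 'Ready' and c.get('status') == 'True'
               for c in conditions):
            return
        await asyncio.sleep(interval)
    raise TimeoutError(f'InferenceService {name} not ready after {timeout}s')
```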

### A/B Testing and Canary Releases

```python
import time

# IstioClient is an internal wrapper that applies manifests to the cluster.

class TrafficSplitter:
    def __init__(self):
        self.istio_client = IstioClient()

    def create_ab_test(
        self,
        service_name: str,
        model_a: str,
        model_b: str,
        traffic_split: dict
    ):
        """Create an A/B test."""
        # VirtualService: beta users always hit model B; everyone else
        # is split by weight between A and B
        virtual_service = {
            'apiVersion': 'networking.istio.io/v1beta1',
            'kind': 'VirtualService',
            'metadata': {
                'name': f'{service_name}-ab-test'
            },
            'spec': {
                'hosts': [service_name],
                'http': [{
                    'match': [{
                        'headers': {
                            'x-user-group': {
                                'exact': 'beta'
                            }
                        }
                    }],
                    'route': [{
                        'destination': {
                            'host': service_name,
                            'subset': model_b
                        }
                    }]
                }, {
                    'route': [
                        {
                            'destination': {
                                'host': service_name,
                                'subset': model_a
                            },
                            'weight': traffic_split['model_a']
                        },
                        {
                            'destination': {
                                'host': service_name,
                                'subset': model_b
                            },
                            'weight': traffic_split['model_b']
                        }
                    ]
                }]
            }
        }

        # DestinationRule: define the two version subsets
        destination_rule = {
            'apiVersion': 'networking.istio.io/v1beta1',
            'kind': 'DestinationRule',
            'metadata': {
                'name': f'{service_name}-versions'
            },
            'spec': {
                'host': service_name,
                'subsets': [
                    {
                        'name': model_a,
                        'labels': {'version': model_a}
                    },
                    {
                        'name': model_b,
                        'labels': {'version': model_b}
                    }
                ]
            }
        }

        self.istio_client.apply(virtual_service)
        self.istio_client.apply(destination_rule)

    def gradual_rollout(
        self,
        service_name: str,
        new_version: str,
        stages: list
    ):
        """Progressive (canary) rollout."""
        for stage in stages:
            # Shift traffic toward the new version
            self.update_traffic_split(
                service_name,
                {
                    'old': 100 - stage['percentage'],
                    'new': stage['percentage']
                }
            )

            # Watch service metrics for this stage
            metrics = self.monitor_metrics(
                service_name,
                duration=stage['duration']
            )

            # Gate the rollout on the stage's SLO
            if not self.check_slo(metrics, stage['slo']):
                # Roll back automatically on SLO violation
                self.rollback(service_name)
                raise Exception('SLO check failed; rolled back automatically')

            # Wait before the next stage
            time.sleep(stage['wait_time'])
```
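The shape of each `stage` entry is implied by the keys the loop reads (`percentage`, `duration`, `slo`, `wait_time`). A hypothetical invocation, with made-up service name and thresholds:

```python
splitter = TrafficSplitter()

# Roll v2 out in three steps; each stage watches metrics for `duration`
# seconds, gates on the stage's SLO, then waits before the next step.
slo = {'p99_latency_ms': 200, 'error_rate': 0.01}  # illustrative thresholds
splitter.gradual_rollout(
    service_name='recommender',
    new_version='v2',
    stages=[
        {'percentage': 5,   'duration': 600, 'wait_time': 300, 'slo': slo},
        {'percentage': 25,  'duration': 600, 'wait_time': 300, 'slo': slo},
        {'percentage': 100, 'duration': 600, 'wait_time': 0,   'slo': slo},
    ]
)
```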

## 3. Distributed Training Architecture

### Multi-GPU Training Scheduling

```python
# GPUTopologyManager, GangScheduler and DistributedJobSpec are
# internal platform abstractions.

class DistributedTrainingScheduler:
    def __init__(self):
        self.gpu_topology = GPUTopologyManager()
        self.gang_scheduler = GangScheduler()

    def schedule_distributed_training(
        self,
        job_spec: DistributedJobSpec
    ):
        """Schedule a distributed training job."""
        # 1. Compute the total number of GPUs required
        total_gpus = (
            job_spec.num_workers *
            job_spec.gpus_per_worker
        )

        # 2. Pick a GPU topology that suits the communication pattern
        topology = self.gpu_topology.find_optimal_topology(
            total_gpus,
            communication_pattern=job_spec.comm_pattern
        )

        # 3. Gang scheduling: all pods start together or not at all
        pod_group = self.gang_scheduler.create_pod_group(
            name=job_spec.name,
            min_member=job_spec.num_workers,
            queue=job_spec.queue,
            priority=job_spec.priority
        )

        # 4. Build the distributed training job (Kubeflow PyTorchJob)
        pytorch_job = {
            'apiVersion': 'kubeflow.org/v1',
            'kind': 'PyTorchJob',
            'metadata': {
                'name': job_spec.name,
                'annotations': {
                    # create_pod_group is assumed to return the group name
                    'scheduling.volcano.sh/group-name': pod_group
                }
            },
            'spec': {
                'pytorchReplicaSpecs': {
                    'Master': self.create_replica_spec(
                        role='master',
                        replicas=1,
                        gpu_per_replica=job_spec.gpus_per_worker,
                        node_affinity=topology['master_nodes']
                    ),
                    'Worker': self.create_replica_spec(
                        role='worker',
                        replicas=job_spec.num_workers - 1,
                        gpu_per_replica=job_spec.gpus_per_worker,
                        node_affinity=topology['worker_nodes']
                    )
                }
            }
        }

        return pytorch_job
```
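`create_replica_spec` is left out above. A sketch of what it could return, assuming the standard PyTorchJob replica-spec layout, Volcano as the gang scheduler (matching the annotation above), and that `node_affinity` carries the topology manager's node-name list; the training image is a placeholder:

```python
def create_replica_spec(self, role: str, replicas: int,
                        gpu_per_replica: int, node_affinity: list):
    """Build one entry of pytorchReplicaSpecs (sketch, not exhaustive)."""
    return {
        'replicas': replicas,
        'restartPolicy': 'OnFailure',
        'template': {
            'spec': {
                'schedulerName': 'volcano',  # gang scheduling via Volcano
                'affinity': {
                    'nodeAffinity': {
                        'requiredDuringSchedulingIgnoredDuringExecution': {
                            'nodeSelectorTerms': [{
                                'matchExpressions': [{
                                    'key': 'kubernetes.io/hostname',
                                    'operator': 'In',
                                    'values': node_affinity
                                }]
                            }]
                        }
                    }
                },
                'containers': [{
                    'name': 'pytorch',
                    'image': 'training:latest',  # placeholder image
                    'resources': {
                        'limits': {'nvidia.com/gpu': gpu_per_replica}
                    }
                }]
            }
        }
    }
```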

### Elastic Training

```python
class ElasticTraining:
    def __init__(self):
        self.checkpoint_manager = CheckpointManager()

    def enable_elastic_training(
        self,
        job_name: str,
        min_workers: int,
        max_workers: int
    ):
        """Enable elastic training."""
        elastic_config = {
            'apiVersion': 'elastic.pytorch.org/v1alpha1',
            'kind': 'ElasticJob',
            'metadata': {
                'name': job_name
            },
            'spec': {
                'minReplicas': min_workers,
                'maxReplicas': max_workers,
                'rdzvBackend': 'etcd',
                'rdzvEndpoint': 'etcd-service:2379',
                'replicaSpecs': {
                    'Worker': {
                        'replicas': max_workers,
                        'restartPolicy': 'OnFailure',
                        'template': {
                            'spec': {
                                'containers': [{
                                    'name': 'pytorch',
                                    'image': 'elastic-training:latest',
                                    'command': [
                                        'torchrun',
                                        '--nnodes', f'{min_workers}:{max_workers}',
                                        '--nproc_per_node', '8',
                                        '--rdzv_backend', 'etcd',
                                        '--rdzv_endpoint', 'etcd-service:2379',
                                        'train.py'
                                    ]
                                }]
                            }
                        }
                    }
                }
            }
        }

        return elastic_config
```
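Elastic scaling only works if `train.py` tolerates restarts: when worker membership changes, torchrun restarts the workers, so the script must resume from the latest checkpoint. A minimal sketch of that pattern (the checkpoint path, `build_model` and `train_one_epoch` are placeholders):

```python
import os

import torch
import torch.distributed as dist

CKPT = '/checkpoints/latest.pt'  # shared volume visible to all workers

def main():
    dist.init_process_group('nccl')  # torchrun supplies rank/world size
    model = build_model().cuda()     # build_model: your model factory
    optimizer = torch.optim.AdamW(model.parameters())

    start_epoch = 0
    if os.path.exists(CKPT):
        # Resume after a restart triggered by a scale-up/down event
        state = torch.load(CKPT, map_location='cuda')
        model.load_state_dict(state['model'])
        optimizer.load_state_dict(state['optimizer'])
        start_epoch = state['epoch'] + 1

    for epoch in range(start_epoch, 100):
        train_one_epoch(model, optimizer)  # your training loop
        if dist.get_rank() == 0:
            torch.save({'model': model.state_dict(),
                        'optimizer': optimizer.state_dict(),
                        'epoch': epoch}, CKPT)

if __name__ == '__main__':
    main()
```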

## 4. Storage Architecture

### Tiered Storage Strategy

```python
import asyncio

# MinIOClient, CephClient, S3Client and NotFoundError are internal
# storage-layer abstractions.

class TieredStorage:
    def __init__(self):
        self.hot_storage = MinIOClient()   # object storage (fast access)
        self.warm_storage = CephClient()   # block storage
        self.cold_storage = S3Client()     # archive storage

    async def store_model_artifact(
        self,
        model_id: str,
        artifact_data: bytes,
        metadata: dict
    ):
        """Store a model artifact."""
        # 1. Write to hot storage for fast access
        hot_key = f'models/{model_id}/latest'
        await self.hot_storage.put_object(
            bucket='model-artifacts',
            key=hot_key,
            data=artifact_data
        )

        # 2. Replicate to warm storage asynchronously
        asyncio.create_task(
            self.sync_to_warm_storage(
                model_id, artifact_data
            )
        )

        # 3. Set the lifecycle policy that demotes cold data
        await self.set_lifecycle_policy(
            model_id,
            hot_retention_days=7,
            warm_retention_days=30
        )

        return hot_key

    async def get_model_artifact(self, model_id: str):
        """Fetch a model artifact, trying tiers from hot to cold."""
        # 1. Try hot storage first
        try:
            return await self.hot_storage.get_object(
                bucket='model-artifacts',
                key=f'models/{model_id}/latest'
            )
        except NotFoundError:
            pass

        # 2. Fall back to warm storage
        try:
            data = await self.warm_storage.get_object(
                f'models/{model_id}'
            )
            # Promote back into hot storage
            await self.promote_to_hot(model_id, data)
            return data
        except NotFoundError:
            pass

        # 3. Restore from cold (archive) storage
        data = await self.cold_storage.restore(
            f'archive/models/{model_id}'
        )
        await self.promote_to_hot(model_id, data)
        return data
```
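`promote_to_hot` is referenced but not shown; under the same assumed MinIO-style client interface, it is just a write-back into the hot tier so the next read hits the fast path:

```python
async def promote_to_hot(self, model_id: str, data: bytes):
    """Copy an artifact fetched from a colder tier back into hot storage."""
    await self.hot_storage.put_object(
        bucket='model-artifacts',
        key=f'models/{model_id}/latest',
        data=data
    )
```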

### Data Cache Layer

```python
class DataCacheLayer:
    def __init__(self):
        self.redis_cache = Redis()
        self.alluxio_cache = AlluxioClient()

    def cache_training_data(
        self,
        dataset_id: str,
        data_path: str
    ):
        """Cache training data."""
        # Use Alluxio as the data caching layer
        cache_path = f'/alluxio/datasets/{dataset_id}'

        # Load metadata for the dataset tree
        self.alluxio_cache.load_metadata(
            path=data_path,
            recursive=True
        )

        # Pre-load the data into the cache with replication
        self.alluxio_cache.distribute_load(
            path=data_path,
            replication=2
        )

        return cache_path
```
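The `redis_cache` handle above is a natural place to memoize small, hot metadata (dataset manifests, sample indexes) next to the Alluxio data cache. A sketch of that get-or-compute pattern, assuming the standard `redis-py` client; key layout and TTL are illustrative:

```python
import json

import redis

class DatasetMetadataCache:
    def __init__(self, host: str = 'redis-service', ttl: int = 3600):
        self.client = redis.Redis(host=host, port=6379,
                                  decode_responses=True)
        self.ttl = ttl

    def get_manifest(self, dataset_id: str, loader):
        """Return the dataset manifest, recomputing at most once per TTL.

        loader is a callable that reads the manifest from the backing
        store (e.g. the Alluxio path) on a cache miss.
        """
        key = f'dataset:manifest:{dataset_id}'
        cached = self.client.get(key)
        if cached is not None:
            return json.loads(cached)
        manifest = loader(dataset_id)
        self.client.setex(key, self.ttl, json.dumps(manifest))
        return manifest
```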

## 5. Network Architecture

### Service Mesh Configuration

```yaml
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: model-inference-service
spec:
  hosts:
    - inference.ai-platform.com
  gateways:
    - ai-gateway
  http:
    - match:
        - headers:
            x-model-version:
              exact: v2
      route:
        - destination:
            host: model-service
            subset: v2
          weight: 100
      timeout: 30s
      retries:
        attempts: 3
        perTryTimeout: 10s
    - route:
        - destination:
            host: model-service
            subset: v1
          weight: 90
        - destination:
            host: model-service
            subset: v2
          weight: 10
---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: model-service-circuit-breaker
spec:
  host: model-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        http2MaxRequests: 100
    outlierDetection:
      consecutiveErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
```
### GPUDirect RDMA Networking

```python
import json

class GPUDirectRDMA:
    def configure_rdma_network(self, pods: list):
        """Configure a GPUDirect RDMA network."""
        # Secondary network via a Multus NetworkAttachmentDefinition
        network_attachment = {
            'apiVersion': 'k8s.cni.cncf.io/v1',
            'kind': 'NetworkAttachmentDefinition',
            'metadata': {
                'name': 'rdma-network'
            },
            'spec': {
                'config': json.dumps({
                    'cniVersion': '0.3.1',
                    'name': 'rdma-network',
                    'type': 'macvlan',
                    'master': 'eth1',
                    'mode': 'bridge',
                    'ipam': {
                        'type': 'host-local',
                        'subnet': '192.168.100.0/24'
                    }
                })
            }
        }

        # Attach the RDMA network and device resource to each pod
        for pod in pods:
            pod['metadata']['annotations'] = {
                'k8s.v1.cni.cncf.io/networks': 'rdma-network'
            }
            pod['spec']['containers'][0]['resources']['limits'].update({
                'rdma/rdma_shared_device_a': 1
            })

        return network_attachment
```

## 6. Security Architecture

### Multi-Tenant Isolation

```python
# NamespaceManager, RBACManager and NetworkPolicyManager are internal
# platform abstractions.

class MultiTenancyManager:
    def __init__(self):
        self.namespace_manager = NamespaceManager()
        self.rbac_manager = RBACManager()
        self.network_policy_manager = NetworkPolicyManager()

    def create_tenant(
        self,
        tenant_id: str,
        quota: dict
    ):
        """Create a tenant."""
        # 1. Create the tenant namespace
        namespace = self.namespace_manager.create_namespace(
            name=f'tenant-{tenant_id}',
            labels={
                'tenant-id': tenant_id,
                'isolation': 'strict'
            }
        )

        # 2. Define the resource quota
        resource_quota = {
            'apiVersion': 'v1',
            'kind': 'ResourceQuota',
            'metadata': {
                'name': 'tenant-quota',
                'namespace': namespace
            },
            'spec': {
                'hard': {
                    'requests.cpu': quota['cpu'],
                    'requests.memory': quota['memory'],
                    'requests.nvidia.com/gpu': str(quota['gpu']),
                    'persistentvolumeclaims': str(quota['storage_claims'])
                }
            }
        }

        # 3. Network isolation: only allow traffic within the tenant
        network_policy = {
            'apiVersion': 'networking.k8s.io/v1',
            'kind': 'NetworkPolicy',
            'metadata': {
                'name': 'tenant-isolation',
                'namespace': namespace
            },
            'spec': {
                'podSelector': {},
                'policyTypes': ['Ingress', 'Egress'],
                'ingress': [{
                    'from': [{
                        'namespaceSelector': {
                            'matchLabels': {
                                'tenant-id': tenant_id
                            }
                        }
                    }]
                }],
                'egress': [{
                    'to': [{
                        'namespaceSelector': {
                            'matchLabels': {
                                'tenant-id': tenant_id
                            }
                        }
                    }]
                }]
            }
        }

        # 4. Create RBAC bindings scoped to the namespace
        role_binding = self.rbac_manager.create_role_binding(
            tenant_id, namespace
        )

        # The quota and network-policy manifests are returned for the
        # caller to apply (see the sketch below)
        return {
            'namespace': namespace,
            'quota': resource_quota,
            'network_policy': network_policy,
            'rbac': role_binding
        }
```
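The method above builds the manifests but leaves applying them to the caller. With the official `kubernetes` Python client, the quota and the network policy could be applied like this (a sketch; the dict manifests are passed directly as bodies):

```python
from kubernetes import client, config

def apply_tenant_manifests(namespace: str, resource_quota: dict,
                           network_policy: dict):
    """Apply the manifests built by create_tenant."""
    config.load_incluster_config()  # or load_kube_config() outside the cluster

    core = client.CoreV1Api()
    core.create_namespaced_resource_quota(
        namespace=namespace, body=resource_quota
    )

    networking = client.NetworkingV1Api()
    networking.create_namespaced_network_policy(
        namespace=namespace, body=network_policy
    )
```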

### Model Security

```python
# CryptoManager, VulnerabilityScanner and SecurityError are internal
# platform abstractions.

class ModelSecurity:
    def __init__(self):
        self.crypto = CryptoManager()
        self.scanner = VulnerabilityScanner()

    async def secure_model(self, model_path: str):
        """Protect a model artifact."""
        # 1. Scan the model file for known vulnerabilities
        scan_result = await self.scanner.scan_model(model_path)

        if scan_result.has_vulnerabilities:
            raise SecurityError(
                f"Model contains vulnerabilities: {scan_result.vulnerabilities}"
            )

        # 2. Encrypt the model at rest
        encrypted_model = self.crypto.encrypt_file(
            model_path,
            algorithm='AES-256-GCM'
        )

        # 3. Sign the encrypted artifact
        signature = self.crypto.sign_file(
            encrypted_model,
            private_key=self.get_private_key()
        )

        return {
            'encrypted_path': encrypted_model,
            'signature': signature,
            'scan_report': scan_result
        }
```
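Serving nodes need the inverse operation before loading a protected model. A sketch of that verification path, assuming the same `CryptoManager` interface (`verify_file`, `decrypt_file` and `get_public_key` are hypothetical counterparts to the calls above):

```python
async def load_secure_model(self, encrypted_path: str, signature: str):
    """Verify and decrypt a protected model before serving (sketch)."""
    # 1. Verify the signature; refuse to load tampered artifacts
    if not self.crypto.verify_file(
        encrypted_path,
        signature,
        public_key=self.get_public_key()  # hypothetical counterpart
    ):
        raise SecurityError("Model signature verification failed")

    # 2. Decrypt into a node-local path and return it
    return self.crypto.decrypt_file(
        encrypted_path,
        algorithm='AES-256-GCM'
    )
```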

## 7. Cost Optimization

### Spot Instance Scheduling

```python
# SpotAdvisor, CheckpointManager and JobSpec are internal platform
# abstractions.

class SpotInstanceScheduler:
    def __init__(self):
        self.spot_advisor = SpotAdvisor()
        self.checkpoint_manager = CheckpointManager()

    def schedule_on_spot(
        self,
        job_spec: JobSpec,
        checkpoint_interval: int = 300
    ):
        """Schedule a job onto Spot instances."""
        # 1. Pick the cheapest suitable Spot capacity; the result drives
        #    node-pool selection for the job
        spot_options = self.spot_advisor.get_best_spot_options(
            gpu_type=job_spec.gpu_type,
            region=job_spec.region
        )

        # 2. Tolerate the preemptible-node taint
        tolerations = [{
            'key': 'cloud.google.com/gke-preemptible',
            'operator': 'Equal',
            'value': 'true',
            'effect': 'NoSchedule'
        }]
        job_spec.add_tolerations(tolerations)

        # 3. Checkpoint periodically so work survives preemption
        job_spec.add_checkpoint_config({
            'enabled': True,
            'interval': checkpoint_interval,
            'storage': 'persistent-volume'
        })

        # 4. Register a preemption handler
        job_spec.add_preemption_handler(
            self.handle_preemption
        )

        return job_spec

    async def handle_preemption(self, job_name: str):
        """Handle instance preemption."""
        # Save a checkpoint before the node disappears
        await self.checkpoint_manager.save(job_name)

        # Reschedule onto a fresh node
        await self.reschedule_job(job_name)
```
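Cloud providers send a termination notice shortly before reclaiming a Spot/preemptible node, which Kubernetes surfaces to the pod as SIGTERM. The training process itself can therefore checkpoint on that signal; a minimal in-container sketch (the checkpoint function is a placeholder):

```python
import signal
import sys

def save_checkpoint():
    """Placeholder: persist model/optimizer state to durable storage."""
    ...

def handle_sigterm(signum, frame):
    # Preemption notice: flush a checkpoint, then exit non-zero so the
    # job controller reschedules this worker on a fresh node.
    save_checkpoint()
    sys.exit(1)

signal.signal(signal.SIGTERM, handle_sigterm)
```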

## Summary

A production-grade AI cloud-native platform needs:

- **Elastic architecture**: autoscaling and resource optimization
- **High availability**: replicated services and failure recovery
- **Security isolation**: multi-tenancy and network isolation
- **Observability**: end-to-end monitoring and tracing
- **Cost optimization**: Spot instances and resource sharing
- **Automated operations**: CI/CD and self-healing

With a cloud-native architecture, AI applications gain enterprise-grade reliability and scalability.