模型并行、数据并行

预计学习时间:40分钟

并行化技术是训练大规模语言模型的核心,通过在多个计算设备上分配计算和数据,实现超大规模模型的高效训练。

并行训练的必要性

随着模型规模不断增长,单设备训练面临严峻挑战:

  • 内存不足:单GPU无法容纳上百亿参数的模型
  • 训练时间过长:单设备训练千亿参数模型需要数年时间
  • 扩展性受限:难以通过增加计算资源线性加速训练

GPT-3(1750亿参数)模型占用约700GB参数存储空间,而单个A100 GPU仅有80GB显存,分布式并行训练成为必然选择。

数据并行

数据并行原理

数据并行将数据分割到多个计算设备,每个设备拥有完整模型副本:

# PyTorch DDP (DistributedDataParallel) 实现
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

def setup_ddp(rank, world_size):
    # 初始化进程组
    dist.init_process_group(
        backend="nccl", 
        init_method="tcp://localhost:12345",
        rank=rank, 
        world_size=world_size
    )

def train_with_ddp(rank, world_size, model, train_dataset):
    # 设置分布式环境
    setup_ddp(rank, world_size)
    
    # 将模型移至当前设备
    device = torch.device(f"cuda:{rank}")
    model = model.to(device)
    
    # 包装为DDP模型
    ddp_model = DDP(model, device_ids=[rank])
    
    # 创建数据加载器(自动分片数据)
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset, 
        num_replicas=world_size, 
        rank=rank
    )
    train_loader = torch.utils.data.DataLoader(
        train_dataset, 
        batch_size=32,
        sampler=train_sampler
    )
    
    # 训练循环
    optimizer = torch.optim.Adam(ddp_model.parameters(), lr=1e-4)
    for epoch in range(num_epochs):
        train_sampler.set_epoch(epoch)  # 确保每个epoch数据顺序不同
        
        for batch in train_loader:
            inputs, targets = batch
            inputs, targets = inputs.to(device), targets.to(device)
            
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = loss_fn(outputs, targets)
            loss.backward()
            optimizer.step()

数据并行通信过程

数据并行通信示意图

数据并行训练的主要步骤:

  1. 将训练数据拆分到N个设备
  2. 每个设备使用本地数据批次进行前向计算
  3. 计算本地梯度
  4. 通过all-reduce通信同步梯度
  5. 各设备使用相同梯度更新模型

数据并行变体

分布式数据并行 (DDP)

  • 每个进程独立训练,通过集合通信同步梯度
  • 比传统DP更高效,是当前实践标准

ZeRO数据并行 (ZeRO-DP)

  • 在设备间分割优化器状态、梯度和参数
  • 大幅减少内存占用,无需复制完整模型
# DeepSpeed ZeRO 实现示例
import deepspeed

# ZeRO配置
zero_config = {
    "zero_optimization": {
        "stage": 3,                # 使用ZeRO阶段3,参数也分片
        "offload_optimizer": {     # 优化器状态卸载到CPU
            "device": "cpu"
        },
        "offload_param": {         # 参数卸载到CPU
            "device": "cpu"
        },
        "overlap_comm": True,      # 通信与计算重叠
        "contiguous_gradients": True,  # 连续梯度缓冲区
        "allgather_bucket_size": 5e8   # all-gather操作分桶大小
    },
    "fp16": {
        "enabled": True            # 启用半精度训练
    }
}

# 初始化DeepSpeed模型
model_engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config=zero_config
)

# 训练循环
for batch in train_loader:
    inputs, targets = batch
    outputs = model_engine(inputs)
    loss = loss_fn(outputs, targets)
    model_engine.backward(loss)
    model_engine.step()

模型并行

模型并行原理

模型并行将模型分割到多个计算设备,每个设备仅持有部分模型:

# 简单的模型并行实现
class ModelParallelBlock(nn.Module):
    def __init__(self, first_layer, second_layer, split_devices):
        super().__init__()
        self.first_device, self.second_device = split_devices
        self.first_layer = first_layer.to(self.first_device)
        self.second_layer = second_layer.to(self.second_device)
    
    def forward(self, x):
        # 输入在第一个设备
        x = x.to(self.first_device)
        # 第一层计算
        x = self.first_layer(x)
        # 将结果传输到第二个设备
        x = x.to(self.second_device)
        # 第二层计算
        return self.second_layer(x)

# 使用示例
model = ModelParallelBlock(
    nn.Linear(1024, 4096),
    nn.Linear(4096, 1024),
    ["cuda:0", "cuda:1"]
)

模型并行类型

张量并行 (Tensor Parallelism)

在张量级别分割模型,如将注意力头或隐藏维度在设备间拆分:

# Megatron-LM张量并行示例(伪代码)
class ParallelSelfAttention(nn.Module):
    def __init__(self, hidden_size, num_heads, world_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.world_size = world_size
        
        # 每个设备只负责部分注意力头
        self.num_heads_per_partition = num_heads // world_size
        
        # 分割查询、键、值投影
        self.query = nn.Linear(hidden_size, hidden_size // world_size)
        self.key = nn.Linear(hidden_size, hidden_size // world_size)
        self.value = nn.Linear(hidden_size, hidden_size // world_size)
    
    def forward(self, hidden_states):
        # 本地计算查询、键、值
        local_q = self.query(hidden_states)
        local_k = self.key(hidden_states)
        local_v = self.value(hidden_states)
        
        # 计算本地注意力分数
        local_attention_scores = torch.matmul(local_q, local_k.transpose(-1, -2))
        local_attention_probs = F.softmax(local_attention_scores, dim=-1)
        
        # 计算本地输出
        local_output = torch.matmul(local_attention_probs, local_v)
        
        # 使用all-gather收集各设备输出
        outputs = [torch.zeros_like(local_output) for _ in range(self.world_size)]
        torch.distributed.all_gather(outputs, local_output)
        
        # 合并结果
        output = torch.cat(outputs, dim=-1)
        return output

张量并行示意图

流水线并行 (Pipeline Parallelism)

将模型按层分割,不同层在不同设备上执行:

# 使用PyTorch流水线并行
from torch.distributed.pipeline.sync import Pipe

# 将模型分割为4段
partitions = [
    nn.Sequential(model.layers[0], model.layers[1]),
    nn.Sequential(model.layers[2], model.layers[3]),
    nn.Sequential(model.layers[4], model.layers[5]),
    nn.Sequential(model.layers[6], model.layers[7])
]

# 将各部分放到不同设备
devices = ["cuda:0", "cuda:1", "cuda:2", "cuda:3"]
for i, partition in enumerate(partitions):
    partition.to(devices[i])

# 创建流水线模型
pipe_model = Pipe(
    nn.Sequential(*partitions),
    chunks=8  # 将批次分成8个微批次
)

# 训练
output = pipe_model(input_data)
loss = loss_fn(output, target)
loss.backward()
optimizer.step()

专家并行 (Expert Parallelism)

用于稀疏混合专家模型(MoE),不同专家分布在不同设备上:

# 简化的MoE并行实现
class ParallelMoELayer(nn.Module):
    def __init__(self, input_size, expert_size, num_experts, world_size):
        super().__init__()
        self.input_size = input_size
        self.num_experts = num_experts
        self.world_size = world_size
        
        # 每个设备负责部分专家
        self.num_local_experts = num_experts // world_size
        
        # 创建专家网络
        self.experts = nn.ModuleList([
            nn.Sequential(
                nn.Linear(input_size, expert_size),
                nn.GELU(),
                nn.Linear(expert_size, input_size)
            ) for _ in range(self.num_local_experts)
        ])
        
        # 路由网络
        self.router = nn.Linear(input_size, num_experts)
    
    def forward(self, x):
        # 计算路由权重
        routing_weights = F.softmax(self.router(x), dim=-1)
        
        # 获取本地专家的路由权重
        local_indices = torch.arange(
            self.rank * self.num_local_experts,
            (self.rank + 1) * self.num_local_experts
        )
        local_routing_weights = routing_weights[:, local_indices]
        
        # 计算本地专家输出
        local_outputs = torch.zeros_like(x)
        for i, expert in enumerate(self.experts):
            # 选择路由到该专家的输入
            expert_input = x * local_routing_weights[:, i].unsqueeze(-1)
            # 计算专家输出
            expert_output = expert(expert_input)
            local_outputs += expert_output
        
        # 汇总所有设备的输出
        global_output = torch.zeros_like(local_outputs)
        torch.distributed.all_reduce(global_output, op=torch.distributed.ReduceOp.SUM)
        
        return global_output

3D并行

现代超大规模模型通常结合多种并行策略,形成3D并行:

数据并行 × 张量并行 × 流水线并行 = 3D并行

优势:

  • 最大化利用可用硬件
  • 克服单一并行策略的限制
  • 实现万亿参数级模型训练
# 使用DeepSpeed实现3D并行训练(简化示例)
ds_config = {
    # ZeRO数据并行
    "zero_optimization": {
        "stage": 1
    },
    
    # 张量并行
    "tensor_parallel": {
        "enabled": True,
        "tp_size": 4  # 4-way张量并行
    },
    
    # 流水线并行
    "pipeline_parallel": {
        "enabled": True,
        "pp_size": 8,  # 8-way流水线并行
        "microbatches": 16
    },
    
    # 混合精度
    "fp16": {
        "enabled": True
    }
}

# 初始化DeepSpeed引擎
model_engine, _, _, _ = deepspeed.initialize(
    model=model,
    config=ds_config
)

# 训练循环
for batch in train_loader:
    model_engine.train_batch()

并行策略选择与实践

选择合适的并行策略

并行策略适用场景通信开销实现复杂度内存效率
数据并行中小型模型,大量数据高(每批次同步全部梯度)低(需完整模型副本)
ZeRO并行中大型模型,通用场景中(部分参数通信)高(参数分片存储)
张量并行大型模型,需降低通信低至中(层内通信)中(张量分片)
流水线并行特大型模型,层数多低(仅微批次边界通信)高(层间分割)
3D并行超大型模型,大规模集群因配置而异极高极高

实践中的挑战与解决方案

  1. 负载不均衡

    • 问题:不同并行部分计算负载不同
    • 解决:动态负载均衡,智能模型分割
  2. 通信瓶颈

    • 问题:频繁通信导致训练速度下降
    • 解决:梯度累积,通信计算重叠,拓扑感知通信
  3. 内存管理

    • 问题:复杂并行模式下内存泄漏和碎片化
    • 解决:内存高效算法,梯度检查点,特定并行场景优化
  4. 扩展效率下降

    • 问题:设备数量增加时扩展效率非线性下降
    • 解决:优化通信策略,减少全局同步点
# 优化通信与计算重叠示例
class OptimizedDDP(torch.nn.Module):
    def __init__(self, module, broadcast_buffers=True):
        super().__init__()
        self.module = module
        self.broadcast_buffers = broadcast_buffers
        
        # 为每个参数组初始化通信缓冲区
        self.buckets = {}
        self._register_hooks()
    
    def _register_hooks(self):
        # 按大小将参数分组
        param_groups = self._group_parameters_by_size()
        
        for group_id, params in param_groups.items():
            # 创建梯度累积缓冲区
            buffer = torch.zeros(sum(p.numel() for p in params), 
                                device=params[0].device)
            self.buckets[group_id] = {
                "buffer": buffer,
                "params": params,
                "ready": False
            }
            
            # 注册梯度挂钩
            for param in params:
                param.register_hook(
                    lambda grad, gid=group_id: self._grad_hook(grad, gid)
                )
    
    def _grad_hook(self, grad, group_id):
        # 累积梯度到缓冲区
        bucket = self.buckets[group_id]
        # ... (填充缓冲区代码)
        
        # 检查是否所有梯度都准备好
        if all(p.grad is not None for p in bucket["params"]):
            # 启动异步all-reduce
            torch.distributed.all_reduce(
                bucket["buffer"], 
                async_op=True
            )
            bucket["ready"] = True
    
    def forward(self, *args, **kwargs):
        return self.module(*args, **kwargs)

案例研究:大型语言模型的并行训练

GPT-3训练架构

OpenAI在训练1750亿参数GPT-3时采用的并行策略:

  • 模型并行度:8-way张量并行
  • 流水线并行度:64-way流水线分割
  • 数据并行度:根据集群规模动态调整
  • 总计:使用数千个GPU,数月训练时间

LLaMA训练优化

Meta在LLaMA训练中的并行技术改进:

  • 优化的全渗透张量并行
  • 分片优化器状态
  • 异步梯度预取
  • 训练提速达90%线性扩展效率

Megatron-Turing NLG模型

微软和NVIDIA合作训练5300亿参数模型的关键并行技术:

  • 3D并行架构
  • 280个DGX A100节点
  • ZeRO-1优化数据并行
  • 2D张量-流水线分割策略

小结

并行化是训练超大规模语言模型的关键技术:

  1. 数据并行:复制整个模型,分割数据,适合中小型模型
  2. 模型并行:分割模型,包括张量并行和流水线并行
  3. 3D并行:结合多种并行策略,实现万亿参数级训练
  4. 工程实践:选择合适的并行策略组合,根据硬件和模型特点调整

下一节我们将介绍混合精度训练技术,这是另一种提升大模型训练效率的重要方法。