模型并行、数据并行
预计学习时间:40分钟
并行化技术是训练大规模语言模型的核心,通过在多个计算设备上分配计算和数据,实现超大规模模型的高效训练。
并行训练的必要性
随着模型规模不断增长,单设备训练面临严峻挑战:
- 内存不足:单GPU无法容纳上百亿参数的模型
- 训练时间过长:单设备训练千亿参数模型需要数年时间
- 扩展性受限:难以通过增加计算资源线性加速训练
GPT-3(1750亿参数)模型占用约700GB参数存储空间,而单个A100 GPU仅有80GB显存,分布式并行训练成为必然选择。
数据并行
数据并行原理
数据并行将数据分割到多个计算设备,每个设备拥有完整模型副本:
# PyTorch DDP (DistributedDataParallel) 实现
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
def setup_ddp(rank, world_size):
# 初始化进程组
dist.init_process_group(
backend="nccl",
init_method="tcp://localhost:12345",
rank=rank,
world_size=world_size
)
def train_with_ddp(rank, world_size, model, train_dataset):
# 设置分布式环境
setup_ddp(rank, world_size)
# 将模型移至当前设备
device = torch.device(f"cuda:{rank}")
model = model.to(device)
# 包装为DDP模型
ddp_model = DDP(model, device_ids=[rank])
# 创建数据加载器(自动分片数据)
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset,
num_replicas=world_size,
rank=rank
)
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=32,
sampler=train_sampler
)
# 训练循环
optimizer = torch.optim.Adam(ddp_model.parameters(), lr=1e-4)
for epoch in range(num_epochs):
train_sampler.set_epoch(epoch) # 确保每个epoch数据顺序不同
for batch in train_loader:
inputs, targets = batch
inputs, targets = inputs.to(device), targets.to(device)
optimizer.zero_grad()
outputs = ddp_model(inputs)
loss = loss_fn(outputs, targets)
loss.backward()
optimizer.step()
数据并行通信过程
数据并行训练的主要步骤:
- 将训练数据拆分到N个设备
- 每个设备使用本地数据批次进行前向计算
- 计算本地梯度
- 通过all-reduce通信同步梯度
- 各设备使用相同梯度更新模型
数据并行变体
分布式数据并行 (DDP)
- 每个进程独立训练,通过集合通信同步梯度
- 比传统DP更高效,是当前实践标准
ZeRO数据并行 (ZeRO-DP)
- 在设备间分割优化器状态、梯度和参数
- 大幅减少内存占用,无需复制完整模型
# DeepSpeed ZeRO 实现示例
import deepspeed
# ZeRO配置
zero_config = {
"zero_optimization": {
"stage": 3, # 使用ZeRO阶段3,参数也分片
"offload_optimizer": { # 优化器状态卸载到CPU
"device": "cpu"
},
"offload_param": { # 参数卸载到CPU
"device": "cpu"
},
"overlap_comm": True, # 通信与计算重叠
"contiguous_gradients": True, # 连续梯度缓冲区
"allgather_bucket_size": 5e8 # all-gather操作分桶大小
},
"fp16": {
"enabled": True # 启用半精度训练
}
}
# 初始化DeepSpeed模型
model_engine, optimizer, _, _ = deepspeed.initialize(
model=model,
model_parameters=model.parameters(),
config=zero_config
)
# 训练循环
for batch in train_loader:
inputs, targets = batch
outputs = model_engine(inputs)
loss = loss_fn(outputs, targets)
model_engine.backward(loss)
model_engine.step()
模型并行
模型并行原理
模型并行将模型分割到多个计算设备,每个设备仅持有部分模型:
# 简单的模型并行实现
class ModelParallelBlock(nn.Module):
def __init__(self, first_layer, second_layer, split_devices):
super().__init__()
self.first_device, self.second_device = split_devices
self.first_layer = first_layer.to(self.first_device)
self.second_layer = second_layer.to(self.second_device)
def forward(self, x):
# 输入在第一个设备
x = x.to(self.first_device)
# 第一层计算
x = self.first_layer(x)
# 将结果传输到第二个设备
x = x.to(self.second_device)
# 第二层计算
return self.second_layer(x)
# 使用示例
model = ModelParallelBlock(
nn.Linear(1024, 4096),
nn.Linear(4096, 1024),
["cuda:0", "cuda:1"]
)
模型并行类型
张量并行 (Tensor Parallelism)
在张量级别分割模型,如将注意力头或隐藏维度在设备间拆分:
# Megatron-LM张量并行示例(伪代码)
class ParallelSelfAttention(nn.Module):
def __init__(self, hidden_size, num_heads, world_size):
super().__init__()
self.hidden_size = hidden_size
self.num_heads = num_heads
self.world_size = world_size
# 每个设备只负责部分注意力头
self.num_heads_per_partition = num_heads // world_size
# 分割查询、键、值投影
self.query = nn.Linear(hidden_size, hidden_size // world_size)
self.key = nn.Linear(hidden_size, hidden_size // world_size)
self.value = nn.Linear(hidden_size, hidden_size // world_size)
def forward(self, hidden_states):
# 本地计算查询、键、值
local_q = self.query(hidden_states)
local_k = self.key(hidden_states)
local_v = self.value(hidden_states)
# 计算本地注意力分数
local_attention_scores = torch.matmul(local_q, local_k.transpose(-1, -2))
local_attention_probs = F.softmax(local_attention_scores, dim=-1)
# 计算本地输出
local_output = torch.matmul(local_attention_probs, local_v)
# 使用all-gather收集各设备输出
outputs = [torch.zeros_like(local_output) for _ in range(self.world_size)]
torch.distributed.all_gather(outputs, local_output)
# 合并结果
output = torch.cat(outputs, dim=-1)
return output
流水线并行 (Pipeline Parallelism)
将模型按层分割,不同层在不同设备上执行:
# 使用PyTorch流水线并行
from torch.distributed.pipeline.sync import Pipe
# 将模型分割为4段
partitions = [
nn.Sequential(model.layers[0], model.layers[1]),
nn.Sequential(model.layers[2], model.layers[3]),
nn.Sequential(model.layers[4], model.layers[5]),
nn.Sequential(model.layers[6], model.layers[7])
]
# 将各部分放到不同设备
devices = ["cuda:0", "cuda:1", "cuda:2", "cuda:3"]
for i, partition in enumerate(partitions):
partition.to(devices[i])
# 创建流水线模型
pipe_model = Pipe(
nn.Sequential(*partitions),
chunks=8 # 将批次分成8个微批次
)
# 训练
output = pipe_model(input_data)
loss = loss_fn(output, target)
loss.backward()
optimizer.step()
专家并行 (Expert Parallelism)
用于稀疏混合专家模型(MoE),不同专家分布在不同设备上:
# 简化的MoE并行实现
class ParallelMoELayer(nn.Module):
def __init__(self, input_size, expert_size, num_experts, world_size):
super().__init__()
self.input_size = input_size
self.num_experts = num_experts
self.world_size = world_size
# 每个设备负责部分专家
self.num_local_experts = num_experts // world_size
# 创建专家网络
self.experts = nn.ModuleList([
nn.Sequential(
nn.Linear(input_size, expert_size),
nn.GELU(),
nn.Linear(expert_size, input_size)
) for _ in range(self.num_local_experts)
])
# 路由网络
self.router = nn.Linear(input_size, num_experts)
def forward(self, x):
# 计算路由权重
routing_weights = F.softmax(self.router(x), dim=-1)
# 获取本地专家的路由权重
local_indices = torch.arange(
self.rank * self.num_local_experts,
(self.rank + 1) * self.num_local_experts
)
local_routing_weights = routing_weights[:, local_indices]
# 计算本地专家输出
local_outputs = torch.zeros_like(x)
for i, expert in enumerate(self.experts):
# 选择路由到该专家的输入
expert_input = x * local_routing_weights[:, i].unsqueeze(-1)
# 计算专家输出
expert_output = expert(expert_input)
local_outputs += expert_output
# 汇总所有设备的输出
global_output = torch.zeros_like(local_outputs)
torch.distributed.all_reduce(global_output, op=torch.distributed.ReduceOp.SUM)
return global_output
3D并行
现代超大规模模型通常结合多种并行策略,形成3D并行:
数据并行 × 张量并行 × 流水线并行 = 3D并行
优势:
- 最大化利用可用硬件
- 克服单一并行策略的限制
- 实现万亿参数级模型训练
# 使用DeepSpeed实现3D并行训练(简化示例)
ds_config = {
# ZeRO数据并行
"zero_optimization": {
"stage": 1
},
# 张量并行
"tensor_parallel": {
"enabled": True,
"tp_size": 4 # 4-way张量并行
},
# 流水线并行
"pipeline_parallel": {
"enabled": True,
"pp_size": 8, # 8-way流水线并行
"microbatches": 16
},
# 混合精度
"fp16": {
"enabled": True
}
}
# 初始化DeepSpeed引擎
model_engine, _, _, _ = deepspeed.initialize(
model=model,
config=ds_config
)
# 训练循环
for batch in train_loader:
model_engine.train_batch()
并行策略选择与实践
选择合适的并行策略
并行策略 | 适用场景 | 通信开销 | 实现复杂度 | 内存效率 |
---|---|---|---|---|
数据并行 | 中小型模型,大量数据 | 高(每批次同步全部梯度) | 低 | 低(需完整模型副本) |
ZeRO并行 | 中大型模型,通用场景 | 中(部分参数通信) | 中 | 高(参数分片存储) |
张量并行 | 大型模型,需降低通信 | 低至中(层内通信) | 高 | 中(张量分片) |
流水线并行 | 特大型模型,层数多 | 低(仅微批次边界通信) | 高 | 高(层间分割) |
3D并行 | 超大型模型,大规模集群 | 因配置而异 | 极高 | 极高 |
实践中的挑战与解决方案
-
负载不均衡
- 问题:不同并行部分计算负载不同
- 解决:动态负载均衡,智能模型分割
-
通信瓶颈
- 问题:频繁通信导致训练速度下降
- 解决:梯度累积,通信计算重叠,拓扑感知通信
-
内存管理
- 问题:复杂并行模式下内存泄漏和碎片化
- 解决:内存高效算法,梯度检查点,特定并行场景优化
-
扩展效率下降
- 问题:设备数量增加时扩展效率非线性下降
- 解决:优化通信策略,减少全局同步点
# 优化通信与计算重叠示例
class OptimizedDDP(torch.nn.Module):
def __init__(self, module, broadcast_buffers=True):
super().__init__()
self.module = module
self.broadcast_buffers = broadcast_buffers
# 为每个参数组初始化通信缓冲区
self.buckets = {}
self._register_hooks()
def _register_hooks(self):
# 按大小将参数分组
param_groups = self._group_parameters_by_size()
for group_id, params in param_groups.items():
# 创建梯度累积缓冲区
buffer = torch.zeros(sum(p.numel() for p in params),
device=params[0].device)
self.buckets[group_id] = {
"buffer": buffer,
"params": params,
"ready": False
}
# 注册梯度挂钩
for param in params:
param.register_hook(
lambda grad, gid=group_id: self._grad_hook(grad, gid)
)
def _grad_hook(self, grad, group_id):
# 累积梯度到缓冲区
bucket = self.buckets[group_id]
# ... (填充缓冲区代码)
# 检查是否所有梯度都准备好
if all(p.grad is not None for p in bucket["params"]):
# 启动异步all-reduce
torch.distributed.all_reduce(
bucket["buffer"],
async_op=True
)
bucket["ready"] = True
def forward(self, *args, **kwargs):
return self.module(*args, **kwargs)
案例研究:大型语言模型的并行训练
GPT-3训练架构
OpenAI在训练1750亿参数GPT-3时采用的并行策略:
- 模型并行度:8-way张量并行
- 流水线并行度:64-way流水线分割
- 数据并行度:根据集群规模动态调整
- 总计:使用数千个GPU,数月训练时间
LLaMA训练优化
Meta在LLaMA训练中的并行技术改进:
- 优化的全渗透张量并行
- 分片优化器状态
- 异步梯度预取
- 训练提速达90%线性扩展效率
Megatron-Turing NLG模型
微软和NVIDIA合作训练5300亿参数模型的关键并行技术:
- 3D并行架构
- 280个DGX A100节点
- ZeRO-1优化数据并行
- 2D张量-流水线分割策略
小结
并行化是训练超大规模语言模型的关键技术:
- 数据并行:复制整个模型,分割数据,适合中小型模型
- 模型并行:分割模型,包括张量并行和流水线并行
- 3D并行:结合多种并行策略,实现万亿参数级训练
- 工程实践:选择合适的并行策略组合,根据硬件和模型特点调整
下一节我们将介绍混合精度训练技术,这是另一种提升大模型训练效率的重要方法。