数据调度

预计学习时间:25分钟

数据调度是指对训练数据的组织、分配和排序策略,直接影响模型的训练效率和最终性能。

数据调度的重要性

合理的数据调度策略能够显著提升模型训练效果:

  • 加速收敛:优化的数据顺序可以帮助模型更快达到收敛
  • 提高效率:减少计算资源浪费,降低训练成本
  • 增强泛化:合理的数据分布有助于提高模型泛化能力
  • 避免灾难性遗忘:特别对于持续学习场景,可防止模型忘记先前学习的知识

数据划分策略

训练-验证-测试集划分

科学的数据集划分是避免过拟合的关键:

from sklearn.model_selection import train_test_split

# 基本划分
train_data, test_data = train_test_split(dataset, test_size=0.2, random_state=42)
train_data, val_data = train_test_split(train_data, test_size=0.1, random_state=42)

# 分层划分(保持标签分布)
train_data, test_data = train_test_split(
    dataset, 
    test_size=0.2, 
    stratify=dataset['label'],
    random_state=42
)

划分策略应避免数据泄露,确保测试集能真实反映模型在未见数据上的表现。

常见划分方法

划分方法适用场景特点
随机划分独立同分布数据简单易实现,适用于大部分场景
分层划分标签分布不均衡保持各集合中标签比例一致
时间序列划分时序数据按时间顺序划分,避免"未来泄露"
用户划分用户相关数据按用户ID划分,防止用户信息泄露
K折交叉验证数据量有限充分利用有限数据,但计算成本高

数据混合策略

不同类型数据的混合比例会影响模型能力偏向:

# 示例:混合多个数据源
mixed_dataset = {
    'instruction_data': instruction_dataset,  # 指令数据
    'conversation_data': conversation_dataset,  # 对话数据
    'knowledge_data': knowledge_dataset,  # 知识类数据
}

# 设置混合比例
mixing_rates = {
    'instruction_data': 0.4,  # 40%
    'conversation_data': 0.3,  # 30%
    'knowledge_data': 0.3,  # 30%
}

# 根据比例采样
mixed_batches = []
for data_type, dataset in mixed_dataset.items():
    sample_size = int(batch_size * mixing_rates[data_type])
    sampled_data = random.sample(dataset, sample_size)
    mixed_batches.extend(sampled_data)

random.shuffle(mixed_batches)  # 打乱混合后的数据

批处理策略

批大小选择

批大小(Batch Size)是影响模型训练的关键参数:

  • 大批量:训练稳定,梯度估计更准确,但内存消耗大
  • 小批量:内存友好,更好的正则化效果,但训练不稳定
  • 渐进式批量:从小批量开始,逐渐增加,结合两者优势
from torch.utils.data import DataLoader

# 设置批处理
train_loader = DataLoader(
    train_dataset,
    batch_size=64,  # 批大小
    shuffle=True,   # 随机打乱
    num_workers=4   # 并行加载进程数
)

梯度累积

对于特大模型或显存有限的情况,梯度累积是有效的解决方案:

# 梯度累积示例
accumulation_steps = 4  # 累积4个小批次
for i, batch in enumerate(train_loader):
    # 前向传播
    outputs = model(batch)
    loss = loss_fn(outputs, batch["labels"])
    
    # 缩放损失
    loss = loss / accumulation_steps
    
    # 反向传播
    loss.backward()
    
    # 累积指定步数后更新参数
    if (i + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()

高级调度技术

课程学习

课程学习是一种从易到难安排训练样本的策略:

# 实现课程学习的数据加载器
class CurriculumSampler:
    def __init__(self, dataset, difficulty_fn, num_epochs=3):
        self.dataset = dataset
        self.difficulty_scores = [difficulty_fn(data) for data in dataset]
        self.num_epochs = num_epochs
        self.current_epoch = 0
        
    def get_batch(self, batch_size):
        # 根据当前epoch计算难度阈值
        max_difficulty = max(self.difficulty_scores)
        current_threshold = (self.current_epoch + 1) * max_difficulty / self.num_epochs
        
        # 筛选符合当前难度的样本
        eligible_indices = [
            i for i, score in enumerate(self.difficulty_scores) 
            if score <= current_threshold
        ]
        
        # 随机采样批次
        batch_indices = random.sample(eligible_indices, min(batch_size, len(eligible_indices)))
        return [self.dataset[i] for i in batch_indices]
    
    def next_epoch(self):
        self.current_epoch = min(self.current_epoch + 1, self.num_epochs - 1)

重要性采样

根据样本重要性调整采样概率,关注更有价值的样本:

import numpy as np

# 根据损失值计算样本权重
sample_losses = [compute_loss(model, sample) for sample in dataset]
sample_weights = np.exp(sample_losses) / sum(np.exp(sample_losses))

# 带权重的采样器
weighted_sampler = torch.utils.data.WeightedRandomSampler(
    weights=sample_weights,
    num_samples=len(sample_weights),
    replacement=True
)

# 使用带权重的采样器
train_loader = DataLoader(
    train_dataset,
    batch_size=64,
    sampler=weighted_sampler
)

梯度退火

随着训练进行调整学习率,帮助模型更好地收敛:

from torch.optim.lr_scheduler import CosineAnnealingLR

# 余弦退火学习率调度器
scheduler = CosineAnnealingLR(
    optimizer,
    T_max=10,  # 周期
    eta_min=1e-6  # 最小学习率
)

# 训练循环
for epoch in range(num_epochs):
    for batch in train_loader:
        # ... 训练步骤 ...
        optimizer.step()
    
    # 更新学习率
    scheduler.step()

分布式训练数据调度

大模型训练通常需要分布式环境,数据调度需要特别考虑:

# PyTorch分布式数据并行示例
import torch.distributed as dist
from torch.utils.data.distributed import DistributedSampler

# 初始化分布式环境
dist.init_process_group(backend="nccl")
local_rank = dist.get_rank()
world_size = dist.get_world_size()

# 分布式采样器
train_sampler = DistributedSampler(
    train_dataset,
    num_replicas=world_size,
    rank=local_rank
)

# 分布式数据加载器
train_loader = DataLoader(
    train_dataset,
    batch_size=per_device_batch_size,
    sampler=train_sampler,
    num_workers=4
)

数据调度最佳实践

  1. 监控数据分布:定期检查各批次数据的分布情况
  2. 动态调整策略:根据训练进展调整数据混合比例和批大小
  3. 避免偏见放大:确保少数样本类型也能得到充分学习
  4. 考虑计算资源:数据调度应充分利用但不超出可用计算资源
  5. 记录与复现:完整记录数据调度参数,确保实验可复现

下一节将介绍分词处理,学习如何将文本转换为大语言模型能处理的token序列。