Minimind/train_pretrain.py

import os
# Set environment variables
os.environ["WANDB_MODE"] = "offline"  # or use "dryrun"
import platform
import argparse
import time
import math
import warnings
import pandas as pd
import torch
import torch.distributed as dist
from torch import optim, nn
from torch.nn.parallel import DistributedDataParallel
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.utils.data import DataLoader, DistributedSampler
# Communication-profiling imports removed
from contextlib import nullcontext
from typing import Optional
from transformers import AutoTokenizer
from model.model import MiniMindLM
from model.LMConfig import LMConfig
from model.dataset import PretrainDataset
warnings.filterwarnings('ignore')
def Logger(content):
    # Print only when not using DDP, or when this is the DDP rank-0 process
    if not ddp or dist.get_rank() == 0:
        print(content)
def get_lr(current_step, total_steps, lr):
    # Cosine learning-rate schedule with a floor of lr/10:
    # \text{get\_lr}(c, t, l) = \frac{l}{10} + 0.5 \cdot l \cdot \left(1 + \cos\left(\frac{\pi \cdot c}{t}\right)\right)
    return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))
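# Quick sanity check of the schedule (illustrative numbers only, not executed):
#   get_lr(0, 1000, 2e-4)    -> 2.2e-4  (peak = 1.1 * lr, since cos(0) = 1)
#   get_lr(500, 1000, 2e-4)  -> 1.2e-4  (midpoint, cos(pi/2) = 0)
#   get_lr(1000, 1000, 2e-4) -> 2.0e-5  (floor = 0.1 * lr, since cos(pi) = -1)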
def train_epoch(epoch, wandb):
loss_fct = nn.CrossEntropyLoss(reduction='none')
start_time = time.time()
    # Define moe_path at the top of the function so it is always available in the exception handler below
moe_path = '_moe' if lm_config.use_moe else ''
    # CUDA events for profiling the individual phases of a training step
if args.profile and (not ddp or dist.get_rank() == 0):
data_start = torch.cuda.Event(enable_timing=True)
data_end = torch.cuda.Event(enable_timing=True)
forward_start = torch.cuda.Event(enable_timing=True)
forward_end = torch.cuda.Event(enable_timing=True)
backward_start = torch.cuda.Event(enable_timing=True)
backward_end = torch.cuda.Event(enable_timing=True)
optimizer_start = torch.cuda.Event(enable_timing=True)
optimizer_end = torch.cuda.Event(enable_timing=True)
    # CUDA-graph optimization code removed
    # Data prefetching
    prefetch_factor = 2  # number of batches to prefetch
data_iter = iter(train_loader)
prefetch_batches = []
    # Prefetch the initial batches
for _ in range(min(prefetch_factor, len(train_loader))):
try:
batch = next(data_iter)
prefetch_batches.append([t.to(args.device, non_blocking=True) for t in batch])
except StopIteration:
break
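    # Note: this is a simple host-side prefetch queue. The non_blocking=True copies can only
    # overlap with compute because the DataLoader below is created with pin_memory=True;
    # without pinned memory they would fall back to effectively synchronous transfers.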
for step in range(len(train_loader)):
try:
            # Time the data-loading phase
if args.profile and (not ddp or dist.get_rank() == 0):
data_start.record()
            # Use a prefetched batch if one is available
if prefetch_batches:
X, Y, loss_mask = prefetch_batches.pop(0)
else:
                # Prefetch queue is empty, load a batch directly
X, Y, loss_mask = [t.to(args.device) for t in next(data_iter)]
            # Asynchronously prefetch the next batch
if step + prefetch_factor < len(train_loader):
try:
batch = next(data_iter)
prefetch_batches.append([t.to(args.device, non_blocking=True) for t in batch])
except StopIteration:
pass
if args.profile and (not ddp or dist.get_rank() == 0):
data_end.record()
            # Update the learning rate
lr = get_lr(epoch * iter_per_epoch + step, args.epochs * iter_per_epoch, args.learning_rate)
for param_group in optimizer.param_groups:
param_group['lr'] = lr
            # Time the forward pass
if args.profile and (not ddp or dist.get_rank() == 0):
forward_start.record()
            # Standard forward pass
with ctx:
res = model(X)
loss = loss_fct(
res.logits.view(-1, res.logits.size(-1)),
Y.view(-1)
).view(Y.size())
loss = (loss * loss_mask).sum() / loss_mask.sum()
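                # loss_mask zeroes out padded positions, so the sum/sum above is a masked mean:
                # the average cross-entropy over real tokens only (e.g. a hypothetical mask of
                # [1, 1, 0, 0] would keep just the first two token losses).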
                # Add the auxiliary loss from the feed-forward layers (e.g. MoE load balancing), if present
try:
if hasattr(model, 'module'):
                        # DDP case
aux_loss = sum(l.feed_forward.aux_loss for l in model.module.layers
if hasattr(l.feed_forward, 'aux_loss'))
else:
                        # Non-DDP case
aux_loss = sum(l.feed_forward.aux_loss for l in model.layers
if hasattr(l.feed_forward, 'aux_loss'))
loss += aux_loss
except Exception as e:
Logger(f"Warning: Could not add auxiliary loss: {e}")
                    # On failure, skip the auxiliary loss
loss = loss / args.accumulation_steps
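                # Dividing by accumulation_steps makes the accumulated gradient an average rather
                # than a sum; with the defaults here (batch_size=24, accumulation_steps=32) each
                # optimizer step corresponds to an effective batch of 24 * 32 = 768 sequences.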
            # Backward pass
scaler.scale(loss).backward()
if args.profile and (not ddp or dist.get_rank() == 0):
forward_end.record()
backward_start.record()
# Print data types for debugging
            if step == 0 and (not ddp or dist.get_rank() == 0):  # Print only on the first step of each epoch, on the main process
Logger("---- Data Type Check ----")
Logger(f"X.dtype: {X.dtype}")
if hasattr(model, 'module'): # DDP case
Logger(f"Model parameter dtype: {next(model.module.parameters()).dtype}")
else: # Non-DDP case
Logger(f"Model parameter dtype: {next(model.parameters()).dtype}")
Logger(f"res.logits.dtype: {res.logits.dtype}")
Logger(f"loss.dtype: {loss.dtype}")
Logger("-------------------------")
if args.profile and (not ddp or dist.get_rank() == 0):
backward_end.record()
                # Profile every step, not only when a gradient-accumulation boundary is reached
if (step + 1) % args.profile_interval == 0:
                    # Record optimizer timing (only when this is also an accumulation boundary)
if (step + 1) % args.accumulation_steps == 0:
optimizer_start.record()
            # Optimizer step
if (step + 1) % args.accumulation_steps == 0:
if args.profile and (not ddp or dist.get_rank() == 0):
if (step + 1) % args.profile_interval != 0:
optimizer_start.record()
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad(set_to_none=True)
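                # AMP bookkeeping order above: unscale_() first restores true gradient magnitudes so
                # clip_grad_norm_ clips real values, scaler.step() skips the update when it finds
                # inf/NaN gradients, and scaler.update() adapts the loss scale for the next iteration.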
if args.profile and (not ddp or dist.get_rank() == 0):
optimizer_end.record()
            # Emit profiling output every profile_interval steps
if args.profile and (not ddp or dist.get_rank() == 0) and (step + 1) % args.profile_interval == 0:
                # Synchronize CUDA so the event timings are accurate
torch.cuda.synchronize()
                # Elapsed time of each phase
data_time = data_start.elapsed_time(data_end)
forward_time = forward_start.elapsed_time(forward_end)
backward_time = backward_start.elapsed_time(backward_end)
                # Optimizer timing only exists when an accumulation boundary was reached this step
if (step + 1) % args.accumulation_steps == 0:
                    optimizer_time = optimizer_start.elapsed_time(optimizer_end)
                    total_compute_time = forward_time + backward_time + optimizer_time
                    Logger(f"Profiling - step {step+1}:")
                    Logger(f"  data loading time: {data_time:.2f} ms")
                    Logger(f"  forward time: {forward_time:.2f} ms")
                    Logger(f"  backward time: {backward_time:.2f} ms")
                    Logger(f"  optimizer time: {optimizer_time:.2f} ms")
                    Logger(f"  total compute time: {total_compute_time:.2f} ms")
                    Logger(f"  compute/data ratio: {total_compute_time / data_time:.2f}")
                else:
                    # Not an accumulation boundary, so there is no optimizer timing
                    total_compute_time = forward_time + backward_time
                    Logger(f"Profiling - step {step+1} (accumulating gradients):")
                    Logger(f"  data loading time: {data_time:.2f} ms")
                    Logger(f"  forward time: {forward_time:.2f} ms")
                    Logger(f"  backward time: {backward_time:.2f} ms")
                    Logger(f"  total compute time: {total_compute_time:.2f} ms")
                    Logger(f"  compute/data ratio: {total_compute_time / data_time:.2f}")
            # Logging
if step % args.log_interval == 0:
spend_time = time.time() - start_time
Logger(
'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} epoch_Time:{}min:'.format(
epoch + 1,
args.epochs,
step,
iter_per_epoch,
loss.item() * args.accumulation_steps,
optimizer.param_groups[-1]['lr'],
spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))
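                # epoch_Time above is a rough estimate of the minutes left in this epoch:
                # projected total epoch minutes minus the minutes already spent.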
if (wandb is not None) and (not ddp or dist.get_rank() == 0):
log_dict = {
"loss": loss.item() * args.accumulation_steps,
"lr": optimizer.param_groups[-1]['lr'],
"epoch_Time": spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60
}
                    # If profiling is enabled, also log the performance metrics
if args.profile and (step + 1) % args.profile_interval == 0:
                        # Basic performance metrics
perf_dict = {
"data_time_ms": data_time,
"forward_time_ms": forward_time,
"backward_time_ms": backward_time
}
                        # Optimizer timing only exists at accumulation boundaries
if (step + 1) % args.accumulation_steps == 0:
total_compute_time = forward_time + backward_time + optimizer_time
perf_dict.update({
"optimizer_time_ms": optimizer_time,
"compute_time_ms": total_compute_time
})
else:
total_compute_time = forward_time + backward_time
perf_dict.update({
"compute_time_ms": total_compute_time
})
log_dict.update(perf_dict)
wandb.log(log_dict)
            # Communication-profiling code removed
            # Save a checkpoint
if (step + 1) % args.save_interval == 0 and (not ddp or dist.get_rank() == 0):
model.eval()
                # Use the moe_path variable defined at the top of the function
ckp = f'{args.save_dir}/pretrain_{lm_config.dim}{moe_path}.pth'
                if isinstance(model, torch.nn.parallel.DistributedDataParallel):
                    state_dict = model.module.state_dict()  # get the model parameters
                else:
                    state_dict = model.state_dict()  # get the model parameters
                torch.save(state_dict, ckp)  # save only the parameters
                model.train()
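                # To resume from this checkpoint later (sketch, not executed here):
                #     state = torch.load(ckp, map_location=args.device)
                #     model.load_state_dict(state)  # or model.module.load_state_dict(state) under DDP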
except Exception as e:
print(f"Error occurred: {str(e)}")
save_path = f'{args.save_dir}/pretrain_{lm_config.dim}{moe_path}_nanERROR.pth'
if os.path.exists(save_path):
os.remove(save_path)
if isinstance(model, torch.nn.parallel.DistributedDataParallel):
state_dict = model.module.state_dict()
else:
state_dict = model.state_dict()
torch.save(state_dict, save_path)
for name, param in model.named_parameters():
if param.grad is not None and torch.isnan(param.grad).any():
print(f"NaN gradient in parameter: {name}")
for name, param in model.named_parameters():
if param.grad is not None and torch.isnan(param.grad).any():
print(f"Parameter {name} values: {param.data}")
print(f"Parameter {name} gradients: {param.grad}")
raise ValueError("NaN gradient detected")
def init_model(lm_config, pretrained_embedding_path: Optional[str] = None):
    # Load the tokenizer
tokenizer = AutoTokenizer.from_pretrained('/mnt/lzn/Minimind/Minimind/model/minimind_tokenizer')
    # Load the model
model = MiniMindLM(lm_config).to(args.device)
# Load pretrained token embeddings if path is provided
if pretrained_embedding_path and os.path.exists(pretrained_embedding_path):
Logger(f"Loading pretrained token embeddings from {pretrained_embedding_path}")
embedding_weights = torch.load(pretrained_embedding_path, map_location=args.device)
model.tok_embeddings.load_state_dict(embedding_weights)
Logger("Successfully loaded pretrained token embeddings.")
elif pretrained_embedding_path:
Logger(f"Warning: Pretrained embedding path {pretrained_embedding_path} provided but file does not exist. Initializing embeddings from scratch.")
    # Print the model size
    Logger(f'Total LLM parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} million')
return model, tokenizer
# Communication-profiling helpers removed
def init_distributed_mode():
    if not ddp: return  # Nothing to do unless distributed data parallel (DDP) is enabled
    global ddp_local_rank, DEVICE  # Make these visible outside the function
    dist.init_process_group(backend="nccl")  # Initialize the process group with the NCCL backend (NVIDIA Collective Communications Library), optimized for GPU-to-GPU communication
    ddp_rank = int(os.environ["RANK"])  # Global rank of this process, from the environment
    ddp_local_rank = int(os.environ["LOCAL_RANK"])  # Local rank of this process on its node
    ddp_world_size = int(os.environ["WORLD_SIZE"])  # Total number of processes in the group
    DEVICE = f"cuda:{ddp_local_rank}"  # Pick the GPU that corresponds to the local rank
    torch.cuda.set_device(DEVICE)  # Bind this process to that GPU
# torchrun --nproc_per_node 2 1-pretrain.py
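# The RANK / LOCAL_RANK / WORLD_SIZE variables read above are injected by torchrun, e.g. for this
# file (2 processes, one per GPU, as an example): torchrun --nproc_per_node 2 train_pretrain.py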
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="MiniMind Pretraining")
parser.add_argument("--out_dir", type=str, default="out")
    # To get a "zero" model as quickly as possible, set epochs to 1; otherwise make fuller use of the limited data and train for 2-6 epochs.
parser.add_argument("--epochs", type=int, default=3)
parser.add_argument("--batch_size", type=int, default=24)
parser.add_argument("--learning_rate", type=float, default=2e-4)
parser.add_argument("--device", type=str, default="cuda:0" if torch.cuda.is_available() else "cpu") #如果GPU可用则使用GPU否则使用CPU。
parser.add_argument("--dtype", type=str, default="bfloat16")
parser.add_argument("--use_wandb", default=True, action="store_true")
parser.add_argument("--wandb_project", type=str, default="MiniMind-Pretrain")
parser.add_argument("--num_workers", type=int, default=48)
parser.add_argument("--ddp", action="store_true")
parser.add_argument("--accumulation_steps", type=int, default=32) #梯度累积步数,用于控制梯度更新频率。
parser.add_argument("--grad_clip", type=float, default=1.0) #梯度裁剪阈值,用于防止梯度爆炸。
parser.add_argument("--warmup_iters", type=int, default=0) #预热迭代次数,用于控制学习率预热过程。
parser.add_argument("--log_interval", type=int, default=100) #日志打印间隔,用于控制日志打印的频率。
parser.add_argument("--save_interval", type=int, default=10000) #模型保存间隔,用于控制模型保存的频率。
parser.add_argument('--local_rank', type=int, default=-1) #本地进程编号,用于分布式训练。
parser.add_argument('--dim', default=1024, type=int) #模型维度,用于控制模型的大小。
parser.add_argument('--n_layers', default=32, type=int) #层数,用于控制模型层数。
parser.add_argument('--max_seq_len', default=1024, type=int) #最大序列长度,用于控制输入序列的最大长度。
parser.add_argument('--use_moe', default=False, type=bool) #是否使用MOE用于控制是否使用MOE。
parser.add_argument('--disable_db', action='store_true', help="禁用数据库功能使用固定值1e-4替代") #禁用数据库功能,启用特殊模式
parser.add_argument("--data_path", type=str, default="/mnt/lzn/Minimind/dataset/dir/pretrain_hq.jsonl") #数据路径,用于控制数据集的路径。
parser.add_argument("--pretrained_embedding_path", type=str, default=None, help="Path to pretrained token embedding weights (.pth file)")
    # Profiling-related arguments
    parser.add_argument("--profile", action="store_true", default=True, help="Enable performance profiling")
    parser.add_argument("--profile_interval", type=int, default=10, help="Profiling print interval (in steps)")
    parser.add_argument("--use_flash_attn", action="store_true", default=True, help="Enable FlashAttention")
args = parser.parse_args()
print(args)
    lm_config = LMConfig(
        dim=args.dim,
        n_layers=args.n_layers,
        max_seq_len=args.max_seq_len,
        use_moe=args.use_moe,
        disable_db=args.disable_db,  # pass through the disable-database flag
        flash_attn=args.use_flash_attn  # pass through the FlashAttention flag
    )  # Model configuration object
    args.save_dir = os.path.join(args.out_dir)  # Checkpoint save directory
    os.makedirs(args.save_dir, exist_ok=True)  # Create the save directory
    os.makedirs(args.out_dir, exist_ok=True)  # Create the output directory
    tokens_per_iter = args.batch_size * lm_config.max_seq_len  # Tokens processed per micro-batch
print(f"tokens_per_iter: {tokens_per_iter}")
    device_type = "cuda" if "cuda" in args.device else "cpu"  # Device type
# Determine the torch dtype
pt_dtype = {'float32': torch.float32, 'bfloat16': torch.bfloat16, 'float16': torch.float16}[args.dtype]
args.wandb_run_name = f"MiniMind-Pretrain-Epoch-{args.epochs}-BatchSize-{args.batch_size}-LearningRate-{args.learning_rate}"
ctx = nullcontext() if device_type == "cpu" else torch.cuda.amp.autocast(dtype=pt_dtype)
ddp = int(os.environ.get("RANK", -1)) != -1 # is this a ddp run?
ddp_local_rank, DEVICE = 0, "cuda:0"
base_seed = 1337
torch.manual_seed(base_seed)
torch.cuda.manual_seed(base_seed)
if ddp:
init_distributed_mode()
args.device = torch.device(DEVICE)
rank = dist.get_rank()
torch.manual_seed(base_seed + rank)
        # Also seed CUDA with the rank-offset seed
torch.cuda.manual_seed(base_seed + rank)
if args.use_wandb and (not ddp or ddp_local_rank == 0):
import wandb
# Merge args and lm_config parameters for wandb config
config = vars(args).copy()
config.update(lm_config.__dict__)
wandb.init(project=args.wandb_project, name=args.wandb_run_name, config=config)
else:
wandb = None
model, tokenizer = init_model(lm_config, args.pretrained_embedding_path)
train_ds = PretrainDataset(args.data_path, tokenizer, max_length=lm_config.max_seq_len)
train_sampler = DistributedSampler(train_ds) if ddp else None
    # Tuned DataLoader configuration
train_loader = DataLoader(
train_ds,
batch_size=args.batch_size,
pin_memory=True,
        pin_memory_device=f"cuda:{ddp_local_rank}" if ddp else "cuda:0",  # device that pinned memory is associated with
drop_last=False,
shuffle=False,
num_workers=args.num_workers,
sampler=train_sampler,
        persistent_workers=True if args.num_workers > 0 else False,  # keep worker processes alive between epochs
        prefetch_factor=2 if args.num_workers > 0 else None  # number of batches each worker prefetches
)
    # Enable GradScaler only for float16; bfloat16 does not need loss scaling
scaler = torch.cuda.amp.GradScaler(enabled=(args.dtype == 'float16'))
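    # With the default dtype of bfloat16 the scaler stays disabled, and scaler.scale()/step()/update()
    # become pass-throughs; dynamic loss scaling is only active on the float16 path.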
optimizer = optim.AdamW(model.parameters(), lr=args.learning_rate)
    if ddp:
        model._ddp_params_and_buffers_to_ignore = {"pos_cis"}
        # Keep find_unused_parameters=True because the model genuinely has unused parameters
        model = DistributedDataParallel(model, device_ids=[ddp_local_rank], find_unused_parameters=True)
    # Keep set_detect_anomaly for debugging for now;
    # it can be commented out once training is stable to speed things up
torch.autograd.set_detect_anomaly(True)
iter_per_epoch = len(train_loader)
for epoch in range(args.epochs):
train_epoch(epoch, wandb)