From 7cf4228401d8b2dea0284cbfa06cec9bf7943eb5 Mon Sep 17 00:00:00 2001
From: Jax922 <1322037892@qq.com>
Date: Tue, 13 May 2025 08:40:43 +0800
Subject: [PATCH] update

---
 train_pretrain_accelerate.py | 92 ++++++++++++++++++++++++++----------
 1 file changed, 68 insertions(+), 24 deletions(-)

diff --git a/train_pretrain_accelerate.py b/train_pretrain_accelerate.py
index e791ca1..e72f46f 100644
--- a/train_pretrain_accelerate.py
+++ b/train_pretrain_accelerate.py
@@ -12,6 +12,7 @@ from torch import optim, nn
 from torch.utils.data import DataLoader
 from contextlib import nullcontext
 from typing import Optional
+import datetime # Add datetime for time formatting
 from accelerate import Accelerator
 from accelerate.utils import set_seed
 from accelerate.utils import DeepSpeedPlugin
@@ -30,6 +31,10 @@ def Logger(msg, accelerator=None):
     if accelerator is None or accelerator.is_main_process:
         print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {msg}")

+# Helper function to format seconds into HH:MM:SS
+def format_time(seconds):
+    return str(datetime.timedelta(seconds=int(seconds)))
+
 # 获取学习率函数
 def get_lr(it, num_iters, learning_rate):
     # 余弦学习率衰减
@@ -50,13 +55,14 @@ def init_model(lm_config, pretrained_embedding_path=None):
     Logger(f'LLM总参数量:{sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} 百万')
     return model, tokenizer

-def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx):
+def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx, overall_start_time):
     loss_fct = nn.CrossEntropyLoss(reduction='none')
-    start_time = time.time()
-    # 在函数开始处定义moe_path,避免在异常处理中引用未定义变量
+    epoch_start_time = time.time()
+    total_steps_in_epoch = len(train_loader)
+    total_training_steps = args.epochs * total_steps_in_epoch
     moe_path = '_moe' if args.use_moe else ''

-    # 添加CUDA事件来分析性能
+    # 添加CUDA事件来分析性能 (只在主进程进行)
     if args.profile and accelerator.is_main_process:
         data_start = torch.cuda.Event(enable_timing=True)
         data_end = torch.cuda.Event(enable_timing=True)
@@ -80,9 +86,12 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
         except StopIteration:
             break

-    for step in range(len(train_loader)):
+    # 在开始循环前初始化日志记录所需变量
+    last_log_time = epoch_start_time
+
+    for step in range(total_steps_in_epoch):
         try:
-            # 计时数据加载
+            # 计时数据加载 (只在主进程进行)
             if args.profile and accelerator.is_main_process:
                 data_start.record()

@@ -101,6 +110,7 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
             except StopIteration:
                 pass

+            # 计时数据加载结束 (只在主进程进行)
             if args.profile and accelerator.is_main_process:
                 data_end.record()

@@ -108,7 +118,7 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
             if scheduler is not None:
                 scheduler.step()

-            # 计时前向传播
+            # 计时前向传播 (只在主进程进行)
             if args.profile and accelerator.is_main_process:
                 forward_start.record()

@@ -130,10 +140,11 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
                     # 如果出错,不添加辅助损失
                 loss = loss / args.accumulation_steps

+            # 计时前向传播结束 (只在主进程进行)
             if args.profile and accelerator.is_main_process:
                 forward_end.record()

-            # 计时反向传播
+            # 计时反向传播 (只在主进程进行)
             if args.profile and accelerator.is_main_process:
                 backward_start.record()

@@ -141,10 +152,11 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
             # 当使用DeepSpeed时,它会自动处理梯度累积和梯度裁剪
             accelerator.backward(loss)

+            # 计时反向传播结束 (只在主进程进行)
             if args.profile and accelerator.is_main_process:
                 backward_end.record()

-            # 计时优化器步骤
+            # 计时优化器步骤 (只在主进程进行)
             if args.profile and accelerator.is_main_process:
                 optimizer_start.record()

@@ -157,40 +169,71 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
             # 但为了安全起见,我们仍然显式调用它
             optimizer.zero_grad()

+            # 计时优化器步骤结束 (只在主进程进行)
             if args.profile and accelerator.is_main_process:
                 optimizer_end.record()

-            # 打印训练信息
+            # 打印训练信息 (只在主进程进行)
             if (step + 1) % args.log_interval == 0 and accelerator.is_main_process:
+                current_time = time.time()
                 # 计算性能指标
                 if args.profile:
                     torch.cuda.synchronize()
-                    data_time = data_start.elapsed_time(data_end) if step > 0 else 0
+                    # 使用自上次日志以来的时间计算性能指标,而不是总时间
+                    data_time = data_start.elapsed_time(data_end)
                     forward_time = forward_start.elapsed_time(forward_end)
                     backward_time = backward_start.elapsed_time(backward_end)
-                    optimizer_time = optimizer_start.elapsed_time(optimizer_end) if (step + 1) % args.accumulation_steps == 0 else 0
-                    total_time = data_time + forward_time + backward_time + optimizer_time
+                    optimizer_time = optimizer_start.elapsed_time(optimizer_end)
+                    iter_time = (current_time - last_log_time) * 1000 / args.log_interval # avg ms per iteration since last log
+                    # total_time_ms = data_time + forward_time + backward_time + optimizer_time

                     # 打印性能分析
                     if (step + 1) % (args.log_interval * args.profile_interval) == 0:
-                        Logger(f"性能分析 - 数据加载: {data_time:.2f}ms ({data_time/total_time*100:.1f}%), "
-                               f"前向传播: {forward_time:.2f}ms ({forward_time/total_time*100:.1f}%), "
-                               f"反向传播: {backward_time:.2f}ms ({backward_time/total_time*100:.1f}%), "
-                               f"优化器: {optimizer_time:.2f}ms ({optimizer_time/total_time*100:.1f}%)", accelerator)
+                        Logger(f"性能分析 (Avg/iter over last {args.log_interval} steps) - "
+                               f"Data: {data_time/args.log_interval:.2f}ms, "
+                               f"Fwd: {forward_time/args.log_interval:.2f}ms, "
+                               f"Bwd: {backward_time/args.log_interval:.2f}ms, "
+                               f"Optim: {optimizer_time/args.log_interval:.2f}ms, "
+                               f"Iter Time: {iter_time:.2f}ms", accelerator)
+                        # 重置事件以便下次测量从0开始
+                        data_start = torch.cuda.Event(enable_timing=True)
+                        data_end = torch.cuda.Event(enable_timing=True)
+                        forward_start = torch.cuda.Event(enable_timing=True)
+                        forward_end = torch.cuda.Event(enable_timing=True)
+                        backward_start = torch.cuda.Event(enable_timing=True)
+                        backward_end = torch.cuda.Event(enable_timing=True)
+                        optimizer_start = torch.cuda.Event(enable_timing=True)
+                        optimizer_end = torch.cuda.Event(enable_timing=True)
+

                 # 计算当前学习率
                 current_lr = optimizer.param_groups[0]['lr']

-                # 计算训练速度
-                elapsed_time = time.time() - start_time
-                tokens_per_sec = (step + 1) * args.batch_size * args.max_seq_len / elapsed_time
+                # 计算时间
+                epoch_elapsed_time = current_time - epoch_start_time
+                epoch_steps_done = step + 1
+                epoch_avg_step_time = epoch_elapsed_time / epoch_steps_done
+                epoch_remaining_time = epoch_avg_step_time * (total_steps_in_epoch - epoch_steps_done)

-                Logger(f"Epoch {epoch+1}/{args.epochs}, Step {step+1}/{len(train_loader)}, "
+                total_elapsed_time = current_time - overall_start_time
+                total_steps_done = epoch * total_steps_in_epoch + epoch_steps_done
+                total_avg_step_time = total_elapsed_time / total_steps_done if total_steps_done > 0 else 0
+                total_remaining_time = total_avg_step_time * (total_training_steps - total_steps_done) if total_steps_done > 0 else 0
+
+                # 计算训练速度 (基于最近的log_interval)
+                interval_elapsed_time = current_time - last_log_time
+                tokens_processed_interval = args.log_interval * args.batch_size * args.max_seq_len
+                tokens_per_sec = tokens_processed_interval / interval_elapsed_time if interval_elapsed_time > 0 else 0
+                last_log_time = current_time # 更新上次日志时间
+
+                Logger(f"Epoch {epoch+1}/{args.epochs}, Step {step+1}/{total_steps_in_epoch}, "
                        f"Loss: {loss.item()*args.accumulation_steps:.4f}, "
                        f"LR: {current_lr:.6f}, "
-                       f"Speed: {tokens_per_sec:.2f} tokens/sec", accelerator)
+                       f"Speed: {tokens_per_sec:.2f} tokens/sec | "
+                       f"Epoch Time Left: {format_time(epoch_remaining_time)} | "
+                       f"Total Time Left: {format_time(total_remaining_time)}", accelerator)

-            # 保存模型
+            # 保存模型 (只在主进程进行)
             if (step + 1) % args.save_interval == 0 and accelerator.is_main_process:
                 # 使用函数开始处定义的moe_path变量
                 ckp = f'{args.save_dir}/pretrain_{args.dim}{moe_path}.pth'
@@ -335,8 +378,9 @@ def main():
         wandb = None

     # 训练循环
+    overall_start_time = time.time() # Record overall start time
     for epoch in range(args.epochs):
-        train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx)
+        train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx, overall_start_time) # Pass overall start time

     # 关闭wandb
     if args.use_wandb and accelerator.is_main_process: