Jax922 2025-05-13 08:40:43 +08:00
parent caa9c23bc5
commit 7cf4228401


@@ -12,6 +12,7 @@ from torch import optim, nn
 from torch.utils.data import DataLoader
 from contextlib import nullcontext
 from typing import Optional
+import datetime  # Add datetime for time formatting
 from accelerate import Accelerator
 from accelerate.utils import set_seed
 from accelerate.utils import DeepSpeedPlugin
@@ -30,6 +31,10 @@ def Logger(msg, accelerator=None):
     if accelerator is None or accelerator.is_main_process:
         print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {msg}")
 
+# Helper function to format seconds into HH:MM:SS
+def format_time(seconds):
+    return str(datetime.timedelta(seconds=int(seconds)))
+
 # Function to get the learning rate
 def get_lr(it, num_iters, learning_rate):
     # Cosine learning rate decay
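
The new format_time helper just wraps str(datetime.timedelta(...)). As a quick illustration (standalone snippet, not part of the commit) of the strings that will show up in the log line, and of the caveat that timedelta switches to a "N day(s), H:MM:SS" form past 24 hours:

import datetime

def format_time(seconds):
    return str(datetime.timedelta(seconds=int(seconds)))

print(format_time(59))     # -> 0:00:59
print(format_time(3725))   # -> 1:02:05
print(format_time(90061))  # -> 1 day, 1:01:01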
@ -50,13 +55,14 @@ def init_model(lm_config, pretrained_embedding_path=None):
Logger(f'LLM总参数量{sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} 百万') Logger(f'LLM总参数量{sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} 百万')
return model, tokenizer return model, tokenizer
def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx): def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx, overall_start_time):
loss_fct = nn.CrossEntropyLoss(reduction='none') loss_fct = nn.CrossEntropyLoss(reduction='none')
start_time = time.time() epoch_start_time = time.time()
# 在函数开始处定义moe_path避免在异常处理中引用未定义变量 total_steps_in_epoch = len(train_loader)
total_training_steps = args.epochs * total_steps_in_epoch
moe_path = '_moe' if args.use_moe else '' moe_path = '_moe' if args.use_moe else ''
# 添加CUDA事件来分析性能 # 添加CUDA事件来分析性能 (只在主进程进行)
if args.profile and accelerator.is_main_process: if args.profile and accelerator.is_main_process:
data_start = torch.cuda.Event(enable_timing=True) data_start = torch.cuda.Event(enable_timing=True)
data_end = torch.cuda.Event(enable_timing=True) data_end = torch.cuda.Event(enable_timing=True)
@@ -80,9 +86,12 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx):
         except StopIteration:
             break
 
-    for step in range(len(train_loader)):
+    # Initialize the variables needed for logging before entering the loop
+    last_log_time = epoch_start_time
+
+    for step in range(total_steps_in_epoch):
         try:
-            # Time data loading
+            # Time data loading (main process only)
            if args.profile and accelerator.is_main_process:
                data_start.record()
@@ -101,6 +110,7 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx):
             except StopIteration:
                 pass
 
+            # End timing of data loading (main process only)
             if args.profile and accelerator.is_main_process:
                 data_end.record()
@@ -108,7 +118,7 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx):
             if scheduler is not None:
                 scheduler.step()
 
-            # Time the forward pass
+            # Time the forward pass (main process only)
             if args.profile and accelerator.is_main_process:
                 forward_start.record()
@@ -130,10 +140,11 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx):
                 # If an error occurs, do not add the auxiliary loss
                 loss = loss / args.accumulation_steps
 
+            # End timing of the forward pass (main process only)
             if args.profile and accelerator.is_main_process:
                 forward_end.record()
 
-            # Time the backward pass
+            # Time the backward pass (main process only)
             if args.profile and accelerator.is_main_process:
                 backward_start.record()
@ -141,10 +152,11 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
# 当使用DeepSpeed时它会自动处理梯度累积和梯度裁剪 # 当使用DeepSpeed时它会自动处理梯度累积和梯度裁剪
accelerator.backward(loss) accelerator.backward(loss)
# 计时反向传播结束 (只在主进程进行)
if args.profile and accelerator.is_main_process: if args.profile and accelerator.is_main_process:
backward_end.record() backward_end.record()
# 计时优化器步骤 # 计时优化器步骤 (只在主进程进行)
if args.profile and accelerator.is_main_process: if args.profile and accelerator.is_main_process:
optimizer_start.record() optimizer_start.record()
@@ -157,40 +169,71 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx):
                 # But to be safe, we still call it explicitly
                 optimizer.zero_grad()
 
+            # End timing of the optimizer step (main process only)
             if args.profile and accelerator.is_main_process:
                 optimizer_end.record()
 
-            # Print training information
+            # Print training information (main process only)
             if (step + 1) % args.log_interval == 0 and accelerator.is_main_process:
+                current_time = time.time()
                 # Compute performance metrics
                 if args.profile:
                     torch.cuda.synchronize()
-                    data_time = data_start.elapsed_time(data_end) if step > 0 else 0
+                    # Use the time since the last log to compute performance metrics, not the total time
+                    data_time = data_start.elapsed_time(data_end)
                     forward_time = forward_start.elapsed_time(forward_end)
                     backward_time = backward_start.elapsed_time(backward_end)
-                    optimizer_time = optimizer_start.elapsed_time(optimizer_end) if (step + 1) % args.accumulation_steps == 0 else 0
-                    total_time = data_time + forward_time + backward_time + optimizer_time
+                    optimizer_time = optimizer_start.elapsed_time(optimizer_end)
+                    iter_time = (current_time - last_log_time) * 1000 / args.log_interval  # avg ms per iteration since last log
+                    # total_time_ms = data_time + forward_time + backward_time + optimizer_time
 
                     # Print profiling results
                     if (step + 1) % (args.log_interval * args.profile_interval) == 0:
-                        Logger(f"Profiling - Data loading: {data_time:.2f}ms ({data_time/total_time*100:.1f}%), "
-                               f"Forward: {forward_time:.2f}ms ({forward_time/total_time*100:.1f}%), "
-                               f"Backward: {backward_time:.2f}ms ({backward_time/total_time*100:.1f}%), "
-                               f"Optimizer: {optimizer_time:.2f}ms ({optimizer_time/total_time*100:.1f}%)", accelerator)
+                        Logger(f"Profiling (Avg/iter over last {args.log_interval} steps) - "
+                               f"Data: {data_time/args.log_interval:.2f}ms, "
+                               f"Fwd: {forward_time/args.log_interval:.2f}ms, "
+                               f"Bwd: {backward_time/args.log_interval:.2f}ms, "
+                               f"Optim: {optimizer_time/args.log_interval:.2f}ms, "
+                               f"Iter Time: {iter_time:.2f}ms", accelerator)
+                        # Reset the events so the next measurement starts from zero
+                        data_start = torch.cuda.Event(enable_timing=True)
+                        data_end = torch.cuda.Event(enable_timing=True)
+                        forward_start = torch.cuda.Event(enable_timing=True)
+                        forward_end = torch.cuda.Event(enable_timing=True)
+                        backward_start = torch.cuda.Event(enable_timing=True)
+                        backward_end = torch.cuda.Event(enable_timing=True)
+                        optimizer_start = torch.cuda.Event(enable_timing=True)
+                        optimizer_end = torch.cuda.Event(enable_timing=True)
 
                 # Get the current learning rate
                 current_lr = optimizer.param_groups[0]['lr']
 
-                # Compute training speed
-                elapsed_time = time.time() - start_time
-                tokens_per_sec = (step + 1) * args.batch_size * args.max_seq_len / elapsed_time
-
-                Logger(f"Epoch {epoch+1}/{args.epochs}, Step {step+1}/{len(train_loader)}, "
+                # Compute timing statistics
+                epoch_elapsed_time = current_time - epoch_start_time
+                epoch_steps_done = step + 1
+                epoch_avg_step_time = epoch_elapsed_time / epoch_steps_done
+                epoch_remaining_time = epoch_avg_step_time * (total_steps_in_epoch - epoch_steps_done)
+
+                total_elapsed_time = current_time - overall_start_time
+                total_steps_done = epoch * total_steps_in_epoch + epoch_steps_done
+                total_avg_step_time = total_elapsed_time / total_steps_done if total_steps_done > 0 else 0
+                total_remaining_time = total_avg_step_time * (total_training_steps - total_steps_done) if total_steps_done > 0 else 0
+
+                # Compute training speed (based on the most recent log_interval)
+                interval_elapsed_time = current_time - last_log_time
+                tokens_processed_interval = args.log_interval * args.batch_size * args.max_seq_len
+                tokens_per_sec = tokens_processed_interval / interval_elapsed_time if interval_elapsed_time > 0 else 0
+
+                last_log_time = current_time  # Update the last log time
+
+                Logger(f"Epoch {epoch+1}/{args.epochs}, Step {step+1}/{total_steps_in_epoch}, "
                        f"Loss: {loss.item()*args.accumulation_steps:.4f}, "
                        f"LR: {current_lr:.6f}, "
-                       f"Speed: {tokens_per_sec:.2f} tokens/sec", accelerator)
+                       f"Speed: {tokens_per_sec:.2f} tokens/sec | "
+                       f"Epoch Time Left: {format_time(epoch_remaining_time)} | "
+                       f"Total Time Left: {format_time(total_remaining_time)}", accelerator)
 
-            # Save the model
+            # Save the model (main process only)
             if (step + 1) % args.save_interval == 0 and accelerator.is_main_process:
                 # Use the moe_path variable defined at the start of the function
                 ckp = f'{args.save_dir}/pretrain_{args.dim}{moe_path}.pth'
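
The profiling changes in this hunk use the standard torch.cuda.Event timing pattern and re-create the events after each report so stale measurements do not leak into the next interval. A minimal standalone sketch of that pattern (illustrative variable names, assumes a CUDA device is available):

import torch

start = torch.cuda.Event(enable_timing=True)
end = torch.cuda.Event(enable_timing=True)

x = torch.randn(2048, 2048, device="cuda")
start.record()              # mark the start point in the current CUDA stream
y = x @ x                   # the work being timed
end.record()                # mark the end point
torch.cuda.synchronize()    # elapsed_time is only valid once both events have completed
print(f"matmul: {start.elapsed_time(end):.2f} ms")  # elapsed_time reports milliseconds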
@@ -335,8 +378,9 @@ def main():
         wandb = None
 
     # Training loop
+    overall_start_time = time.time()  # Record overall start time
     for epoch in range(args.epochs):
-        train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx)
+        train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx, overall_start_time)  # Pass overall start time
 
     # Shut down wandb
     if args.use_wandb and accelerator.is_main_process:
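
The new "Time Left" fields are a plain linear extrapolation: average seconds per step so far, multiplied by the steps remaining in the current epoch or in the whole run (the latter measured from overall_start_time, which main() now passes into train_epoch). A self-contained sketch of the same arithmetic with made-up numbers:

import datetime

def format_time(seconds):
    return str(datetime.timedelta(seconds=int(seconds)))

# Hypothetical values, only to show the shape of the calculation
total_steps_in_epoch = 10_000
total_training_steps = 3 * total_steps_in_epoch                  # args.epochs = 3
epoch_steps_done = 2_500                                         # partway through the second epoch
total_steps_done = 1 * total_steps_in_epoch + epoch_steps_done
avg_step_time = 0.8                                              # pretend a step averages 0.8 s

epoch_remaining = avg_step_time * (total_steps_in_epoch - epoch_steps_done)
total_remaining = avg_step_time * (total_training_steps - total_steps_done)
print("Epoch Time Left:", format_time(epoch_remaining))          # -> 1:40:00
print("Total Time Left:", format_time(total_remaining))          # -> 3:53:20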