From caa9c23bc55d2a981693a6ee9e3cb76160ab75b8 Mon Sep 17 00:00:00 2001
From: Jax922 <1322037892@qq.com>
Date: Mon, 12 May 2025 13:11:39 +0000
Subject: [PATCH] Use accelerate and DeepSpeed instead of torchrun
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 README_accelerate.md         | 126 +++++++++++++
 accelerate_config.yaml       |  17 ++
 ds_config.json               |  49 +++++
 run_accelerate.sh            |  48 +++++
 train_pretrain_accelerate.py | 346 +++++++++++++++++++++++++++++++++++
 5 files changed, 586 insertions(+)
 create mode 100644 README_accelerate.md
 create mode 100644 accelerate_config.yaml
 create mode 100644 ds_config.json
 create mode 100644 run_accelerate.sh
 create mode 100644 train_pretrain_accelerate.py

diff --git a/README_accelerate.md b/README_accelerate.md
new file mode 100644
index 0000000..e8ee065
--- /dev/null
+++ b/README_accelerate.md
@@ -0,0 +1,126 @@
+# Distributed Training with Accelerate + DeepSpeed
+
+This document describes how to run distributed training of the MiniMind model with Accelerate and DeepSpeed.
+
+## Environment Setup
+
+First, make sure the required dependencies are installed:
+
+```bash
+pip install accelerate deepspeed
+```
+
+## Configuration Files
+
+### 1. DeepSpeed configuration file (ds_config.json)
+
+The DeepSpeed configuration file defines the optimizer, the learning-rate scheduler and the ZeRO optimization settings. The main entries are:
+
+- **ZeRO optimization**: ZeRO-2 is used, which reduces GPU memory usage
+- **Optimizer**: AdamW
+- **Mixed precision**: both FP16 and BF16 are supported
+- **Gradient accumulation**: set to "auto" so that it follows the training-script argument
+
+### 2. Accelerate configuration file (accelerate_config.yaml)
+
+The Accelerate configuration file defines the basic distributed-training setup, including:
+
+- **Distributed type**: DeepSpeed
+- **Mixed precision**: BF16
+- **Number of processes**: 4 (adjust to the number of available GPUs)
+- **DeepSpeed configuration**: points to the ds_config.json file
+
+## Training Script
+
+The new training script `train_pretrain_accelerate.py` is adapted from the original `train_pretrain.py`. The main changes are:
+
+1. The Accelerator replaces PyTorch's native distributed functionality
+2. The torchrun-specific distributed initialization code has been removed
+3. The model, optimizer and data loader are prepared through the Accelerator API
+4. The backward pass and gradient clipping go through the Accelerator API
+5. The positional-encoding buffer and unused parameters are handled explicitly
+
+## Launching Training
+
+There are two ways to launch training:
+
+### Method 1: use the pre-made accelerate configuration file
+
+```bash
+accelerate launch --config_file accelerate_config.yaml train_pretrain_accelerate.py \
+    --epochs 3 \
+    --batch_size 24 \
+    --learning_rate 2e-4 \
+    --dtype bfloat16 \
+    --accumulation_steps 32 \
+    --grad_clip 1.0 \
+    --log_interval 100 \
+    --save_interval 10000 \
+    --dim 1024 \
+    --n_layers 32 \
+    --max_seq_len 1024 \
+    --use_flash_attn \
+    --profile \
+    --profile_interval 10
+```
+
+### Method 2: configure accelerate directly on the command line
+
+```bash
+CUDA_VISIBLE_DEVICES=0,1,2,3 accelerate launch \
+    --multi_gpu \
+    --num_processes=4 \
+    --mixed_precision=bf16 \
+    --main_process_port=29500 \
+    --deepspeed_config_file ds_config.json \
+    train_pretrain_accelerate.py \
+    --epochs 3 \
+    --batch_size 24 \
+    --learning_rate 2e-4 \
+    --dtype bfloat16 \
+    --accumulation_steps 32 \
+    --grad_clip 1.0 \
+    --log_interval 100 \
+    --save_interval 10000 \
+    --dim 1024 \
+    --n_layers 32 \
+    --max_seq_len 1024 \
+    --use_flash_attn \
+    --profile \
+    --profile_interval 10
+```
+
+Alternatively, just run the provided script:
+
+```bash
+bash run_accelerate.sh
+```
+
+## How the Accelerate and DeepSpeed Configurations Relate
+
+1. **Accelerate** is a high-level API that simplifies setting up and launching distributed training; it can drive several distributed backends (DeepSpeed, FSDP, and others).
+
+2. **DeepSpeed** is an optimization library focused on memory efficiency and throughput for large-scale model training; it provides features such as ZeRO optimization.
+
+3. **How the two configurations fit together** (a Python sketch of this follows below):
+   - The Accelerate configuration file (YAML) chooses the distributed backend and the basic distributed settings
+   - The DeepSpeed configuration file (JSON) holds the DeepSpeed-specific optimization parameters
+   - Accelerate references the DeepSpeed configuration file through the `deepspeed_config_file` entry
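+
+The same wiring can also be expressed in Python. The snippet below is only a minimal sketch of that relationship, assuming a reasonably recent accelerate version; it is not how `train_pretrain_accelerate.py` builds its plugin (the script passes the ZeRO settings as keyword arguments instead of a JSON file):
+
+```python
+from accelerate import Accelerator
+from accelerate.utils import DeepSpeedPlugin
+
+# Hand the plugin the same ds_config.json that the YAML points at; entries marked
+# "auto" in the JSON are resolved later from the training-script arguments.
+ds_plugin = DeepSpeedPlugin(hf_ds_config="ds_config.json")
+accelerator = Accelerator(deepspeed_plugin=ds_plugin, mixed_precision="bf16")
+```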
+
+## Notes
+
+1. **Positional-encoding handling**:
+   - In the model, `pos_cis` is a complex tensor and needs special treatment in distributed training
+   - The new training script tells Accelerate to ignore this buffer by setting `_ddp_params_and_buffers_to_ignore` on the model (see `train_pretrain_accelerate.py`)
+
+2. **Unused parameters**:
+   - The original code used `find_unused_parameters=True` to deal with parameters that are not used in the forward pass
+   - The new training script passes `DistributedDataParallelKwargs(find_unused_parameters=True)` to the Accelerator, which forwards it to DDP
+
+3. **Mixed-precision training**:
+   - In the DeepSpeed configuration file, `fp16` and `bf16` are set to `"auto"`
+   - The precision actually used is decided by Accelerate's `--mixed_precision` argument
+
+4. **Gradient accumulation**:
+   - In the DeepSpeed configuration file, `gradient_accumulation_steps` is set to `"auto"`
+   - The actual number of accumulation steps is decided by the training script's `--accumulation_steps` argument (see the sketch after this list)
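+
+Purely as an illustration of where that value lives on the Accelerate side, the sketch below assumes the Accelerator is built with `gradient_accumulation_steps` and that the loop uses the `accelerator.accumulate()` context manager; the actual `train_pretrain_accelerate.py` instead scales the loss by `--accumulation_steps` manually and lets DeepSpeed handle the accumulation:
+
+```python
+from accelerate import Accelerator
+
+accelerator = Accelerator(gradient_accumulation_steps=32)  # mirrors --accumulation_steps
+
+for batch in train_loader:                    # train_loader: any PyTorch DataLoader
+    # Gradients are synchronized and the optimizer steps only every 32 batches.
+    with accelerator.accumulate(model):
+        loss = compute_loss(model, batch)     # placeholder for the real loss computation
+        accelerator.backward(loss)
+        optimizer.step()
+        optimizer.zero_grad()
+```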
diff --git a/accelerate_config.yaml b/accelerate_config.yaml
new file mode 100644
index 0000000..1e841ce
--- /dev/null
+++ b/accelerate_config.yaml
@@ -0,0 +1,17 @@
+compute_environment: LOCAL_MACHINE
+deepspeed_config:
+  deepspeed_config_file: ds_config.json
+  zero3_init_flag: false
+distributed_type: DEEPSPEED
+downcast_bf16: 'no'
+machine_rank: 0
+main_training_function: main
+mixed_precision: bf16
+num_machines: 1
+num_processes: 4
+rdzv_backend: static
+same_network: true
+tpu_env: []
+tpu_use_cluster: false
+tpu_use_sudo: false
+use_cpu: false
diff --git a/ds_config.json b/ds_config.json
new file mode 100644
index 0000000..7175eea
--- /dev/null
+++ b/ds_config.json
@@ -0,0 +1,49 @@
+{
+    "train_batch_size": "auto",
+    "train_micro_batch_size_per_gpu": "auto",
+    "gradient_accumulation_steps": "auto",
+    "gradient_clipping": "auto",
+    "zero_optimization": {
+        "stage": 2,
+        "offload_optimizer": {
+            "device": "cpu",
+            "pin_memory": true
+        },
+        "allgather_partitions": true,
+        "allgather_bucket_size": 5e8,
+        "overlap_comm": true,
+        "reduce_scatter": true,
+        "reduce_bucket_size": 5e8,
+        "contiguous_gradients": true
+    },
+    "fp16": {
+        "enabled": "auto",
+        "loss_scale": 0,
+        "loss_scale_window": 1000,
+        "initial_scale_power": 16,
+        "hysteresis": 2,
+        "min_loss_scale": 1
+    },
+    "bf16": {
+        "enabled": "auto"
+    },
+    "optimizer": {
+        "type": "AdamW",
+        "params": {
+            "lr": "auto",
+            "betas": "auto",
+            "eps": "auto",
+            "weight_decay": "auto"
+        }
+    },
+    "scheduler": {
+        "type": "WarmupLR",
+        "params": {
+            "warmup_min_lr": "auto",
+            "warmup_max_lr": "auto",
+            "warmup_num_steps": "auto"
+        }
+    },
+    "steps_per_print": 100,
+    "wall_clock_breakdown": false
+}
diff --git a/run_accelerate.sh b/run_accelerate.sh
new file mode 100644
index 0000000..55edd0f
--- /dev/null
+++ b/run_accelerate.sh
@@ -0,0 +1,48 @@
+#!/bin/bash
+
+# Activate the conda environment
+source $(conda info --base)/etc/profile.d/conda.sh
+conda activate ycz_accelerate
+
+# Environment variables that help with debugging
+export NCCL_DEBUG=INFO
+export PYTHONFAULTHANDLER=1
+
+# Method 1: use the pre-made accelerate configuration file
+# accelerate launch --config_file accelerate_config.yaml train_pretrain_accelerate.py \
+#     --epochs 3 \
+#     --batch_size 24 \
+#     --learning_rate 2e-4 \
+#     --dtype bfloat16 \
+#     --accumulation_steps 32 \
+#     --grad_clip 1.0 \
+#     --log_interval 100 \
+#     --save_interval 10000 \
+#     --dim 1024 \
+#     --n_layers 32 \
+#     --max_seq_len 1024 \
+#     --use_flash_attn \
+#     --profile \
+#     --profile_interval 10
+
+# Method 2: configure accelerate directly on the command line
+CUDA_VISIBLE_DEVICES=0,1,2,3 accelerate launch \
+    --multi_gpu \
+    --num_processes=4 \
+    --mixed_precision=bf16 \
+    --main_process_port=29500 \
+    train_pretrain_accelerate.py \
+    --epochs 3 \
+    --batch_size 24 \
+    --learning_rate 2e-4 \
+    --dtype bfloat16 \
+    --accumulation_steps 32 \
+    --grad_clip 1.0 \
+    --log_interval 100 \
+    --save_interval 10000 \
+    --dim 1024 \
+    --n_layers 32 \
+    --max_seq_len 1024 \
+    --use_flash_attn \
+    --profile \
+    --profile_interval 10
diff --git a/train_pretrain_accelerate.py b/train_pretrain_accelerate.py
new file mode 100644
index 0000000..e791ca1
--- /dev/null
+++ b/train_pretrain_accelerate.py
@@ -0,0 +1,346 @@
+import os
+# Set environment variables
+os.environ["WANDB_MODE"] = "offline"  # or use "dryrun"
+import platform
+import argparse
+import time
+import math
+import warnings
+import pandas as pd
+import torch
+from torch import optim, nn
+from torch.utils.data import DataLoader
+from contextlib import nullcontext
+from typing import Optional
+from accelerate import Accelerator
+from accelerate.utils import set_seed
+from accelerate.utils import DeepSpeedPlugin
+from accelerate.utils import DistributedDataParallelKwargs
+from transformers import AutoTokenizer, get_cosine_schedule_with_warmup
+
+from model.model import MiniMindLM
+from model.LMConfig import LMConfig
+from model.dataset import PretrainDataset
+
+warnings.filterwarnings('ignore')
+
+# Logging helper
+def Logger(msg, accelerator=None):
+    # Without an accelerator, print unconditionally; otherwise print only on the main process
+    if accelerator is None or accelerator.is_main_process:
+        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {msg}")
+
+# Learning-rate helper (cosine decay)
+def get_lr(it, num_iters, learning_rate):
+    return learning_rate * 0.5 * (1.0 + math.cos(math.pi * it / num_iters))
+
+# Model initialization
+def init_model(lm_config, pretrained_embedding_path=None):
+    tokenizer = AutoTokenizer.from_pretrained('./model/minimind_tokenizer')
+    model = MiniMindLM(lm_config)
+
+    # Load pretrained token-embedding weights if a path was given
+    if pretrained_embedding_path:
+        Logger(f"Loading pretrained token embeddings from {pretrained_embedding_path}")
+        pretrained_embeddings = torch.load(pretrained_embedding_path)
+        model.tok_embeddings.weight.data.copy_(pretrained_embeddings)
+        model.output.weight.data.copy_(pretrained_embeddings)  # tied weights
+
+    Logger(f'Total trainable parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} M')
+    return model, tokenizer
Logger(f"Warning: Could not add auxiliary loss: {e}") + # 如果出错,不添加辅助损失 + loss = loss / args.accumulation_steps + + if args.profile and accelerator.is_main_process: + forward_end.record() + + # 计时反向传播 + if args.profile and accelerator.is_main_process: + backward_start.record() + + # 反向传播 + # 当使用DeepSpeed时,它会自动处理梯度累积和梯度裁剪 + accelerator.backward(loss) + + if args.profile and accelerator.is_main_process: + backward_end.record() + + # 计时优化器步骤 + if args.profile and accelerator.is_main_process: + optimizer_start.record() + + # 优化器步骤 - 当使用DeepSpeed时,它会自动处理梯度累积和梯度裁剪 + # 只有在达到累积步数时才会执行优化器步骤 + # 注意:当使用DeepSpeed时,它会自动处理梯度累积,所以我们不需要检查step % accumulation_steps + optimizer.step() + + # 当使用DeepSpeed时,zero_grad()会在step()之后自动调用 + # 但为了安全起见,我们仍然显式调用它 + optimizer.zero_grad() + + if args.profile and accelerator.is_main_process: + optimizer_end.record() + + # 打印训练信息 + if (step + 1) % args.log_interval == 0 and accelerator.is_main_process: + # 计算性能指标 + if args.profile: + torch.cuda.synchronize() + data_time = data_start.elapsed_time(data_end) if step > 0 else 0 + forward_time = forward_start.elapsed_time(forward_end) + backward_time = backward_start.elapsed_time(backward_end) + optimizer_time = optimizer_start.elapsed_time(optimizer_end) if (step + 1) % args.accumulation_steps == 0 else 0 + total_time = data_time + forward_time + backward_time + optimizer_time + + # 打印性能分析 + if (step + 1) % (args.log_interval * args.profile_interval) == 0: + Logger(f"性能分析 - 数据加载: {data_time:.2f}ms ({data_time/total_time*100:.1f}%), " + f"前向传播: {forward_time:.2f}ms ({forward_time/total_time*100:.1f}%), " + f"反向传播: {backward_time:.2f}ms ({backward_time/total_time*100:.1f}%), " + f"优化器: {optimizer_time:.2f}ms ({optimizer_time/total_time*100:.1f}%)", accelerator) + + # 计算当前学习率 + current_lr = optimizer.param_groups[0]['lr'] + + # 计算训练速度 + elapsed_time = time.time() - start_time + tokens_per_sec = (step + 1) * args.batch_size * args.max_seq_len / elapsed_time + + Logger(f"Epoch {epoch+1}/{args.epochs}, Step {step+1}/{len(train_loader)}, " + f"Loss: {loss.item()*args.accumulation_steps:.4f}, " + f"LR: {current_lr:.6f}, " + f"Speed: {tokens_per_sec:.2f} tokens/sec", accelerator) + + # 保存模型 + if (step + 1) % args.save_interval == 0 and accelerator.is_main_process: + # 使用函数开始处定义的moe_path变量 + ckp = f'{args.save_dir}/pretrain_{args.dim}{moe_path}.pth' + + # 获取解包后的模型 + unwrapped_model = accelerator.unwrap_model(model) + + # 保存模型参数 + accelerator.save(unwrapped_model.state_dict(), ckp) + Logger(f"Model saved to {ckp}", accelerator) + + except Exception as e: + Logger(f"Error in training step: {e}", accelerator) + import traceback + Logger(traceback.format_exc(), accelerator) + +def main(): + parser = argparse.ArgumentParser(description="MiniMind Pretraining with Accelerate") + parser.add_argument("--out_dir", type=str, default="out") + parser.add_argument("--epochs", type=int, default=3) + parser.add_argument("--batch_size", type=int, default=24) + parser.add_argument("--learning_rate", type=float, default=2e-4) + parser.add_argument("--dtype", type=str, default="bfloat16") + parser.add_argument("--use_wandb", default=True, action="store_true") + parser.add_argument("--wandb_project", type=str, default="MiniMind-Pretrain") + parser.add_argument("--num_workers", type=int, default=48) + parser.add_argument("--accumulation_steps", type=int, default=32) + parser.add_argument("--grad_clip", type=float, default=1.0) + parser.add_argument("--warmup_iters", type=int, default=0) + parser.add_argument("--log_interval", type=int, default=100) + 
parser.add_argument("--save_interval", type=int, default=10000) + parser.add_argument('--dim', default=1024, type=int) + parser.add_argument('--n_layers', default=32, type=int) + parser.add_argument('--max_seq_len', default=1024, type=int) + parser.add_argument('--use_moe', default=False, type=bool) + parser.add_argument('--disable_db', action='store_true', help="禁用数据库功能,使用固定值1e-4替代") + parser.add_argument("--data_path", type=str, default="./dataset/pretrain_hq.jsonl") + parser.add_argument("--pretrained_embedding_path", type=str, default=None, help="Path to pretrained token embedding weights (.pth file)") + parser.add_argument("--profile", action="store_true", default=True, help="启用性能分析") + parser.add_argument("--profile_interval", type=int, default=10, help="性能分析打印间隔(步数)") + parser.add_argument("--use_flash_attn", action="store_true", default=True, help="启用FlashAttention") + args = parser.parse_args() + + # 初始化accelerator + # 设置ddp_kwargs以处理未使用的参数 + ddp_kwargs = DistributedDataParallelKwargs(find_unused_parameters=True) + # 创建DeepSpeedPlugin对象 + ds_plugin = DeepSpeedPlugin( + gradient_accumulation_steps=args.accumulation_steps, + gradient_clipping=args.grad_clip, + zero_stage=2, # 使用ZeRO-2优化 + offload_optimizer_device="cpu", # 将优化器状态卸载到CPU + offload_param_device="none", # 不将参数卸载到CPU + ) + accelerator = Accelerator( + kwargs_handlers=[ddp_kwargs], + deepspeed_plugin=ds_plugin, + mixed_precision="bf16" if args.dtype == "bfloat16" else "fp16" if args.dtype == "float16" else "no" + ) + + # 设置随机种子 + set_seed(1337 + accelerator.process_index) + + # 配置模型 + lm_config = LMConfig( + dim=args.dim, + n_layers=args.n_layers, + max_seq_len=args.max_seq_len, + use_moe=args.use_moe, + disable_db=args.disable_db, + flash_attn=args.use_flash_attn + ) + + # 创建保存目录 + args.save_dir = os.path.join(args.out_dir) + if accelerator.is_main_process: + os.makedirs(args.save_dir, exist_ok=True) + os.makedirs(args.out_dir, exist_ok=True) + + # 计算每次迭代的token数量 + tokens_per_iter = args.batch_size * lm_config.max_seq_len + Logger(f"tokens_per_iter: {tokens_per_iter}", accelerator) + + # 设置数据类型 + pt_dtype = {'float32': torch.float32, 'bfloat16': torch.bfloat16, 'float16': torch.float16}[args.dtype] + + # 设置wandb运行名称 + args.wandb_run_name = f"MiniMind-Pretrain-Epoch-{args.epochs}-BatchSize-{args.batch_size}-LearningRate-{args.learning_rate}" + + # 设置自动混合精度上下文 + ctx = nullcontext() if accelerator.device.type == "cpu" else torch.cuda.amp.autocast(dtype=pt_dtype) + + # 初始化模型和tokenizer + model, tokenizer = init_model(lm_config, args.pretrained_embedding_path) + # 将accelerator传递给init_model函数中的Logger调用 + Logger(f'模型初始化完成', accelerator) + + # 处理pos_cis复数张量问题 + # 方法1:将pos_cis转换为实数张量(两个实数张量表示实部和虚部) + # 这里我们采用方法2:告诉accelerate忽略pos_cis + # 在DeepSpeed模式下,我们需要设置DeepSpeed的参数 + if hasattr(model, "pos_cis"): + Logger(f'检测到pos_cis复数张量,将其设置为不参与分布式训练', accelerator) + # 设置模型的_ddp_params_and_buffers_to_ignore属性 + model._ddp_params_and_buffers_to_ignore = {"pos_cis"} + + # 创建数据集和数据加载器 + train_ds = PretrainDataset(args.data_path, tokenizer, max_length=lm_config.max_seq_len) + train_loader = DataLoader( + train_ds, + batch_size=args.batch_size, + pin_memory=True, + drop_last=False, + shuffle=True, + num_workers=args.num_workers, + persistent_workers=True if args.num_workers > 0 else False, + prefetch_factor=2 if args.num_workers > 0 else None + ) + + # 创建优化器 + optimizer = optim.AdamW(model.parameters(), lr=args.learning_rate) + + # 创建学习率调度器 + total_steps = len(train_loader) * args.epochs + warmup_steps = args.warmup_iters if args.warmup_iters > 0 
+
+    # Dataset and data loader
+    train_ds = PretrainDataset(args.data_path, tokenizer, max_length=lm_config.max_seq_len)
+    train_loader = DataLoader(
+        train_ds,
+        batch_size=args.batch_size,
+        pin_memory=True,
+        drop_last=False,
+        shuffle=True,
+        num_workers=args.num_workers,
+        persistent_workers=True if args.num_workers > 0 else False,
+        prefetch_factor=2 if args.num_workers > 0 else None
+    )
+
+    # Optimizer
+    optimizer = optim.AdamW(model.parameters(), lr=args.learning_rate)
+
+    # Learning-rate scheduler
+    total_steps = len(train_loader) * args.epochs
+    warmup_steps = args.warmup_iters if args.warmup_iters > 0 else int(0.1 * total_steps)
+    scheduler = get_cosine_schedule_with_warmup(
+        optimizer,
+        num_warmup_steps=warmup_steps,
+        num_training_steps=total_steps
+    )
+
+    # Prepare everything for distributed training
+    model, optimizer, train_loader, scheduler = accelerator.prepare(
+        model, optimizer, train_loader, scheduler
+    )
+
+    # Initialize wandb
+    if args.use_wandb and accelerator.is_main_process:
+        import wandb
+        wandb.init(project=args.wandb_project, name=args.wandb_run_name, config=args)
+    else:
+        wandb = None
+
+    # Training loop
+    for epoch in range(args.epochs):
+        train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx)
+
+    # Close wandb
+    if args.use_wandb and accelerator.is_main_process:
+        wandb.finish()
+
+if __name__ == "__main__":
+    main()