"""Pretrain a token-embedding table with a minimal embedding -> LM-head model.

Launch single-process with ``python train_embedding.py`` or distributed with
``torchrun --nproc_per_node 2 train_embedding.py``.
"""
import os

# Set the wandb mode before wandb is imported ("dryrun" is an alternative value).
os.environ["WANDB_MODE"] = "offline"

import platform
import argparse
import time
import math
import warnings
import pandas as pd
import torch
import torch.distributed as dist
from torch import optim, nn
from torch.nn.parallel import DistributedDataParallel
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.utils.data import DataLoader, DistributedSampler
from contextlib import nullcontext
from transformers import AutoTokenizer
from model.LMConfig import LMConfig
from model.dataset import PretrainDataset

warnings.filterwarnings('ignore')


class EmbeddingPretrainer(nn.Module):
    """Minimal model used to pretrain token embeddings.

    Token ids are looked up in an embedding table and the result is projected
    straight back to vocabulary logits by a bias-free linear head.
    """

    def __init__(self, config: LMConfig):
        """Build the embedding table and LM head from ``config.vocab_size`` and ``config.dim``."""
        super().__init__()
        self.tok_embeddings = nn.Embedding(config.vocab_size, config.dim)
        self.lm_head = nn.Linear(config.dim, config.vocab_size, bias=False)
        # Weight tying between the embedding and the head is optional and is
        # not enabled here: the two modules keep separate parameters.

    def forward(self, input_ids):
        """Return vocabulary logits for ``input_ids`` (embedding lookup, then LM head)."""
        hidden_states = self.tok_embeddings(input_ids)
        logits = self.lm_head(hidden_states)
        return logits


def Logger(content):
    """Print ``content`` when not running under DDP, or only on rank 0 when DDP is active."""
    if not ddp or dist.get_rank() == 0:
        print(content)


def get_lr(current_step, total_steps, lr):
    """Return the learning rate for ``current_step``.

    Computes ``lr / 10 + 0.5 * lr * (1 + cos(pi * current_step / total_steps))``:
    a cosine curve from ``1.1 * lr`` at step 0 down to ``lr / 10`` at
    ``total_steps``.
    """
    return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))


def _embedding_state_dict(net):
    """Return the ``tok_embeddings`` state dict, unwrapping a DDP container if present."""
    if isinstance(net, torch.nn.parallel.DistributedDataParallel):
        return net.module.tok_embeddings.state_dict()
    return net.tok_embeddings.state_dict()


def train_epoch(epoch, wandb):
    """Run one pass over ``train_loader`` with gradient accumulation.

    Relies on the module-level training state created in the ``__main__``
    section (``args``, ``model``, ``optimizer``, ``scaler``, ``ctx``,
    ``train_loader``, ``iter_per_epoch``, ``lm_config``, ``ddp``).

    Args:
        epoch: Zero-based index of the current epoch.
        wandb: The imported ``wandb`` module, or ``None`` when logging is off.

    Raises:
        ValueError: If any exception occurs inside a training step. The
            embedding weights are saved to an ``_ERROR.pth`` file first and
            the original exception is attached as the cause.
    """
    # ignore_index=0 assumes token id 0 is the padding token.
    loss_fct = nn.CrossEntropyLoss(reduction='none', ignore_index=0)
    start_time = time.time()
    for step, (X, Y, loss_mask) in enumerate(train_loader):
        try:
            # Move the batch to the training device.
            X = X.to(args.device)
            Y = Y.to(args.device)
            loss_mask = loss_mask.to(args.device)

            # Set the learning rate for this global step on every param group.
            lr = get_lr(epoch * iter_per_epoch + step, args.epochs * iter_per_epoch, args.learning_rate)
            for param_group in optimizer.param_groups:
                param_group['lr'] = lr

            with ctx:
                logits = model(X)  # the model returns logits directly
                loss = loss_fct(
                    logits.view(-1, logits.size(-1)),
                    Y.view(-1)
                ).view(Y.size())
                # Average the per-token loss over the positions selected by loss_mask.
                loss = (loss * loss_mask).sum() / loss_mask.sum()
                # Scale down so gradients summed over the accumulation window average out.
                loss = loss / args.accumulation_steps

            # One-off dtype report: first step of the first epoch only
            # (Logger itself restricts output to the main process).
            if epoch == 0 and step == 0:
                Logger("---- Data Type Check ----")
                Logger(f"X.dtype: {X.dtype}")
                if hasattr(model, 'module'):  # DDP case
                    Logger(f"Model parameter dtype: {next(model.module.parameters()).dtype}")
                else:  # non-DDP case
                    Logger(f"Model parameter dtype: {next(model.parameters()).dtype}")
                Logger(f"logits.dtype: {logits.dtype}")
                Logger(f"loss.dtype: {loss.dtype}")
                Logger("-------------------------")

            scaler.scale(loss).backward()

            if (step + 1) % args.accumulation_steps == 0:
                # Unscale before clipping so the threshold applies to the true gradients.
                scaler.unscale_(optimizer)
                torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)

                scaler.step(optimizer)
                scaler.update()

                optimizer.zero_grad(set_to_none=True)

            # Periodic progress log.
            if step % args.log_interval == 0:
                spend_time = time.time() - start_time
                # Undo the accumulation scaling so the reported loss is per-batch.
                current_loss = loss.item() * args.accumulation_steps
                current_lr = optimizer.param_groups[-1]['lr']
                # Estimated minutes remaining in this epoch.
                remaining_min = spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60
                Logger(
                    'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} epoch_Time:{}min:'.format(
                        epoch + 1,
                        args.epochs,
                        step,
                        iter_per_epoch,
                        current_loss,
                        current_lr,
                        remaining_min))

                if (wandb is not None) and (not ddp or dist.get_rank() == 0):
                    wandb.log({"loss": current_loss,
                               "lr": current_lr,
                               "epoch_Time": remaining_min})

            # Periodic checkpoint of the embedding weights (main process only).
            if (step + 1) % args.save_interval == 0 and (not ddp or dist.get_rank() == 0):
                model.eval()
                ckp = f'{args.save_dir}/pretrained_embedding_dim{lm_config.dim}_vocab{lm_config.vocab_size}.pth'
                torch.save(_embedding_state_dict(model), ckp)
                Logger(f"Saved pretrained embedding to {ckp}")
                model.train()

        except Exception as e:
            print(f"Error occurred: {str(e)}")
            # Save the current embedding weights so the failure can be inspected.
            save_path = f'{args.save_dir}/pretrained_embedding_dim{lm_config.dim}_vocab{lm_config.vocab_size}_ERROR.pth'
            if os.path.exists(save_path):
                os.remove(save_path)
            torch.save(_embedding_state_dict(model), save_path)

            # Report parameters whose gradients contain NaN: names first, then details.
            nan_params = [
                (name, param)
                for name, param in model.named_parameters()
                if param.grad is not None and torch.isnan(param.grad).any()
            ]
            for name, _ in nan_params:
                print(f"NaN gradient in parameter: {name}")
            for name, param in nan_params:
                print(f"Parameter {name} values: {param.data}")
                print(f"Parameter {name} gradients: {param.grad}")

            # Keep the historical exception type/message, but chain the real
            # cause so the traceback shows what actually failed.
            raise ValueError("NaN gradient detected") from e


def init_model(lm_config_params: LMConfig):
    """Load the tokenizer and build the ``EmbeddingPretrainer`` on ``args.device``.

    If the tokenizer's vocabulary size differs from
    ``lm_config_params.vocab_size``, the config object is updated in place so
    the model matches the tokenizer.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    tokenizer = AutoTokenizer.from_pretrained('./model/minimind_tokenizer')
    if tokenizer.vocab_size != lm_config_params.vocab_size:
        Logger(f"Updating lm_config.vocab_size from {lm_config_params.vocab_size} to {tokenizer.vocab_size} based on tokenizer.")
        lm_config_params.vocab_size = tokenizer.vocab_size

    model = EmbeddingPretrainer(lm_config_params).to(args.device)
    Logger(f'EmbeddingPretrainer total parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} Million')
    return model, tokenizer


def init_distributed_mode():
    """Initialise the NCCL process group and bind this process to its local GPU.

    Does nothing when DDP is not active. Otherwise sets the module-level
    ``ddp_local_rank`` and ``DEVICE`` from the ``LOCAL_RANK`` environment
    variable and makes that CUDA device current.
    """
    if not ddp:
        return
    global ddp_local_rank, DEVICE

    dist.init_process_group(backend="nccl")
    ddp_local_rank = int(os.environ["LOCAL_RANK"])
    DEVICE = f"cuda:{ddp_local_rank}"
    torch.cuda.set_device(DEVICE)


# torchrun --nproc_per_node 2 train_embedding.py
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="MiniMind Embedding Pretraining")
    parser.add_argument("--out_dir", type=str, default="out_embedding")
    # One epoch is the fastest route; otherwise train 2-6 epochs on limited data.
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--batch_size", type=int, default=32)
    parser.add_argument("--learning_rate", type=float, default=5e-4)
    # Use the GPU when one is available, otherwise fall back to the CPU.
    parser.add_argument("--device", type=str, default="cuda:0" if torch.cuda.is_available() else "cpu")
    parser.add_argument("--dtype", type=str, default="bfloat16")
    parser.add_argument("--use_wandb", default=False, action="store_true")
    parser.add_argument("--wandb_project", type=str, default="MiniMind-Embedding-Pretrain")
    parser.add_argument("--num_workers", type=int, default=8)
    parser.add_argument("--ddp", action="store_true")
    # Number of micro-batches accumulated before each optimizer step.
    parser.add_argument("--accumulation_steps", type=int, default=8)
    # Gradient-norm clipping threshold.
    parser.add_argument("--grad_clip", type=float, default=1.0)
    # Steps between progress logs.
    parser.add_argument("--log_interval", type=int, default=100)
    # Steps between embedding checkpoints.
    parser.add_argument("--save_interval", type=int, default=100)
    # Local process index for distributed launchers.
    parser.add_argument('--local_rank', type=int, default=-1)
    # Embedding dimension.
    parser.add_argument('--dim', default=768, type=int)
    # Maximum input sequence length.
    parser.add_argument('--max_seq_len', default=512, type=int)
    # Path to the training data.
    parser.add_argument("--data_path", type=str, default="./dataset/pretrain_hq.jsonl")
    # Initial vocabulary size; replaced by the tokenizer's value if they differ.
    parser.add_argument('--vocab_size', default=6400, type=int)
    args = parser.parse_args()

    # EmbeddingPretrainer only reads dim and vocab_size; the layer/head counts
    # are set to minimal values because they are passed to LMConfig regardless.
    lm_config = LMConfig(
        dim=args.dim,
        vocab_size=args.vocab_size,  # may be updated by init_model
        max_seq_len=args.max_seq_len,
        n_layers=1,
        n_heads=1,
        n_kv_heads=1
    )

    # Checkpoints are written directly into the output directory.
    args.save_dir = args.out_dir
    os.makedirs(args.save_dir, exist_ok=True)

    tokens_per_iter = args.batch_size * lm_config.max_seq_len
    print(f"tokens_per_iter: {tokens_per_iter}")
    device_type = "cuda" if "cuda" in args.device else "cpu"

    # Map the --dtype string to the torch dtype used for autocast.
    pt_dtype = {'float32': torch.float32, 'bfloat16': torch.bfloat16, 'float16': torch.float16}[args.dtype]

    ctx = nullcontext() if device_type == "cpu" else torch.cuda.amp.autocast(dtype=pt_dtype)

    # A RANK environment variable marks a distributed (DDP) run.
    ddp = int(os.environ.get("RANK", -1)) != -1
    ddp_local_rank, DEVICE = 0, "cuda:0"  # defaults, overwritten under DDP

    base_seed = 1337
    torch.manual_seed(base_seed)
    torch.cuda.manual_seed(base_seed)

    if ddp:
        init_distributed_mode()  # sets DEVICE and ddp_local_rank
        args.device = torch.device(DEVICE)
        rank = dist.get_rank()
        # Give each rank its own seed, on CPU and on all CUDA devices.
        torch.manual_seed(base_seed + rank)
        torch.cuda.manual_seed_all(base_seed + rank)

    # Build the model before wandb is initialised: init_model may replace
    # lm_config.vocab_size with the tokenizer's value, and the run name and
    # logged config should reflect the vocabulary size actually used.
    model, tokenizer = init_model(lm_config)
    args.vocab_size = lm_config.vocab_size
    args.wandb_run_name = f"MiniMind-Embedding-Pretrain-Dim-{args.dim}-Vocab-{lm_config.vocab_size}"

    if args.use_wandb and (not ddp or dist.get_rank() == 0):
        import wandb

        wandb.init(project=args.wandb_project, name=args.wandb_run_name, config=args)
    else:
        wandb = None

    train_ds = PretrainDataset(args.data_path, tokenizer, max_length=lm_config.max_seq_len)
    train_sampler = DistributedSampler(train_ds, shuffle=True, seed=base_seed) if ddp else None
    train_loader = DataLoader(
        train_ds,
        batch_size=args.batch_size,
        pin_memory=True,
        drop_last=True,  # keeps the number of steps per epoch fixed
        shuffle=(train_sampler is None),  # the DDP sampler does its own shuffling
        num_workers=args.num_workers,
        sampler=train_sampler
    )

    # The gradient scaler is enabled for both reduced-precision dtypes.
    scaler = torch.cuda.amp.GradScaler(enabled=(args.dtype in ['float16', 'bfloat16']))
    optimizer = optim.AdamW(model.parameters(), lr=args.learning_rate)

    if ddp:
        model = DistributedDataParallel(model, device_ids=[ddp_local_rank])

    iter_per_epoch = len(train_loader)
    Logger(f"Starting training for {args.epochs} epochs with {iter_per_epoch} iterations per epoch.")
    for epoch in range(args.epochs):
        if ddp:
            # Reseed the sampler so each epoch sees a different shuffle.
            train_sampler.set_epoch(epoch)
        train_epoch(epoch, wandb)

    if wandb is not None and (not ddp or dist.get_rank() == 0):
        wandb.finish()
    Logger("Embedding pretraining finished.")