From f750edd9bac8e21c202ac30732db18eeff88a7c4 Mon Sep 17 00:00:00 2001
From: Yu Chengzhang
Date: Mon, 23 Jun 2025 23:47:10 +0800
Subject: [PATCH] update

---
 model/model.py               |  46 ++++++
 train_pretrain_accelerate.py | 253 ++++++++++++++++++++++++++++-------
 2 files changed, 247 insertions(+), 52 deletions(-)

diff --git a/model/model.py b/model/model.py
index a434ff6..6856d2c 100644
--- a/model/model.py
+++ b/model/model.py
@@ -2,6 +2,7 @@ import math
 import struct
 import inspect
 import time
+import gc
 # Subspace 2D decomposition + gradient update
 from .LMConfig import LMConfig
 from typing import Any, Optional, Tuple, List, Union
@@ -92,6 +93,15 @@ class KnowledgeDataset(nn.Module):
         device = all_scores.device
         dtype = all_scores.dtype
 
+        # Record the memory state before entering intelligent selection
+        if hasattr(self, 'step_counter'):
+            self.step_counter += 1
+        # GPU memory logging disabled here to improve performance
+        # if self.step_counter % 50 == 0:  # log once every 50 calls
+        #     if torch.cuda.is_available():
+        #         allocated_before = torch.cuda.memory_allocated() / (1024**3)
+        #         print(f"[INTEL_SELECT_ENTER] Step {self.step_counter}: GPU Memory: {allocated_before:.2f}GB")
+
         # Hierarchical selection for each batch
         enhanced_scores = all_scores.clone()
         query_features = query.mean(dim=1)  # [batch_size, dim]
@@ -157,6 +167,24 @@ class KnowledgeDataset(nn.Module):
         all_best_tokens = torch.stack(batch_best_tokens, dim=0)
         all_best_tokens_embeddings = torch.stack(batch_best_tokens_embeddings, dim=0)
 
+        # Free intermediate tensors to prevent a gradual memory build-up
+        del all_candidate_indices, unique_indices, inverse_indices
+        del unique_candidate_features, normalized_candidates, normalized_queries
+        del batch_best_tokens, batch_best_tokens_embeddings
+        del flat_tokens, flat_embeddings, pre_update_embeddings
+
+        # Record the memory state after intelligent selection (disabled to improve performance)
+        # if hasattr(self, 'step_counter') and self.step_counter % 50 == 0:
+        #     if torch.cuda.is_available():
+        #         allocated_after = torch.cuda.memory_allocated() / (1024**3)
+        #         print(f"[INTEL_SELECT_EXIT] Step {self.step_counter}: GPU Memory: {allocated_after:.2f}GB")
+
+        # Force garbage collection (only on monitored steps)
+        if hasattr(self, 'step_counter') and self.step_counter % 100 == 0:
+            gc.collect()
+            if torch.cuda.is_available():
+                torch.cuda.empty_cache()
+
         return all_best_tokens, all_best_tokens_embeddings
 
 
@@ -216,6 +244,16 @@ class CrossAttention(nn.Module):
     def forward(self, x, db, context_mask=None, pos_emb=None):
         batch_size = x.size(0)
+
+        # Monitor memory at the start of cross attention (logging disabled to improve performance)
+        if not hasattr(self, 'call_counter'):
+            self.call_counter = 0
+        self.call_counter += 1
+
+        # GPU memory logging disabled here to improve performance
+        # if self.call_counter % 100 == 0 and torch.cuda.is_available():
+        #     allocated_before = torch.cuda.memory_allocated() / (1024**3)
+        #     print(f"[CROSS_ATTN_ENTER] Call {self.call_counter}: GPU Memory: {allocated_before:.2f}GB")
 
         # Split into multiple heads
         q = self.to_q(x).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
@@ -241,6 +279,14 @@ class CrossAttention(nn.Module):
         context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.config.dim)
 
         context = self.to_out(context)
+
+        # Free intermediate tensors
+        del q, k, v, attn_scores, attn_weights
+
+        # Monitor memory at the end of cross attention (logging disabled to improve performance)
+        # if self.call_counter % 100 == 0 and torch.cuda.is_available():
+        #     allocated_after = torch.cuda.memory_allocated() / (1024**3)
+        #     print(f"[CROSS_ATTN_EXIT] Call {self.call_counter}: GPU Memory: {allocated_after:.2f}GB")
 
         return context
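
The model-side changes above pair explicit `del` of intermediates with periodic gc.collect() and torch.cuda.empty_cache() calls. As a rough illustration only, not part of the patch, the cleanup-and-measure pattern can be exercised in isolation; a minimal sketch, assuming a CUDA-capable PyTorch install (the helper name report_cuda_memory and the tensor sizes are made up for the example):

    import gc
    import torch

    def report_cuda_memory(tag: str) -> None:
        # Print allocated/reserved CUDA memory in GB; no-op on CPU-only machines.
        if torch.cuda.is_available():
            allocated = torch.cuda.memory_allocated() / (1024 ** 3)
            reserved = torch.cuda.memory_reserved() / (1024 ** 3)
            print(f"[{tag}] allocated={allocated:.2f}GB reserved={reserved:.2f}GB")

    if torch.cuda.is_available():
        report_cuda_memory("before")
        x = torch.randn(4096, 4096, device="cuda")
        y = x @ x                      # intermediate result worth releasing
        report_cuda_memory("after matmul")
        del x, y                       # drop the last references
        gc.collect()                   # collect any lingering Python-side references
        torch.cuda.empty_cache()       # return cached blocks to the allocator/driver
        report_cuda_memory("after cleanup")
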
diff --git a/train_pretrain_accelerate.py b/train_pretrain_accelerate.py
index 97c98c9..e98d732 100644
--- a/train_pretrain_accelerate.py
+++ b/train_pretrain_accelerate.py
@@ -22,6 +22,8 @@ from transformers import AutoTokenizer, get_cosine_schedule_with_warmup
 import numpy as np
 from sklearn.metrics.pairwise import cosine_similarity
 import swanlab  # replaces the wandb import
+import gc  # garbage collection module
+import psutil  # system resource monitoring module
 
 from model.model import MiniMindLM, RMSNorm
 from model.LMConfig import LMConfig
@@ -29,6 +31,63 @@ from model.dataset import PretrainDataset
 
 warnings.filterwarnings('ignore')
 
+# Memory monitoring helpers
+def get_memory_usage():
+    """Return the current memory usage of this process."""
+    process = psutil.Process()
+    memory_info = process.memory_info()
+    return {
+        'rss_mb': memory_info.rss / 1024 / 1024,  # resident set size (MB)
+        'vms_mb': memory_info.vms / 1024 / 1024,  # virtual memory size (MB)
+    }
+
+def get_cuda_memory_usage():
+    """Return the current CUDA memory usage."""
+    if torch.cuda.is_available():
+        return {
+            'cuda_allocated_mb': torch.cuda.memory_allocated() / 1024 / 1024,
+            'cuda_reserved_mb': torch.cuda.memory_reserved() / 1024 / 1024,
+            'cuda_max_allocated_mb': torch.cuda.max_memory_allocated() / 1024 / 1024,
+        }
+    return {}
+
+def get_tensor_memory_size(tensor_list):
+    """Total memory footprint (MB) of a list of tensors or tensor batches."""
+    total_size = 0
+    for batch in tensor_list:
+        if isinstance(batch, (list, tuple)):
+            for tensor in batch:
+                if isinstance(tensor, torch.Tensor):
+                    total_size += tensor.numel() * tensor.element_size()
+        elif isinstance(batch, torch.Tensor):
+            total_size += batch.numel() * batch.element_size()
+    return total_size / 1024 / 1024  # convert to MB
+
+def log_memory_status(step, prefetch_batches, accelerator, stage="", detailed=False):
+    """Log the current memory status."""
+    if not accelerator.is_main_process:
+        return
+
+    memory_info = get_memory_usage()
+    cuda_info = get_cuda_memory_usage()
+    prefetch_memory = get_tensor_memory_size(prefetch_batches)
+
+    log_msg = f"[Memory Monitor] Step {step} {stage} - "
+    log_msg += f"Prefetch batches: {len(prefetch_batches)}, "
+    log_msg += f"Prefetch memory: {prefetch_memory:.2f}MB, "
+    log_msg += f"System RSS: {memory_info['rss_mb']:.2f}MB"
+
+    if cuda_info:
+        log_msg += f", CUDA allocated: {cuda_info['cuda_allocated_mb']:.2f}MB"
+        log_msg += f", CUDA reserved: {cuda_info['cuda_reserved_mb']:.2f}MB"
+
+    if detailed:
+        log_msg += f", System VMS: {memory_info['vms_mb']:.2f}MB"
+        if cuda_info:
+            log_msg += f", CUDA max allocated: {cuda_info['cuda_max_allocated_mb']:.2f}MB"
+
+    Logger(log_msg, accelerator)
+
 # Logging helper
 def Logger(msg, accelerator=None):
     # If no accelerator is provided, print only on the main process
@@ -227,65 +286,92 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
     moe_path = '_moe' if args.use_moe else ''
     best_loss = float('10000')
+    # Initialize the CUDA event variables
+    data_start = data_end = forward_start = forward_end = None
+    backward_start = backward_end = optimizer_start = optimizer_end = None
+
     # Add CUDA events for performance profiling (main process only)
-    # if args.profile and accelerator.is_main_process:
-    #     data_start = torch.cuda.Event(enable_timing=True)
-    #     data_end = torch.cuda.Event(enable_timing=True)
-    #     forward_start = torch.cuda.Event(enable_timing=True)
-    #     forward_end = torch.cuda.Event(enable_timing=True)
-    #     backward_start = torch.cuda.Event(enable_timing=True)
-    #     backward_end = torch.cuda.Event(enable_timing=True)
-    #     optimizer_start = torch.cuda.Event(enable_timing=True)
-    #     optimizer_end = torch.cuda.Event(enable_timing=True)
+    if args.profile and accelerator.is_main_process:
+        data_start = torch.cuda.Event(enable_timing=True)
+        data_end = torch.cuda.Event(enable_timing=True)
+        forward_start = torch.cuda.Event(enable_timing=True)
+        forward_end = torch.cuda.Event(enable_timing=True)
+        backward_start = torch.cuda.Event(enable_timing=True)
+        backward_end = torch.cuda.Event(enable_timing=True)
+        optimizer_start = torch.cuda.Event(enable_timing=True)
+        optimizer_end = torch.cuda.Event(enable_timing=True)
 
     # Prefetch data
     prefetch_factor = 2  # number of batches to prefetch
     data_iter = iter(train_loader)
     prefetch_batches = []
 
+    # Log the initial memory state
+    if args.memory_monitor:
+        log_memory_status(-1, prefetch_batches, accelerator, "before_prefetch", detailed=True)
+
     # Prefetch the initial batches
-    for _ in range(min(prefetch_factor, len(train_loader))):
+    for i in range(min(prefetch_factor, len(train_loader))):
         try:
             batch = next(data_iter)
             prefetch_batches.append(batch)
+            # Log the memory change after each prefetched batch
+            if args.memory_monitor and accelerator.is_main_process:
+                log_memory_status(-1, prefetch_batches, accelerator, f"after_adding_batch_{i+1}")
         except StopIteration:
             break
 
+    # Log the memory state after the initial prefetch
+    if args.memory_monitor:
+        log_memory_status(-1, prefetch_batches, accelerator, "after_initial_prefetch", detailed=True)
+
     # Initialize the variables needed for logging before entering the loop
     last_log_time = epoch_start_time
 
     for step in range(total_steps_in_epoch):
         try:
             # Time data loading (main process only)
-            # if args.profile and accelerator.is_main_process:
-            #     data_start.record()
+            if args.profile and accelerator.is_main_process and data_start is not None:
+                data_start.record()
+
+            # Log the memory state before consuming a batch (detailed at the configured interval)
+            if args.memory_monitor and step % args.memory_monitor_interval == 0:
+                log_memory_status(step, prefetch_batches, accelerator, "before_use_batch", detailed=True)
 
             # Use the prefetched data
             if prefetch_batches:
                 X, Y, loss_mask = prefetch_batches.pop(0)
+                # Log the memory change after popping a batch
+                if args.memory_monitor and step % args.memory_monitor_interval == 0:
+                    log_memory_status(step, prefetch_batches, accelerator, "after_pop_batch")
             else:
                 # If the prefetch queue is empty, load directly
                 X, Y, loss_mask = next(data_iter)
+                if args.memory_monitor and accelerator.is_main_process:
+                    Logger(f"[Memory Monitor] Step {step} - Prefetch queue empty, loading directly!", accelerator)
 
             # Asynchronously prefetch the next batch
             if step + prefetch_factor < len(train_loader):
                 try:
                     batch = next(data_iter)
                     prefetch_batches.append(batch)
+                    # Log the memory change after adding a new batch
+                    if args.memory_monitor and step % args.memory_monitor_interval == 0:
+                        log_memory_status(step, prefetch_batches, accelerator, "after_add_batch")
                 except StopIteration:
                     pass
 
             # End timing for data loading (main process only)
-            # if args.profile and accelerator.is_main_process:
-            #     data_end.record()
+            if args.profile and accelerator.is_main_process and data_end is not None:
+                data_end.record()
 
             # Update the learning rate
             if scheduler is not None:
                 scheduler.step()
 
             # Time the forward pass (main process only)
-            # if args.profile and accelerator.is_main_process:
-            #     forward_start.record()
+            if args.profile and accelerator.is_main_process and forward_start is not None:
+                forward_start.record()
 
             # Forward pass
             with ctx:
@@ -311,24 +397,24 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
                 loss = loss / args.accumulation_steps
 
             # End timing for the forward pass (main process only)
-            # if args.profile and accelerator.is_main_process:
-            #     forward_end.record()
+            if args.profile and accelerator.is_main_process and forward_end is not None:
+                forward_end.record()
 
             # Time the backward pass (main process only)
-            # if args.profile and accelerator.is_main_process:
-            #     backward_start.record()
+            if args.profile and accelerator.is_main_process and backward_start is not None:
+                backward_start.record()
 
             # Backward pass
             # When using DeepSpeed, gradient accumulation and clipping are handled automatically
             accelerator.backward(loss)
 
             # End timing for the backward pass (main process only)
-            # if args.profile and accelerator.is_main_process:
-            #     backward_end.record()
+            if args.profile and accelerator.is_main_process and backward_end is not None:
+                backward_end.record()
 
             # Time the optimizer step (main process only)
-            # if args.profile and accelerator.is_main_process:
-            #     optimizer_start.record()
+            if args.profile and accelerator.is_main_process and optimizer_start is not None:
+                optimizer_start.record()
 
             # Optimizer step - when using DeepSpeed, gradient accumulation and clipping are handled automatically
             # The optimizer step only runs once the accumulation step count is reached
@@ -340,40 +426,58 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
                 optimizer.zero_grad()
 
             # End timing for the optimizer step (main process only)
-            # if args.profile and accelerator.is_main_process:
-            #     optimizer_end.record()
+            if args.profile and accelerator.is_main_process and optimizer_end is not None:
+                optimizer_end.record()
 
             # Print training info (main process only)
             if (step + 1) % args.log_interval == 0 and accelerator.is_main_process:
                 current_time = time.time()
+
+                # Log a detailed memory snapshot at each logging interval
+                if args.memory_monitor:
+                    log_memory_status(step, prefetch_batches, accelerator, "at_log_interval", detailed=True)
+
+                    # Force garbage collection and log the change
+                    if torch.cuda.is_available():
+                        torch.cuda.empty_cache()
+                    gc.collect()
+                    log_memory_status(step, prefetch_batches, accelerator, "after_gc", detailed=True)
+
                 # Compute performance metrics
-                if args.profile:
+                if args.profile and accelerator.is_main_process:
                     torch.cuda.synchronize()
-                    # Compute the metrics from the time since the last log, not the total time
-                    data_time = data_start.elapsed_time(data_end)
-                    forward_time = forward_start.elapsed_time(forward_end)
-                    backward_time = backward_start.elapsed_time(backward_end)
-                    optimizer_time = optimizer_start.elapsed_time(optimizer_end)
-                    iter_time = (current_time - last_log_time) * 1000 / args.log_interval  # avg ms per iteration since last log
-                    # total_time_ms = data_time + forward_time + backward_time + optimizer_time
+
+                    # Only call elapsed_time once all events have actually been recorded
+                    try:
+                        data_time = data_start.elapsed_time(data_end) if data_start is not None and data_end is not None else 0
+                        forward_time = forward_start.elapsed_time(forward_end) if forward_start is not None and forward_end is not None else 0
+                        backward_time = backward_start.elapsed_time(backward_end) if backward_start is not None and backward_end is not None else 0
+                        optimizer_time = optimizer_start.elapsed_time(optimizer_end) if optimizer_start is not None and optimizer_end is not None else 0
+                        iter_time = (current_time - last_log_time) * 1000 / args.log_interval  # avg ms per iteration since last log
+                        # total_time_ms = data_time + forward_time + backward_time + optimizer_time
 
-                    # Print the profiling summary
-                    if (step + 1) % (args.log_interval * args.profile_interval) == 0:
-                        Logger(f"Performance profile (Avg/iter over last {args.log_interval} steps) - "
-                               f"Data: {data_time/args.log_interval:.2f}ms, "
-                               f"Fwd: {forward_time/args.log_interval:.2f}ms, "
-                               f"Bwd: {backward_time/args.log_interval:.2f}ms, "
-                               f"Optim: {optimizer_time/args.log_interval:.2f}ms, "
-                               f"Iter Time: {iter_time:.2f}ms", accelerator)
-                        # Reset the events so the next measurement starts from zero
-                        data_start = torch.cuda.Event(enable_timing=True)
-                        data_end = torch.cuda.Event(enable_timing=True)
-                        forward_start = torch.cuda.Event(enable_timing=True)
-                        forward_end = torch.cuda.Event(enable_timing=True)
-                        backward_start = torch.cuda.Event(enable_timing=True)
-                        backward_end = torch.cuda.Event(enable_timing=True)
-                        optimizer_start = torch.cuda.Event(enable_timing=True)
-                        optimizer_end = torch.cuda.Event(enable_timing=True)
+                        # Print the profiling summary
+                        if (step + 1) % (args.log_interval * args.profile_interval) == 0:
+                            Logger(f"Performance profile (Avg/iter over last {args.log_interval} steps) - "
+                                   f"Data: {data_time/args.log_interval:.2f}ms, "
+                                   f"Fwd: {forward_time/args.log_interval:.2f}ms, "
+                                   f"Bwd: {backward_time/args.log_interval:.2f}ms, "
+                                   f"Optim: {optimizer_time/args.log_interval:.2f}ms, "
+                                   f"Iter Time: {iter_time:.2f}ms", accelerator)
+                            # Reset the events so the next measurement starts from zero
+                            data_start = torch.cuda.Event(enable_timing=True)
+                            data_end = torch.cuda.Event(enable_timing=True)
+                            forward_start = torch.cuda.Event(enable_timing=True)
+                            forward_end = torch.cuda.Event(enable_timing=True)
+                            backward_start = torch.cuda.Event(enable_timing=True)
+                            backward_end = torch.cuda.Event(enable_timing=True)
+                            optimizer_start = torch.cuda.Event(enable_timing=True)
+                            optimizer_end = torch.cuda.Event(enable_timing=True)
+                    except RuntimeError as e:
+                        if "Both events must be recorded" in str(e):
+                            Logger(f"Warning: CUDA events not properly recorded, skipping performance analysis: {e}", accelerator)
+                        else:
+                            raise e
 
                 # Compute the current learning rate
@@ -419,7 +523,7 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
 
             # Save the model (main process only)
             loss_total = loss.item() * args.accumulation_steps
-            if epoch > 1 or best_loss > loss_total and accelerator.is_main_process:
+            if epoch > 1 and best_loss > loss_total and accelerator.is_main_process:
                 best_loss = loss_total
                 # Use the moe_path variable defined at the top of the function
                 ckp = f'{args.save_dir}/pretrain_{args.dim}{moe_path}.pth'
@@ -433,8 +537,33 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
         except Exception as e:
             Logger(f"Error in training step: {e}", accelerator)
+            # Log the memory state at the time of the exception
+            if args.memory_monitor:
+                log_memory_status(step, prefetch_batches, accelerator, "at_exception", detailed=True)
             import traceback
             Logger(traceback.format_exc(), accelerator)
+
+            # Clear prefetch_batches to avoid leaking memory
+            if args.memory_monitor and accelerator.is_main_process:
+                Logger(f"[Memory Monitor] Clearing prefetch_batches due to exception. Current length: {len(prefetch_batches)}", accelerator)
+            prefetch_batches.clear()
+            gc.collect()
+            if torch.cuda.is_available():
+                torch.cuda.empty_cache()
+            if args.memory_monitor:
+                log_memory_status(step, prefetch_batches, accelerator, "after_exception_cleanup", detailed=True)
+
+    # Clear prefetch_batches when the training epoch ends
+    if args.memory_monitor:
+        if accelerator.is_main_process:
+            Logger(f"[Memory Monitor] Epoch {epoch+1} finished. Clearing prefetch_batches. Final length: {len(prefetch_batches)}", accelerator)
+        log_memory_status(total_steps_in_epoch-1, prefetch_batches, accelerator, "before_epoch_end_cleanup", detailed=True)
+    prefetch_batches.clear()
+    gc.collect()
+    if torch.cuda.is_available():
+        torch.cuda.empty_cache()
+    if args.memory_monitor:
+        log_memory_status(total_steps_in_epoch-1, prefetch_batches, accelerator, "after_epoch_end_cleanup", detailed=True)
 
 
 def main():
     parser = argparse.ArgumentParser(description="MiniMind Pretraining with Accelerate")
@@ -468,6 +597,8 @@ def main():
     parser.add_argument("--fast_clustering", action="store_true", default=True, help="Use fast approximate clustering (suitable for large datasets)")
     parser.add_argument("--cluster_cache_path", type=str, default="./cache/cluster_tokens_single.pt", help="Path to the cluster result cache file")
     parser.add_argument("--recompute_clusters", action="store_true", default=False, help="Force recomputation of clusters, ignoring the cache file")
+    parser.add_argument("--memory_monitor", action="store_true", default=False, help="Enable memory monitoring")
+    parser.add_argument("--memory_monitor_interval", type=int, default=10, help="Memory monitoring interval (in steps)")
     args = parser.parse_args()
 
     #########################################################
@@ -629,7 +760,25 @@ def main():
     #########################################################
     overall_start_time = time.time()  # Record overall start time
     for epoch in range(args.epochs):
+        Logger(f"Starting epoch {epoch+1}", accelerator)
         train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx, overall_start_time, swanlab_run)  # Pass overall start time
+
+        # Run a memory cleanup after every epoch
+        Logger(f"Epoch {epoch+1} finished, running memory cleanup", accelerator)
+        gc.collect()
+        if torch.cuda.is_available():
+            torch.cuda.empty_cache()
+
+        # Log the memory state at the end of the epoch
+        if accelerator.is_main_process:
+            memory_info = get_memory_usage()
+            cuda_info = get_cuda_memory_usage()
+            log_msg = f"[Memory Monitor] Epoch {epoch+1} completed - "
+            log_msg += f"System RSS: {memory_info['rss_mb']:.2f}MB"
+            if cuda_info:
+                log_msg += f", CUDA allocated: {cuda_info['cuda_allocated_mb']:.2f}MB"
+                log_msg += f", CUDA reserved: {cuda_info['cuda_reserved_mb']:.2f}MB"
+            Logger(log_msg, accelerator)
 
     #########################################################
     # Close SwanLab