Yu Chengzhang 2025-06-23 23:47:10 +08:00
parent 5f19adcffa
commit f750edd9ba
2 changed files with 247 additions and 52 deletions

View File

@@ -2,6 +2,7 @@ import math
import struct
import inspect
import time
+import gc
# Subspace 2D decomposition + gradient update
from .LMConfig import LMConfig
from typing import Any, Optional, Tuple, List, Union
@@ -92,6 +93,15 @@ class KnowledgeDataset(nn.Module):
device = all_scores.device
dtype = all_scores.dtype
+# Record memory state before entering intelligent selection
+if hasattr(self, 'step_counter'):
+    self.step_counter += 1
+    # GPU memory logging disabled to improve performance
+    # if self.step_counter % 50 == 0:  # log once every 50 calls
+    #     if torch.cuda.is_available():
+    #         allocated_before = torch.cuda.memory_allocated() / (1024**3)
+    #         print(f"[INTEL_SELECT_ENTER] Step {self.step_counter}: GPU Memory: {allocated_before:.2f}GB")
# Hierarchical selection for each batch
enhanced_scores = all_scores.clone()
query_features = query.mean(dim=1)  # [batch_size, dim]
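Note on the hunk above: memory sampling is gated on a per-instance step counter, so monitoring stays cheap even if the prints are re-enabled. A minimal, self-contained sketch of the same step-gated pattern (the helper name is illustrative, not part of this commit):

import torch

def log_gpu_memory(step, interval=50, tag="INTEL_SELECT_ENTER"):
    # Print allocated GPU memory once every `interval` calls; no-op on CPU-only hosts.
    if step % interval == 0 and torch.cuda.is_available():
        allocated_gb = torch.cuda.memory_allocated() / (1024 ** 3)
        print(f"[{tag}] Step {step}: GPU Memory: {allocated_gb:.2f}GB")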
@@ -157,6 +167,24 @@ class KnowledgeDataset(nn.Module):
all_best_tokens = torch.stack(batch_best_tokens, dim=0)
all_best_tokens_embeddings = torch.stack(batch_best_tokens_embeddings, dim=0)
+# Clean up intermediate tensors to prevent memory leaks
+del all_candidate_indices, unique_indices, inverse_indices
+del unique_candidate_features, normalized_candidates, normalized_queries
+del batch_best_tokens, batch_best_tokens_embeddings
+del flat_tokens, flat_embeddings, pre_update_embeddings
+# Record memory state after exiting intelligent selection (disabled to improve performance)
+# if hasattr(self, 'step_counter') and self.step_counter % 50 == 0:
+#     if torch.cuda.is_available():
+#         allocated_after = torch.cuda.memory_allocated() / (1024**3)
+#         print(f"[INTEL_SELECT_EXIT] Step {self.step_counter}: GPU Memory: {allocated_after:.2f}GB")
+# Force garbage collection (only on monitored steps)
+if hasattr(self, 'step_counter') and self.step_counter % 100 == 0:
+    gc.collect()
+    if torch.cuda.is_available():
+        torch.cuda.empty_cache()
return all_best_tokens, all_best_tokens_embeddings
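The `del` statements above only drop Python references; the freed blocks stay in PyTorch's caching allocator until `torch.cuda.empty_cache()` hands them back to the driver, which is why collection runs only every 100 steps. A compact sketch of that periodic-release pattern (illustrative helper, assuming PyTorch; not part of the repository):

import gc
import torch

def maybe_release_memory(step, interval=100):
    # Run Python GC and return cached CUDA blocks to the driver every `interval` steps.
    if step % interval == 0:
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()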
@@ -217,6 +245,16 @@ class CrossAttention(nn.Module):
def forward(self, x, db, context_mask=None, pos_emb=None):
batch_size = x.size(0)
+# Monitor memory at the start of cross-attention (disabled to improve performance)
+if not hasattr(self, 'call_counter'):
+    self.call_counter = 0
+self.call_counter += 1
+# GPU memory logging disabled to improve performance
+# if self.call_counter % 100 == 0 and torch.cuda.is_available():
+#     allocated_before = torch.cuda.memory_allocated() / (1024**3)
+#     print(f"[CROSS_ATTN_ENTER] Call {self.call_counter}: GPU Memory: {allocated_before:.2f}GB")
# Split into multiple heads
q = self.to_q(x).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
k = self.to_k(db).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
@@ -242,6 +280,14 @@ class CrossAttention(nn.Module):
context = self.to_out(context)
+# Clean up intermediate tensors
+del q, k, v, attn_scores, attn_weights
+# Monitor memory at the end of cross-attention (disabled to improve performance)
+# if self.call_counter % 100 == 0 and torch.cuda.is_available():
+#     allocated_after = torch.cuda.memory_allocated() / (1024**3)
+#     print(f"[CROSS_ATTN_EXIT] Call {self.call_counter}: GPU Memory: {allocated_after:.2f}GB")
return context
class Attention(nn.Module):
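For reference, the head split in CrossAttention.forward reshapes [batch, seq, heads*head_dim] into [batch, heads, seq, head_dim] before attention scores are computed. A standalone shape check with made-up sizes (assuming PyTorch):

import torch

batch_size, seq_len, num_heads, head_dim = 2, 16, 8, 64
x = torch.randn(batch_size, seq_len, num_heads * head_dim)
q = x.view(batch_size, -1, num_heads, head_dim).transpose(1, 2)
print(q.shape)  # torch.Size([2, 8, 16, 64]) -> [batch, heads, seq, head_dim]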

View File

@@ -22,6 +22,8 @@ from transformers import AutoTokenizer, get_cosine_schedule_with_warmup
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import swanlab  # replaces the wandb import
+import gc  # garbage-collection module
+import psutil  # system resource monitoring
from model.model import MiniMindLM, RMSNorm
from model.LMConfig import LMConfig
@@ -29,6 +31,63 @@ from model.dataset import PretrainDataset
warnings.filterwarnings('ignore')
+# Memory monitoring helper functions
+def get_memory_usage():
+    """Get the current host memory usage of this process."""
+    process = psutil.Process()
+    memory_info = process.memory_info()
+    return {
+        'rss_mb': memory_info.rss / 1024 / 1024,  # resident (physical) memory in MB
+        'vms_mb': memory_info.vms / 1024 / 1024,  # virtual memory in MB
+    }
+def get_cuda_memory_usage():
+    """Get CUDA memory usage, or an empty dict on CPU-only hosts."""
+    if torch.cuda.is_available():
+        return {
+            'cuda_allocated_mb': torch.cuda.memory_allocated() / 1024 / 1024,
+            'cuda_reserved_mb': torch.cuda.memory_reserved() / 1024 / 1024,
+            'cuda_max_allocated_mb': torch.cuda.max_memory_allocated() / 1024 / 1024,
+        }
+    return {}
+def get_tensor_memory_size(tensor_list):
+    """Compute the total memory held by a list of tensors (or batches of tensors), in MB."""
+    total_size = 0
+    for batch in tensor_list:
+        if isinstance(batch, (list, tuple)):
+            for tensor in batch:
+                if isinstance(tensor, torch.Tensor):
+                    total_size += tensor.numel() * tensor.element_size()
+        elif isinstance(batch, torch.Tensor):
+            total_size += batch.numel() * batch.element_size()
+    return total_size / 1024 / 1024  # convert to MB
+def log_memory_status(step, prefetch_batches, accelerator, stage="", detailed=False):
+    """Log the current memory status."""
+    if not accelerator.is_main_process:
+        return
+    memory_info = get_memory_usage()
+    cuda_info = get_cuda_memory_usage()
+    prefetch_memory = get_tensor_memory_size(prefetch_batches)
+    log_msg = f"[Memory Monitor] Step {step} {stage} - "
+    log_msg += f"Prefetch batches: {len(prefetch_batches)}, "
+    log_msg += f"Prefetch memory: {prefetch_memory:.2f}MB, "
+    log_msg += f"System RSS: {memory_info['rss_mb']:.2f}MB"
+    if cuda_info:
+        log_msg += f", CUDA allocated: {cuda_info['cuda_allocated_mb']:.2f}MB"
+        log_msg += f", CUDA reserved: {cuda_info['cuda_reserved_mb']:.2f}MB"
+    if detailed:
+        log_msg += f", System VMS: {memory_info['vms_mb']:.2f}MB"
+        if cuda_info:
+            log_msg += f", CUDA max allocated: {cuda_info['cuda_max_allocated_mb']:.2f}MB"
+    Logger(log_msg, accelerator)
# Logging function
def Logger(msg, accelerator=None):
# If no accelerator is provided, print only on the main process
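The helpers above combine psutil process statistics with torch.cuda counters. A quick, self-contained check of the same readings outside the training script (assumes psutil and torch are installed):

import psutil
import torch

rss_mb = psutil.Process().memory_info().rss / 1024 / 1024
msg = f"System RSS: {rss_mb:.2f}MB"
if torch.cuda.is_available():
    msg += f", CUDA allocated: {torch.cuda.memory_allocated() / 1024 / 1024:.2f}MB"
    msg += f", CUDA reserved: {torch.cuda.memory_reserved() / 1024 / 1024:.2f}MB"
print(msg)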
@@ -227,65 +286,92 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
moe_path = '_moe' if args.use_moe else ''
best_loss = float('10000')
+# Initialize the CUDA event variables
+data_start = data_end = forward_start = forward_end = None
+backward_start = backward_end = optimizer_start = optimizer_end = None
# Add CUDA events for performance profiling (main process only)
-# if args.profile and accelerator.is_main_process:
-#     data_start = torch.cuda.Event(enable_timing=True)
-#     data_end = torch.cuda.Event(enable_timing=True)
-#     forward_start = torch.cuda.Event(enable_timing=True)
-#     forward_end = torch.cuda.Event(enable_timing=True)
-#     backward_start = torch.cuda.Event(enable_timing=True)
-#     backward_end = torch.cuda.Event(enable_timing=True)
-#     optimizer_start = torch.cuda.Event(enable_timing=True)
-#     optimizer_end = torch.cuda.Event(enable_timing=True)
+if args.profile and accelerator.is_main_process:
+    data_start = torch.cuda.Event(enable_timing=True)
+    data_end = torch.cuda.Event(enable_timing=True)
+    forward_start = torch.cuda.Event(enable_timing=True)
+    forward_end = torch.cuda.Event(enable_timing=True)
+    backward_start = torch.cuda.Event(enable_timing=True)
+    backward_end = torch.cuda.Event(enable_timing=True)
+    optimizer_start = torch.cuda.Event(enable_timing=True)
+    optimizer_end = torch.cuda.Event(enable_timing=True)
# Prefetch data
prefetch_factor = 2  # number of batches to prefetch
data_iter = iter(train_loader)
prefetch_batches = []
+# Record the initial memory state
+if args.memory_monitor:
+    log_memory_status(-1, prefetch_batches, accelerator, "before_prefetch", detailed=True)
# Prefetch the initial batches
-for _ in range(min(prefetch_factor, len(train_loader))):
+for i in range(min(prefetch_factor, len(train_loader))):
try:
batch = next(data_iter)
prefetch_batches.append(batch)
+# Record the memory change after adding each batch
+if args.memory_monitor and accelerator.is_main_process:
+    log_memory_status(-1, prefetch_batches, accelerator, f"after_adding_batch_{i+1}")
except StopIteration:
break
+# Record the memory state after the initial prefetch completes
+if args.memory_monitor:
+    log_memory_status(-1, prefetch_batches, accelerator, "after_initial_prefetch", detailed=True)
# Initialize the variables needed for logging before the loop starts
last_log_time = epoch_start_time
for step in range(total_steps_in_epoch):
try:
# Time data loading (main process only)
-# if args.profile and accelerator.is_main_process:
-#     data_start.record()
+if args.profile and accelerator.is_main_process and data_start is not None:
+    data_start.record()
+# Record memory state before using the batch (detailed, at the configured interval)
+if args.memory_monitor and step % args.memory_monitor_interval == 0:
+    log_memory_status(step, prefetch_batches, accelerator, "before_use_batch", detailed=True)
# Use the prefetched data
if prefetch_batches:
X, Y, loss_mask = prefetch_batches.pop(0)
+# Record the memory change after consuming the batch
+if args.memory_monitor and step % args.memory_monitor_interval == 0:
+    log_memory_status(step, prefetch_batches, accelerator, "after_pop_batch")
else:
# If the prefetch queue is empty, load directly
X, Y, loss_mask = next(data_iter)
+if args.memory_monitor and accelerator.is_main_process:
+    Logger(f"[Memory Monitor] Step {step} - Prefetch queue empty, loading directly!", accelerator)
# Asynchronously prefetch the next batch
if step + prefetch_factor < len(train_loader):
try:
batch = next(data_iter)
prefetch_batches.append(batch)
+# Record the memory change after adding a new batch
+if args.memory_monitor and step % args.memory_monitor_interval == 0:
+    log_memory_status(step, prefetch_batches, accelerator, "after_add_batch")
except StopIteration:
pass
# End of data-loading timing (main process only)
-# if args.profile and accelerator.is_main_process:
-#     data_end.record()
+if args.profile and accelerator.is_main_process and data_end is not None:
+    data_end.record()
# Update the learning rate
if scheduler is not None:
scheduler.step()
# Time the forward pass (main process only)
-# if args.profile and accelerator.is_main_process:
-#     forward_start.record()
+if args.profile and accelerator.is_main_process and forward_start is not None:
+    forward_start.record()
# Forward pass
with ctx:
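The events re-enabled in this hunk follow the usual CUDA timing recipe: record a start and an end event around the work, synchronize, then read elapsed_time in milliseconds. A minimal isolated sketch of that recipe (assuming a CUDA device is present):

import torch

if torch.cuda.is_available():
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    x = torch.randn(1024, 1024, device="cuda")
    start.record()
    y = x @ x  # the work being timed
    end.record()
    torch.cuda.synchronize()  # both events must be recorded before elapsed_time()
    print(f"matmul: {start.elapsed_time(end):.2f}ms")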
@@ -311,24 +397,24 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
loss = loss / args.accumulation_steps
# End of forward-pass timing (main process only)
-# if args.profile and accelerator.is_main_process:
-#     forward_end.record()
+if args.profile and accelerator.is_main_process and forward_end is not None:
+    forward_end.record()
# Time the backward pass (main process only)
-# if args.profile and accelerator.is_main_process:
-#     backward_start.record()
+if args.profile and accelerator.is_main_process and backward_start is not None:
+    backward_start.record()
# Backward pass
# When using DeepSpeed, gradient accumulation and gradient clipping are handled automatically
accelerator.backward(loss)
# End of backward-pass timing (main process only)
-# if args.profile and accelerator.is_main_process:
-#     backward_end.record()
+if args.profile and accelerator.is_main_process and backward_end is not None:
+    backward_end.record()
# Time the optimizer step (main process only)
-# if args.profile and accelerator.is_main_process:
-#     optimizer_start.record()
+if args.profile and accelerator.is_main_process and optimizer_start is not None:
+    optimizer_start.record()
# Optimizer step - when using DeepSpeed, gradient accumulation and clipping are handled automatically
# The optimizer step only runs once the accumulation step count has been reached
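The division of the loss by args.accumulation_steps above keeps the accumulated gradient equivalent to one large-batch step. A toy sketch of that accumulation logic (illustrative sizes, plain PyTorch rather than Accelerate/DeepSpeed):

import torch

model = torch.nn.Linear(4, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
accumulation_steps = 4

for step in range(8):
    x, y = torch.randn(2, 4), torch.randn(2, 1)
    loss = torch.nn.functional.mse_loss(model(x), y) / accumulation_steps
    loss.backward()  # gradients accumulate across micro-batches
    if (step + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()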
@@ -340,40 +426,58 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
optimizer.zero_grad()
# End of optimizer-step timing (main process only)
-# if args.profile and accelerator.is_main_process:
-#     optimizer_end.record()
+if args.profile and accelerator.is_main_process and optimizer_end is not None:
+    optimizer_end.record()
# Print training info (main process only)
if (step + 1) % args.log_interval == 0 and accelerator.is_main_process:
current_time = time.time()
-# Compute performance metrics
-if args.profile:
-    torch.cuda.synchronize()
-    # Use the time since the last log for the performance metrics, not the total time
-    data_time = data_start.elapsed_time(data_end)
-    forward_time = forward_start.elapsed_time(forward_end)
-    backward_time = backward_start.elapsed_time(backward_end)
-    optimizer_time = optimizer_start.elapsed_time(optimizer_end)
-    iter_time = (current_time - last_log_time) * 1000 / args.log_interval  # avg ms per iteration since last log
-    # total_time_ms = data_time + forward_time + backward_time + optimizer_time
-    # Print the performance analysis
-    if (step + 1) % (args.log_interval * args.profile_interval) == 0:
-        Logger(f"Performance analysis (Avg/iter over last {args.log_interval} steps) - "
-               f"Data: {data_time/args.log_interval:.2f}ms, "
-               f"Fwd: {forward_time/args.log_interval:.2f}ms, "
-               f"Bwd: {backward_time/args.log_interval:.2f}ms, "
-               f"Optim: {optimizer_time/args.log_interval:.2f}ms, "
-               f"Iter Time: {iter_time:.2f}ms", accelerator)
-        # Reset the events so the next measurement starts from zero
-        data_start = torch.cuda.Event(enable_timing=True)
-        data_end = torch.cuda.Event(enable_timing=True)
-        forward_start = torch.cuda.Event(enable_timing=True)
-        forward_end = torch.cuda.Event(enable_timing=True)
-        backward_start = torch.cuda.Event(enable_timing=True)
-        backward_end = torch.cuda.Event(enable_timing=True)
-        optimizer_start = torch.cuda.Event(enable_timing=True)
-        optimizer_end = torch.cuda.Event(enable_timing=True)
+# Record detailed memory status at the log interval
+if args.memory_monitor:
+    log_memory_status(step, prefetch_batches, accelerator, "at_log_interval", detailed=True)
+    # Force garbage collection and record the memory change
+    if torch.cuda.is_available():
+        torch.cuda.empty_cache()
+    gc.collect()
+    log_memory_status(step, prefetch_batches, accelerator, "after_gc", detailed=True)
+# Compute performance metrics
+if args.profile and accelerator.is_main_process:
+    torch.cuda.synchronize()
+    # Only compute elapsed_time once all events have been recorded
+    try:
+        data_time = data_start.elapsed_time(data_end) if data_start is not None and data_end is not None else 0
+        forward_time = forward_start.elapsed_time(forward_end) if forward_start is not None and forward_end is not None else 0
+        backward_time = backward_start.elapsed_time(backward_end) if backward_start is not None and backward_end is not None else 0
+        optimizer_time = optimizer_start.elapsed_time(optimizer_end) if optimizer_start is not None and optimizer_end is not None else 0
+        iter_time = (current_time - last_log_time) * 1000 / args.log_interval  # avg ms per iteration since last log
+        # total_time_ms = data_time + forward_time + backward_time + optimizer_time
+        # Print the performance analysis
+        if (step + 1) % (args.log_interval * args.profile_interval) == 0:
+            Logger(f"Performance analysis (Avg/iter over last {args.log_interval} steps) - "
+                   f"Data: {data_time/args.log_interval:.2f}ms, "
+                   f"Fwd: {forward_time/args.log_interval:.2f}ms, "
+                   f"Bwd: {backward_time/args.log_interval:.2f}ms, "
+                   f"Optim: {optimizer_time/args.log_interval:.2f}ms, "
+                   f"Iter Time: {iter_time:.2f}ms", accelerator)
+            # Reset the events so the next measurement starts from zero
+            data_start = torch.cuda.Event(enable_timing=True)
+            data_end = torch.cuda.Event(enable_timing=True)
+            forward_start = torch.cuda.Event(enable_timing=True)
+            forward_end = torch.cuda.Event(enable_timing=True)
+            backward_start = torch.cuda.Event(enable_timing=True)
+            backward_end = torch.cuda.Event(enable_timing=True)
+            optimizer_start = torch.cuda.Event(enable_timing=True)
+            optimizer_end = torch.cuda.Event(enable_timing=True)
+    except RuntimeError as e:
+        if "Both events must be recorded" in str(e):
+            Logger(f"Warning: CUDA events not properly recorded, skipping performance analysis: {e}", accelerator)
+        else:
+            raise e
# Compute the current learning rate
@@ -419,7 +523,7 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
# Save the model (main process only)
loss_total = loss.item() * args.accumulation_steps
-if epoch > 1 or best_loss > loss_total and accelerator.is_main_process:
+if epoch > 1 and best_loss > loss_total and accelerator.is_main_process:
best_loss = loss_total
# Use the moe_path variable defined at the start of the function
ckp = f'{args.save_dir}/pretrain_{args.dim}{moe_path}.pth'
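The or-to-and change above matters because `and` binds tighter than `or`: the old condition parsed as `epoch > 1 or (best_loss > loss_total and accelerator.is_main_process)`, so once epoch > 1 it was true on every step and on every process. A tiny illustration with made-up values:

epoch, best_loss, loss_total, is_main = 2, 1.0, 5.0, True
old = epoch > 1 or best_loss > loss_total and is_main   # True even though the loss got worse
new = epoch > 1 and best_loss > loss_total and is_main  # False: no improvement, so no checkpoint
print(old, new)  # True False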
@@ -433,9 +537,34 @@ def train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, a
except Exception as e:
Logger(f"Error in training step: {e}", accelerator)
+# Record the memory state at the exception
+if args.memory_monitor:
+    log_memory_status(step, prefetch_batches, accelerator, "at_exception", detailed=True)
import traceback
Logger(traceback.format_exc(), accelerator)
+# Clear prefetch_batches to prevent memory leaks
+if args.memory_monitor and accelerator.is_main_process:
+    Logger(f"[Memory Monitor] Clearing prefetch_batches due to exception. Current length: {len(prefetch_batches)}", accelerator)
+prefetch_batches.clear()
+gc.collect()
+if torch.cuda.is_available():
+    torch.cuda.empty_cache()
+if args.memory_monitor:
+    log_memory_status(step, prefetch_batches, accelerator, "after_exception_cleanup", detailed=True)
+# Clear prefetch_batches when the training epoch ends
+if args.memory_monitor:
+    if accelerator.is_main_process:
+        Logger(f"[Memory Monitor] Epoch {epoch+1} finished. Clearing prefetch_batches. Final length: {len(prefetch_batches)}", accelerator)
+    log_memory_status(total_steps_in_epoch-1, prefetch_batches, accelerator, "before_epoch_end_cleanup", detailed=True)
+prefetch_batches.clear()
+gc.collect()
+if torch.cuda.is_available():
+    torch.cuda.empty_cache()
+if args.memory_monitor:
+    log_memory_status(total_steps_in_epoch-1, prefetch_batches, accelerator, "after_epoch_end_cleanup", detailed=True)
def main():
parser = argparse.ArgumentParser(description="MiniMind Pretraining with Accelerate")
parser.add_argument("--out_dir", type=str, default="out")
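One way to confirm that clearing the prefetch queue actually releases host memory is to compare process RSS around the clear; note the allocator may not return every page to the OS immediately. An illustrative check (not part of the commit), assuming psutil and torch:

import gc
import psutil
import torch

prefetch = [torch.randn(1024, 1024) for _ in range(8)]  # roughly 32MB of float32 on the host
rss_before = psutil.Process().memory_info().rss / 1024 / 1024
prefetch.clear()
gc.collect()
rss_after = psutil.Process().memory_info().rss / 1024 / 1024
print(f"RSS before clear: {rss_before:.1f}MB, after: {rss_after:.1f}MB")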
@@ -468,6 +597,8 @@ def main():
parser.add_argument("--fast_clustering", action="store_true", default=True, help="Use fast approximate clustering (suitable for large datasets)")
parser.add_argument("--cluster_cache_path", type=str, default="./cache/cluster_tokens_single.pt", help="Path of the clustering-result cache file")
parser.add_argument("--recompute_clusters", action="store_true", default=False, help="Force recomputation of clusters, ignoring the cache file")
+parser.add_argument("--memory_monitor", action="store_true", default=False, help="Enable memory monitoring")
+parser.add_argument("--memory_monitor_interval", type=int, default=10, help="Memory monitoring interval (in steps)")
args = parser.parse_args()
#########################################################
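A quick sanity check of the two new flags, using a standalone argparse mirror of the options added above (the parsed values are illustrative):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--memory_monitor", action="store_true", default=False)
parser.add_argument("--memory_monitor_interval", type=int, default=10)

args = parser.parse_args(["--memory_monitor", "--memory_monitor_interval", "25"])
print(args.memory_monitor, args.memory_monitor_interval)  # True 25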
@@ -629,8 +760,26 @@ def main():
#########################################################
overall_start_time = time.time()  # Record overall start time
for epoch in range(args.epochs):
+Logger(f"Starting epoch {epoch+1}", accelerator)
train_epoch(epoch, accelerator, model, train_loader, optimizer, scheduler, args, ctx, overall_start_time, swanlab_run)  # Pass overall start time
+# Clean up memory after each epoch
+Logger(f"Epoch {epoch+1} finished, cleaning up memory", accelerator)
+gc.collect()
+if torch.cuda.is_available():
+    torch.cuda.empty_cache()
+# Record the memory state at the end of the epoch
+if accelerator.is_main_process:
+    memory_info = get_memory_usage()
+    cuda_info = get_cuda_memory_usage()
+    log_msg = f"[Memory Monitor] Epoch {epoch+1} completed - "
+    log_msg += f"System RSS: {memory_info['rss_mb']:.2f}MB"
+    if cuda_info:
+        log_msg += f", CUDA allocated: {cuda_info['cuda_allocated_mb']:.2f}MB"
+        log_msg += f", CUDA reserved: {cuda_info['cuda_reserved_mb']:.2f}MB"
+    Logger(log_msg, accelerator)
#########################################################
# Shut down SwanLab
#########################################################
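Since cuda_max_allocated_mb reports the peak since process start, a per-epoch reset would make the end-of-epoch log reflect only the most recent epoch. This is an optional companion sketch, an assumption rather than something this commit does:

import torch

if torch.cuda.is_available():
    torch.cuda.reset_peak_memory_stats()  # peak counter now restarts from the current allocation
    print(f"peak after reset: {torch.cuda.max_memory_allocated() / 1024 / 1024:.2f}MB")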