#!/usr/bin/env python3 """ 分析训练与推理Loss差距的实验脚本 系统性地验证各种可能的原因 """ import json import random import torch import torch.nn.functional as F from transformers import AutoTokenizer import os from model.LMConfig import LMConfig from model.model_original import MiniMindLM def create_eval_data_from_training_data(): """ 从训练数据中重新提取样本创建eval_data.json 确保数据来源一致性 """ print("=== 1. 创建来自训练数据的评估集 ===") train_data_path = "/home/pci/ycz/Code/Minimind/dataset/stable/merged_pretrain.jsonl" eval_data_path = "dataset/stable/eval_data_from_train.json" # 确保目录存在 os.makedirs("dataset/stable", exist_ok=True) # 从训练数据中随机选择20条 samples = [] with open(train_data_path, 'r', encoding='utf-8') as f: all_lines = f.readlines() # 随机选择20条数据 selected_lines = random.sample(all_lines, min(20, len(all_lines))) for line in selected_lines: try: data = json.loads(line.strip()) samples.append(data) except json.JSONDecodeError: continue # 保存到新的评估文件 with open(eval_data_path, 'w', encoding='utf-8') as f: for sample in samples: f.write(json.dumps(sample, ensure_ascii=False) + '\n') print(f"✅ 创建了包含{len(samples)}个样本的评估数据集") print(f" 保存路径: {eval_data_path}") return eval_data_path, samples def load_model_and_tokenizer(model_path, device='cuda'): """ 加载模型和tokenizer,确保与训练时配置一致 """ print("=== 2. 加载模型和tokenizer ===") # 使用与训练时完全相同的配置 config = LMConfig( dim=512, n_layers=8, n_heads=32, vocab_size=6400, max_seq_len=512, dropout=0.0, norm_eps=1e-5, rope_theta=1e6, use_moe=False ) model = MiniMindLM(config) tokenizer = AutoTokenizer.from_pretrained('./model/minimind_tokenizer') # 加载权重 if os.path.exists(model_path): print(f"正在加载权重: {model_path}") state_dict = torch.load(model_path, map_location=device) # 检查权重匹配情况 model_keys = set(model.state_dict().keys()) checkpoint_keys = set(state_dict.keys()) matched_keys = model_keys & checkpoint_keys missing_keys = model_keys - checkpoint_keys unexpected_keys = checkpoint_keys - model_keys print(f" 模型参数: {len(model_keys)}") print(f" 权重文件参数: {len(checkpoint_keys)}") print(f" 匹配参数: {len(matched_keys)}") print(f" 缺失参数: {len(missing_keys)}") print(f" 多余参数: {len(unexpected_keys)}") if missing_keys: print(f" ❌ 缺失参数: {list(missing_keys)[:5]}...") if unexpected_keys: print(f" ⚠️ 多余参数: {list(unexpected_keys)[:5]}...") model.load_state_dict(state_dict, strict=False) model.to(device) model.eval() print("✅ 模型加载完成") else: raise FileNotFoundError(f"模型文件不存在: {model_path}") return model, tokenizer, config def test_inference_modes(model, tokenizer, samples, device='cuda'): """ 测试不同推理模式的loss差异 """ print("=== 3. 测试不同推理模式 ===") results = {} for mode_name, use_cache in [("无缓存", False), ("有KV缓存", True)]: print(f"\n--- 测试模式: {mode_name} ---") total_loss = 0 valid_samples = 0 for i, sample in enumerate(samples[:5]): # 测试前5个样本 text = sample['text'] # 确保文本长度足够 tokens = tokenizer.encode(text, add_special_tokens=False) if len(tokens) < 130: # 100输入 + 30预测 continue input_tokens = tokens[:100] target_tokens = tokens[100:130] # 30个预测token input_ids = torch.tensor([input_tokens], dtype=torch.long).to(device) target_ids = torch.tensor([target_tokens], dtype=torch.long).to(device) with torch.no_grad(): # 方法1: 直接forward计算loss(类似训练) full_input = torch.tensor([tokens[:130]], dtype=torch.long).to(device) outputs = model(full_input) logits = outputs.logits # 计算loss shift_logits = logits[0, 99:129, :].contiguous() # 取预测部分的logits shift_labels = target_ids[0].contiguous() loss = F.cross_entropy(shift_logits, shift_labels, reduction='mean') total_loss += loss.item() valid_samples += 1 print(f" 样本{i+1}: loss = {loss.item():.4f}") avg_loss = total_loss / valid_samples if valid_samples > 0 else 0 results[mode_name] = avg_loss print(f" {mode_name}平均loss: {avg_loss:.4f}") return results def test_autoregressive_vs_teacher_forcing(model, tokenizer, samples, device='cuda'): """ 对比自回归生成vs教师强制的loss差异 """ print("=== 4. 对比自回归生成 vs 教师强制 ===") results = {} for i, sample in enumerate(samples[:3]): # 测试前3个样本 text = sample['text'] tokens = tokenizer.encode(text, add_special_tokens=False) if len(tokens) < 130: continue input_tokens = tokens[:100] target_tokens = tokens[100:130] print(f"\n--- 样本 {i+1} ---") # 方法1: 教师强制(类似训练时) with torch.no_grad(): full_input = torch.tensor([tokens[:130]], dtype=torch.long).to(device) outputs = model(full_input) logits = outputs.logits shift_logits = logits[0, 99:129, :].contiguous() shift_labels = torch.tensor(target_tokens, dtype=torch.long).to(device) teacher_forcing_loss = F.cross_entropy(shift_logits, shift_labels, reduction='mean') print(f" 教师强制loss: {teacher_forcing_loss.item():.4f}") # 方法2: 自回归生成(逐步预测) with torch.no_grad(): current_sequence = torch.tensor([input_tokens], dtype=torch.long).to(device) autoregressive_losses = [] for step in range(len(target_tokens)): outputs = model(current_sequence) logits = outputs.logits[0, -1, :] # 只取最后一个位置的logits # 计算当前步骤的loss true_next_token = target_tokens[step] step_loss = F.cross_entropy(logits.unsqueeze(0), torch.tensor([true_next_token], device=device)) autoregressive_losses.append(step_loss.item()) # 添加真实token到序列中(教师强制) current_sequence = torch.cat([ current_sequence, torch.tensor([[true_next_token]], device=device) ], dim=1) autoregressive_loss = sum(autoregressive_losses) / len(autoregressive_losses) print(f" 自回归loss: {autoregressive_loss:.4f}") print(f" loss差距: {abs(autoregressive_loss - teacher_forcing_loss.item()):.4f}") # 方法3: 真实自回归生成(使用预测token) with torch.no_grad(): current_sequence = torch.tensor([input_tokens], dtype=torch.long).to(device) real_autoregressive_losses = [] for step in range(len(target_tokens)): outputs = model(current_sequence) logits = outputs.logits[0, -1, :] # 预测下一个token predicted_token = torch.argmax(logits, dim=-1).item() # 计算与真实token的loss true_next_token = target_tokens[step] step_loss = F.cross_entropy(logits.unsqueeze(0), torch.tensor([true_next_token], device=device)) real_autoregressive_losses.append(step_loss.item()) # 使用预测的token继续生成 current_sequence = torch.cat([ current_sequence, torch.tensor([[predicted_token]], device=device) ], dim=1) real_autoregressive_loss = sum(real_autoregressive_losses) / len(real_autoregressive_losses) print(f" 真实自回归loss: {real_autoregressive_loss:.4f}") def analyze_data_distribution(samples, tokenizer): """ 分析评估数据的分布特征 """ print("=== 5. 分析数据分布 ===") lengths = [] vocab_coverage = set() for sample in samples: text = sample['text'] tokens = tokenizer.encode(text, add_special_tokens=False) lengths.append(len(tokens)) vocab_coverage.update(tokens) print(f"文本长度统计:") print(f" 平均长度: {sum(lengths)/len(lengths):.1f} tokens") print(f" 最短: {min(lengths)} tokens") print(f" 最长: {max(lengths)} tokens") print(f" 词汇覆盖: {len(vocab_coverage)} 个不同token") print(f" 词汇覆盖率: {len(vocab_coverage)/6400*100:.1f}%") def compare_training_vs_inference_computation(model, tokenizer, samples, device='cuda'): """ 对比训练时和推理时的具体计算过程 """ print("=== 6. 对比训练与推理的计算过程 ===") sample = samples[0] text = sample['text'] tokens = tokenizer.encode(text, add_special_tokens=False) if len(tokens) < 130: print("样本长度不足,跳过") return input_tokens = tokens[:100] target_tokens = tokens[100:130] print(f"测试样本长度: {len(tokens)} tokens") print(f"输入部分: {len(input_tokens)} tokens") print(f"目标部分: {len(target_tokens)} tokens") # 模拟训练时的计算 print("\n--- 模拟训练时计算 ---") with torch.no_grad(): # 训练时:一次性输入完整序列 full_sequence = torch.tensor([tokens[:130]], dtype=torch.long).to(device) outputs = model(full_sequence) logits = outputs.logits print(f"输入形状: {full_sequence.shape}") print(f"输出logits形状: {logits.shape}") # 计算loss的方式和训练时一致 shift_logits = logits[0, :-1, :].contiguous() # 去掉最后一个position shift_labels = full_sequence[0, 1:].contiguous() # 去掉第一个position # 只计算预测部分的loss predict_start = 99 # 从第100个token开始预测 predict_logits = shift_logits[predict_start:predict_start+30, :] predict_labels = shift_labels[predict_start:predict_start+30] training_loss = F.cross_entropy(predict_logits, predict_labels, reduction='mean') print(f"训练方式loss: {training_loss.item():.4f}") # 模拟推理时的计算 print("\n--- 模拟推理时计算 ---") with torch.no_grad(): # 推理时:分别处理输入和目标 input_ids = torch.tensor([input_tokens], dtype=torch.long).to(device) # 使用和eval_model.py相同的方法 full_input_for_loss = torch.tensor([tokens[:130]], dtype=torch.long).to(device) outputs = model(full_input_for_loss, logits_to_keep=30) if outputs.logits is not None: shift_logits = outputs.logits[0, -30:, :].contiguous() shift_labels = torch.tensor(target_tokens, dtype=torch.long).to(device) inference_loss = F.cross_entropy(shift_logits, shift_labels, reduction='mean') print(f"推理方式loss: {inference_loss.item():.4f}") else: print("无法获取logits") def main(): """ 主函数:系统性分析训练与推理loss差距 """ print("🔍 开始分析训练与推理Loss差距") print("="*60) # 设置随机种子确保结果可重现 random.seed(42) torch.manual_seed(42) device = 'cuda' if torch.cuda.is_available() else 'cpu' model_path = 'out/experiment_1_4_0/pretrain_512.pth' try: # 1. 创建来自训练数据的评估集 eval_data_path, samples = create_eval_data_from_training_data() # 2. 加载模型 model, tokenizer, config = load_model_and_tokenizer(model_path, device) # 3. 分析数据分布 analyze_data_distribution(samples, tokenizer) # 4. 测试不同推理模式 mode_results = test_inference_modes(model, tokenizer, samples, device) # 5. 对比自回归vs教师强制 test_autoregressive_vs_teacher_forcing(model, tokenizer, samples, device) # 6. 对比训练与推理的具体计算过程 compare_training_vs_inference_computation(model, tokenizer, samples, device) print("\n" + "="*60) print("🎯 分析完成") except Exception as e: print(f"❌ 分析过程中出现错误: {e}") import traceback traceback.print_exc() if __name__ == "__main__": main()