Minimind/analyze_train_inference_gap.py
#!/usr/bin/env python3
"""
Experiment script for analyzing the gap between training and inference loss.
Systematically checks the possible causes.
"""
import json
import os
import random

import torch
import torch.nn.functional as F
from transformers import AutoTokenizer

from model.LMConfig import LMConfig
from model.model_original import MiniMindLM


def create_eval_data_from_training_data():
    """
    Re-extract samples from the training data to build eval_data.json,
    ensuring the evaluation set comes from the same source as training.
    """
    print("=== 1. Build an evaluation set from the training data ===")
    train_data_path = "/home/pci/ycz/Code/Minimind/dataset/stable/merged_pretrain.jsonl"
    eval_data_path = "dataset/stable/eval_data_from_train.json"

    # Make sure the output directory exists
    os.makedirs("dataset/stable", exist_ok=True)

    # Randomly pick 20 lines from the training data
    samples = []
    with open(train_data_path, 'r', encoding='utf-8') as f:
        all_lines = f.readlines()

    selected_lines = random.sample(all_lines, min(20, len(all_lines)))
    for line in selected_lines:
        try:
            data = json.loads(line.strip())
            samples.append(data)
        except json.JSONDecodeError:
            continue

    # Save to the new evaluation file (one JSON object per line)
    with open(eval_data_path, 'w', encoding='utf-8') as f:
        for sample in samples:
            f.write(json.dumps(sample, ensure_ascii=False) + '\n')

    print(f"✅ Created an evaluation set with {len(samples)} samples")
    print(f"   Saved to: {eval_data_path}")
    return eval_data_path, samples
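
# Illustrative sketch (assumption, not verified against the dataset itself): each line of
# merged_pretrain.jsonl is expected to be a JSON object with a "text" field, e.g.
#   {"text": "Some pretraining paragraph ..."}
# since every function below reads sample['text']. Reading the evaluation file back would
# follow the same one-object-per-line layout despite its .json extension:
#   with open("dataset/stable/eval_data_from_train.json", 'r', encoding='utf-8') as f:
#       samples = [json.loads(line) for line in f if line.strip()]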


def load_model_and_tokenizer(model_path, device='cuda'):
    """
    Load the model and tokenizer with exactly the same configuration used during training.
    """
    print("=== 2. Load model and tokenizer ===")

    # Use exactly the same configuration as training
    config = LMConfig(
        dim=512,
        n_layers=8,
        n_heads=32,
        vocab_size=6400,
        max_seq_len=512,
        dropout=0.0,
        norm_eps=1e-5,
        rope_theta=1e6,
        use_moe=False
    )
    model = MiniMindLM(config)
    tokenizer = AutoTokenizer.from_pretrained('./model/minimind_tokenizer')

    # Load the checkpoint weights
    if os.path.exists(model_path):
        print(f"Loading weights: {model_path}")
        state_dict = torch.load(model_path, map_location=device)

        # Check how well the checkpoint keys match the model
        model_keys = set(model.state_dict().keys())
        checkpoint_keys = set(state_dict.keys())
        matched_keys = model_keys & checkpoint_keys
        missing_keys = model_keys - checkpoint_keys
        unexpected_keys = checkpoint_keys - model_keys

        print(f"   Model parameters: {len(model_keys)}")
        print(f"   Checkpoint parameters: {len(checkpoint_keys)}")
        print(f"   Matched: {len(matched_keys)}")
        print(f"   Missing: {len(missing_keys)}")
        print(f"   Unexpected: {len(unexpected_keys)}")

        if missing_keys:
            print(f"   ❌ Missing keys: {list(missing_keys)[:5]}...")
        if unexpected_keys:
            print(f"   ⚠️ Unexpected keys: {list(unexpected_keys)[:5]}...")

        model.load_state_dict(state_dict, strict=False)
        model.to(device)
        model.eval()
        print("✅ Model loaded")
    else:
        raise FileNotFoundError(f"Model file not found: {model_path}")

    return model, tokenizer, config


def test_inference_modes(model, tokenizer, samples, device='cuda'):
    """
    Test the loss difference across inference modes.
    """
    print("=== 3. Test different inference modes ===")
    results = {}

    for mode_name, use_cache in [("no cache", False), ("KV cache", True)]:
        print(f"\n--- Mode: {mode_name} ---")
        total_loss = 0
        valid_samples = 0

        for i, sample in enumerate(samples[:5]):  # Only the first 5 samples
            text = sample['text']
            # Make sure the text is long enough
            tokens = tokenizer.encode(text, add_special_tokens=False)
            if len(tokens) < 130:  # 100 input tokens + 30 prediction tokens
                continue

            input_tokens = tokens[:100]
            target_tokens = tokens[100:130]  # 30 tokens to predict
            input_ids = torch.tensor([input_tokens], dtype=torch.long).to(device)
            target_ids = torch.tensor([target_tokens], dtype=torch.long).to(device)

            with torch.no_grad():
                # Method 1: a single forward pass to compute the loss (same as training)
                full_input = torch.tensor([tokens[:130]], dtype=torch.long).to(device)
                outputs = model(full_input)
                logits = outputs.logits

                # Compute the loss on the logits of the prediction window
                shift_logits = logits[0, 99:129, :].contiguous()
                shift_labels = target_ids[0].contiguous()
                loss = F.cross_entropy(shift_logits, shift_labels, reduction='mean')

                total_loss += loss.item()
                valid_samples += 1
                print(f"   Sample {i+1}: loss = {loss.item():.4f}")

        avg_loss = total_loss / valid_samples if valid_samples > 0 else 0
        results[mode_name] = avg_loss
        print(f"   Average loss ({mode_name}): {avg_loss:.4f}")

    return results
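
# Note on the loop above: the use_cache flag is never actually passed into the model's
# forward call, so both "modes" run the same teacher-forced forward pass and will report
# identical losses. Wiring the flag through would require knowing MiniMindLM's forward
# signature, which this script does not assume.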


def test_autoregressive_vs_teacher_forcing(model, tokenizer, samples, device='cuda'):
    """
    Compare the loss between autoregressive generation and teacher forcing.
    """
    print("=== 4. Autoregressive generation vs teacher forcing ===")

    for i, sample in enumerate(samples[:3]):  # Only the first 3 samples
        text = sample['text']
        tokens = tokenizer.encode(text, add_special_tokens=False)
        if len(tokens) < 130:
            continue

        input_tokens = tokens[:100]
        target_tokens = tokens[100:130]
        print(f"\n--- Sample {i+1} ---")

        # Method 1: teacher forcing (same as training)
        with torch.no_grad():
            full_input = torch.tensor([tokens[:130]], dtype=torch.long).to(device)
            outputs = model(full_input)
            logits = outputs.logits
            shift_logits = logits[0, 99:129, :].contiguous()
            shift_labels = torch.tensor(target_tokens, dtype=torch.long).to(device)
            teacher_forcing_loss = F.cross_entropy(shift_logits, shift_labels, reduction='mean')
            print(f"   Teacher-forcing loss: {teacher_forcing_loss.item():.4f}")

        # Method 2: step-by-step autoregressive prediction (ground-truth feedback)
        with torch.no_grad():
            current_sequence = torch.tensor([input_tokens], dtype=torch.long).to(device)
            autoregressive_losses = []

            for step in range(len(target_tokens)):
                outputs = model(current_sequence)
                logits = outputs.logits[0, -1, :]  # Only the logits at the last position

                # Loss for the current step
                true_next_token = target_tokens[step]
                step_loss = F.cross_entropy(logits.unsqueeze(0),
                                            torch.tensor([true_next_token], device=device))
                autoregressive_losses.append(step_loss.item())

                # Append the ground-truth token to the sequence (teacher forcing)
                current_sequence = torch.cat([
                    current_sequence,
                    torch.tensor([[true_next_token]], device=device)
                ], dim=1)

            autoregressive_loss = sum(autoregressive_losses) / len(autoregressive_losses)
            print(f"   Autoregressive loss: {autoregressive_loss:.4f}")
            print(f"   Loss gap: {abs(autoregressive_loss - teacher_forcing_loss.item()):.4f}")

        # Method 3: true autoregressive generation (feeding back predicted tokens)
        with torch.no_grad():
            current_sequence = torch.tensor([input_tokens], dtype=torch.long).to(device)
            real_autoregressive_losses = []

            for step in range(len(target_tokens)):
                outputs = model(current_sequence)
                logits = outputs.logits[0, -1, :]

                # Predict the next token
                predicted_token = torch.argmax(logits, dim=-1).item()

                # Loss against the ground-truth token
                true_next_token = target_tokens[step]
                step_loss = F.cross_entropy(logits.unsqueeze(0),
                                            torch.tensor([true_next_token], device=device))
                real_autoregressive_losses.append(step_loss.item())

                # Continue generation with the predicted token
                current_sequence = torch.cat([
                    current_sequence,
                    torch.tensor([[predicted_token]], device=device)
                ], dim=1)

            real_autoregressive_loss = sum(real_autoregressive_losses) / len(real_autoregressive_losses)
            print(f"   True autoregressive loss: {real_autoregressive_loss:.4f}")
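
# Interpretation note: methods 1 and 2 both condition every prediction on the ground-truth
# prefix, so their average losses should agree up to numerical differences; method 3 feeds
# the model its own predictions back, so any extra loss there reflects exposure bias rather
# than a training/inference mismatch in the loss computation itself.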


def analyze_data_distribution(samples, tokenizer):
    """
    Analyze the distribution of the evaluation data.
    """
    print("=== 5. Analyze data distribution ===")

    lengths = []
    vocab_coverage = set()
    for sample in samples:
        text = sample['text']
        tokens = tokenizer.encode(text, add_special_tokens=False)
        lengths.append(len(tokens))
        vocab_coverage.update(tokens)

    print("Text length statistics:")
    print(f"   Average length: {sum(lengths)/len(lengths):.1f} tokens")
    print(f"   Shortest: {min(lengths)} tokens")
    print(f"   Longest: {max(lengths)} tokens")
    print(f"   Vocabulary coverage: {len(vocab_coverage)} distinct tokens")
    print(f"   Coverage of the 6400-token vocab: {len(vocab_coverage)/6400*100:.1f}%")


def compare_training_vs_inference_computation(model, tokenizer, samples, device='cuda'):
    """
    Compare the exact computation performed at training time and at inference time.
    """
    print("=== 6. Compare training vs inference computation ===")

    sample = samples[0]
    text = sample['text']
    tokens = tokenizer.encode(text, add_special_tokens=False)
    if len(tokens) < 130:
        print("Sample too short, skipping")
        return

    input_tokens = tokens[:100]
    target_tokens = tokens[100:130]
    print(f"Test sample length: {len(tokens)} tokens")
    print(f"Input part: {len(input_tokens)} tokens")
    print(f"Target part: {len(target_tokens)} tokens")

    # Simulate the training-time computation
    print("\n--- Simulated training computation ---")
    with torch.no_grad():
        # Training: feed the full sequence in a single pass
        full_sequence = torch.tensor([tokens[:130]], dtype=torch.long).to(device)
        outputs = model(full_sequence)
        logits = outputs.logits

        print(f"Input shape: {full_sequence.shape}")
        print(f"Output logits shape: {logits.shape}")

        # Compute the loss the same way as during training
        shift_logits = logits[0, :-1, :].contiguous()     # Drop the last position
        shift_labels = full_sequence[0, 1:].contiguous()  # Drop the first position

        # Only compute the loss on the prediction window
        predict_start = 99  # Predictions start at the 100th token
        predict_logits = shift_logits[predict_start:predict_start+30, :]
        predict_labels = shift_labels[predict_start:predict_start+30]

        training_loss = F.cross_entropy(predict_logits, predict_labels, reduction='mean')
        print(f"Training-style loss: {training_loss.item():.4f}")

    # Simulate the inference-time computation
    print("\n--- Simulated inference computation ---")
    with torch.no_grad():
        # Inference: handle the input and the target separately
        input_ids = torch.tensor([input_tokens], dtype=torch.long).to(device)

        # Use the same method as eval_model.py
        full_input_for_loss = torch.tensor([tokens[:130]], dtype=torch.long).to(device)
        outputs = model(full_input_for_loss, logits_to_keep=30)

        if outputs.logits is not None:
            shift_logits = outputs.logits[0, -30:, :].contiguous()
            shift_labels = torch.tensor(target_tokens, dtype=torch.long).to(device)
            inference_loss = F.cross_entropy(shift_logits, shift_labels, reduction='mean')
            print(f"Inference-style loss: {inference_loss.item():.4f}")
        else:
            print("Could not obtain logits")
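
# Hedged observation: if logits_to_keep=30 makes the model return logits only for the last
# 30 input positions (positions 100..129), those logits predict tokens 101..130, while the
# labels used above are tokens 100..129; the training-style slice instead pairs positions
# 99..128 with tokens 100..129. Whether this one-position shift actually occurs depends on
# MiniMindLM's logits_to_keep semantics, but it is exactly the kind of mismatch this script
# is meant to surface.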


def main():
    """
    Main entry point: systematically analyze the training vs inference loss gap.
    """
    print("🔍 Starting analysis of the training vs inference loss gap")
    print("=" * 60)

    # Fix random seeds so the results are reproducible
    random.seed(42)
    torch.manual_seed(42)

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model_path = 'out/experiment_1_4_0/pretrain_512.pth'

    try:
        # 1. Build an evaluation set from the training data
        eval_data_path, samples = create_eval_data_from_training_data()

        # 2. Load the model
        model, tokenizer, config = load_model_and_tokenizer(model_path, device)

        # 3. Analyze data distribution
        analyze_data_distribution(samples, tokenizer)

        # 4. Test different inference modes
        mode_results = test_inference_modes(model, tokenizer, samples, device)

        # 5. Autoregressive generation vs teacher forcing
        test_autoregressive_vs_teacher_forcing(model, tokenizer, samples, device)

        # 6. Compare training vs inference computation
        compare_training_vs_inference_computation(model, tokenizer, samples, device)

        print("\n" + "=" * 60)
        print("🎯 Analysis complete")
    except Exception as e:
        print(f"❌ Error during analysis: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
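
# Usage sketch (assumptions: the checkpoint out/experiment_1_4_0/pretrain_512.pth, the
# tokenizer under ./model/minimind_tokenizer, and the hard-coded dataset path all exist;
# run from the repository root so the relative paths resolve):
#   python analyze_train_inference_gap.py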