minimind-预训练流程-实验复现与代码解析(感谢作者开源)

首先对作者报以最诚挚的敬意,介绍详细,覆盖范围广,完全开源,没有特别复杂的接口,对入门NLP者相当友好🫡


Minimind-Github项目地址

train_pretrain.py代码结构

万物之源,所有微调,强化学习之前必先经此阶段


此阶段旨在训练一个文本接龙模型,该模型的唯一功能就是做文本接龙,通过之前输入的文本推测下一个文本是啥。只有先能够做到这个功能才能做到后面的智能对话。

代码行数:198。解析时间:1 Day

import os
#
import sys
__package__ = "trainer"
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

import argparse
import time
import math
import warnings
import torch
import torch.distributed as dist
from torch import optim, nn
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader, DistributedSampler
from contextlib import nullcontext
from transformers import AutoTokenizer
from model.model_minimind import MiniMindConfig, MiniMindForCausalLM
from dataset.lm_dataset import PretrainDataset

warnings.filterwarnings('ignore')
#忽略警告信息,以防干扰输出

def Logger(content):
    if not ddp or dist.get_rank() == 0:
        print(content)


def get_lr(current_step, total_steps, lr):
    return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))
#学习率逐减

def train_epoch(epoch, wandb):
    loss_fct = nn.CrossEntropyLoss(reduction='none')
    #交叉熵
    start_time = time.time()
    #时间计算,记录开始时间
    for step, (X, Y, loss_mask) in enumerate(train_loader):
        X = X.to(args.device)
        Y = Y.to(args.device)
        #X,Y为文本token,分别告诉模型输入为什么输出位什么,形状为【batch_size,num_tokens】


        #test input data and output data
        #if dist.get_rank() == 0:
        # print("X_shape=",X.shape)
        # print("Y_shape=",Y.shape)
        # print("X=",X)
        # print("Y=",Y)
        # time.sleep(10)
        # print("X[0]:", X[0])
        # print("Y[0]:", Y[0])
        # # print(X[0]==Y[0])
        # # print(decoded_input==decoded_output)
        # # print("程序已经暂停了 10 秒!")
        # #results:[32,511] 
        # # # 假设 X 是你的输入数据,其中的每个 token ID 需要映射回文字
        # decoded_input = tokenizer.decode(X[0], skip_special_tokens=False)  # 将 X 中第一个样本的 tokens 解码为文本
        # # print()
        # # # 对于 Y 也是一样
        # decoded_output = tokenizer.decode(Y[0], skip_special_tokens=False)  # 将 Y 中第一个样本的 tokens 解码为文本

        # # 打印解码后的文本
        # print("Decoded Input:", decoded_input)
        # print()
        # print("Decoded Output:", decoded_output)
        # # 测得X文本与Y文本完全相同
        # time.sleep(10)

        loss_mask = loss_mask.to(args.device)
        # print(loss_mask)
        #loss_mask的作用是标记tokens里面的有效位,因为tokenizer把text转换成tokens时,要固定总长度
        #但是文本长度是变化的,那没到固定长度,只能用padding来填充,填充的这一部分就是无效的\
        #会被tokenizer标记,从而得出loss_mask
        lr = get_lr(epoch * iter_per_epoch + step, args.epochs * iter_per_epoch, args.learning_rate)
        #计算当前学习学习率,输入当前步数和总步数,得到学习率iter_per_epoch=len(train_loader)
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr

        with ctx:
        #ctx:精度选择器,通过运行设备来执行不同精度的运算,CPU上使用单精度计算,因为CPU不支持半精度
        #自动混合精度计算,反向传播用单精度,前向传播和矩阵乘法用半精度,可以加速计算并减少显存使用
            res = model(X)
            # start_可视化训练过程中的结果
            # 假设 res.logits 是模型输出,形状为 [batch_size, sequence_length, vocab_size]
            # logits = res.logits  # shape: [32, 511, 6400]

            # # 1. 获取每个位置的最大概率的 token 的下标
            # predicted_ids = torch.argmax(logits, dim=-1)  # shape: [32, 511]

            # # 2. 使用 tokenizer 将这些下标转换为文本
            # decoded_output = tokenizer.decode(predicted_ids[0], skip_special_tokens=False)

            # # 打印结果
            # print("res_show:",decoded_output)
            # print("res_end")
            # time.sleep(10)  
            #可视化训练过程中输出的结果。
            # print("Y.view_shape",Y.view(-1).shape)
            #32*511
            print(loss_mask.shape)
            loss = loss_fct(
                res.logits.view(-1, res.logits.size(-1)),
                Y.view(-1)
            ).view(Y.size())
            #通过结果的预测值和标准答案Y求交叉熵。优化模型以减少损失
            loss = (loss * loss_mask).sum() / loss_mask.sum()
            loss += res.aux_loss
            #aux_loss只有在MOE架构才会用到,目的是为了使每个专家都能够参与
            # print("res.aux_loss",res.aux_loss)
            loss = loss / args.accumulation_steps
            #梯度累积,默认八步批次算一次梯度,处理显存不足问题
        scaler.scale(loss).backward()
        #通过损失记录
        #当步数达到要求数量,计算一次反向传播
        if (step + 1) % args.accumulation_steps == 0:
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)

            scaler.step(optimizer)
            scaler.update()

            optimizer.zero_grad(set_to_none=True)
        # 步数达到记录要求,记录一次
        if step % args.log_interval == 0:
            spend_time = time.time() - start_time
            Logger(
                'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} epoch_Time:{}min:'.format(
                    epoch + 1,
                    args.epochs,
                    step,
                    iter_per_epoch,
                    loss.item() * args.accumulation_steps,
                    optimizer.param_groups[-1]['lr'],
                    spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))
            #terminal 记录实验参数
            if (wandb is not None) and (not ddp or dist.get_rank() == 0):
                wandb.log({"loss": loss.item() * args.accumulation_steps,
                           "lr": optimizer.param_groups[-1]['lr'],
                           "epoch_Time": spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60})
            # wandb记录损失,学习率和花费时间
        if (step + 1) % args.save_interval == 0 and (not ddp or dist.get_rank() == 0):
            #步数到达保存要求
            model.eval()
            moe_path = '_moe' if lm_config.use_moe else ''
            ckp = f'{args.save_dir}/pretrain_{lm_config.hidden_size}{moe_path}.pth'
            #记录路径和文件名
            if isinstance(model, torch.nn.parallel.DistributedDataParallel):
                #检查是否ddp
                state_dict = model.module.state_dict()
            else:
                state_dict = model.state_dict()

            state_dict = {k: v.half() for k, v in state_dict.items()}  # 半精度保存
            torch.save(state_dict, ckp)
            model.train()


def init_model(lm_config):
    # 初始化模型,包括计算模型和对应分词器
    tokenizer = AutoTokenizer.from_pretrained('../model/')
    model = MiniMindForCausalLM(lm_config).to(args.device)
    Logger(f'LLM可训练总参数量:{sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} 百万')
    #输出记录模型参数总量
    return model, tokenizer


def init_distributed_mode():
    if not ddp: return
    # 判断是否ddp, 否的话直接返回
    global ddp_local_rank, DEVICE
    # 定义两个全部变量
    dist.init_process_group(backend="nccl")
    # 初始化分布式训练所需进程组,采用NVIDIA Collective Communications Library(NCCL)作为通讯后端
    ddp_rank = int(os.environ["RANK"])
    # 获取当前进程在分布式训练中的全局排名,标识在所有进程中的位置
    ddp_local_rank = int(os.environ["LOCAL_RANK"])
    # 前进程在本地节点中的排名,标识在当前机器上的哪个 GPU 上运行,通常用于单机多卡。
    ddp_world_size = int(os.environ["WORLD_SIZE"])
    # 获取总进程数比如,如果你使用 2 台机器,每台机器上有 4 张 GPU,那么 WORLD_SIZE 的值就是 8。
    DEVICE = f"cuda:{ddp_local_rank}"
    torch.cuda.set_device(DEVICE)


# torchrun --nproc_per_node 2 1-pretrain.py
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="MiniMind Pretraining")
    parser.add_argument("--out_dir", type=str, default="../out")
    #ckpt输出文件夹位置
    # 若要以最快速度实现zero则epochs设置为1轮;否则应当利用有限的数据训练2~6个epochs。
    parser.add_argument("--epochs", type=int, default=1)
    #总训练论数
    parser.add_argument("--batch_size", type=int, default=32)
    #输入批次数
    parser.add_argument("--learning_rate", type=float, default=5e-4)
    #学习率
    parser.add_argument("--device", type=str, default="cuda:0" if torch.cuda.is_available() else "cpu")
    #制定运行设备
    parser.add_argument("--dtype", type=str, default="bfloat16")
    # 数据类型
    parser.add_argument("--use_wandb", action="store_true")
    # 是否wandb记录
    parser.add_argument("--wandb_project", type=str, default="MiniMind-Pretrain")
    # 如果wandb记录,设置记录项目名
    parser.add_argument("--num_workers", type=int, default=1)
    # 设置进程数量
    parser.add_argument("--ddp", action="store_true")
    # 是否多卡分布式
    parser.add_argument("--accumulation_steps", type=int, default=8)
    # 梯度累积步数
    parser.add_argument("--grad_clip", type=float, default=1.0)
    # 梯度裁剪
    parser.add_argument("--warmup_iters", type=int, default=0)
    # 学习率预热,决定学习率是0到指定值逐渐变大,还是直接从指定值开始直接训练
    parser.add_argument("--log_interval", type=int, default=100)
    # 记录间隔
    parser.add_argument("--save_interval", type=int, default=100)
    # 保存间隔
    parser.add_argument('--local_rank', type=int, default=-1)
    # ddp本地排名
    parser.add_argument('--hidden_size', default=512, type=int)
    # 隐藏层维度
    parser.add_argument('--num_hidden_layers', default=8, type=int)
    # 隐藏层层数
    parser.add_argument('--max_seq_len', default=512, type=int)
    # 最大文本长度
    parser.add_argument('--use_moe', default=False, type=bool)
    # 是否使用moe架构
    parser.add_argument("--data_path", type=str, default="../dataset/pretrain_hq.jsonl")
    # 输入数据路径
    args = parser.parse_args()

    lm_config = MiniMindConfig(hidden_size=args.hidden_size, num_hidden_layers=args.num_hidden_layers, use_moe=args.use_moe)
    # 记录config
    args.save_dir = os.path.join(args.out_dir)
    os.makedirs(args.save_dir, exist_ok=True)
    os.makedirs(args.out_dir, exist_ok=True)
    # 创建制定文件夹
    tokens_per_iter = args.batch_size * args.max_seq_len
    #每轮总tokens数量
    device_type = "cuda" if "cuda" in args.device else "cpu"

    args.wandb_run_name = f"MiniMind-Pretrain-Epoch-{args.epochs}-BatchSize-{args.batch_size}-LearningRate-{args.learning_rate}"

    ctx = nullcontext() if device_type == "cpu" else torch.cuda.amp.autocast()

    ddp = int(os.environ.get("RANK", -1)) != -1  # is this a ddp run?
    ddp_local_rank, DEVICE = 0, "cuda:0"

    base_seed = 1337
    torch.manual_seed(base_seed)
    torch.cuda.manual_seed(base_seed)

    if ddp:
        init_distributed_mode()
        args.device = torch.device(DEVICE)
        rank = dist.get_rank()
        torch.manual_seed(base_seed + rank)
        # 同时设置 CUDA 的随机种子
        torch.cuda.manual_seed(base_seed + rank)

    if args.use_wandb and (not ddp or ddp_local_rank == 0):
        import wandb

        wandb.init(project=args.wandb_project, name=args.wandb_run_name)
    else:
        wandb = None

    model, tokenizer = init_model(lm_config)
    train_ds = PretrainDataset(args.data_path, tokenizer, max_length=args.max_seq_len)
    train_sampler = DistributedSampler(train_ds) if ddp else None
    train_loader = DataLoader(
        train_ds,
        batch_size=args.batch_size,
        pin_memory=True,
        drop_last=False,
        shuffle=False,
        num_workers=args.num_workers,
        sampler=train_sampler
    )

    scaler = torch.cuda.amp.GradScaler(enabled=(args.dtype in ['float16', 'bfloat16']))
    optimizer = optim.AdamW(model.parameters(), lr=args.learning_rate)

    if ddp:
        model._ddp_params_and_buffers_to_ignore = {"pos_cis"}
        # 旋转位置编码参数不用共享在ddp中
        model = DistributedDataParallel(model, device_ids=[ddp_local_rank])

    iter_per_epoch = len(train_loader)
    for epoch in range(args.epochs):
        train_epoch(epoch, wandb)

预训练流程图


主训练流程图

无标签
评论区
头像