First of all, my sincerest respect to the author: the write-up is detailed, the coverage is broad, everything is fully open source, and there are no especially complicated interfaces, which makes it very friendly to NLP beginners 🫡
MiniMind GitHub project link
train_full_sft.py code structure
A model that has only been pretrained can do nothing but "word completion": it predicts the probability of the next token given the preceding text. Only after fine-tuning can it hold a proper conversation with a person. The fine-tuning stage reorganizes the raw data into an instruction + role + content format, so that the model learns which parts are the question entered by the human and which parts it is expected to generate. The characters themselves are all tokens that already exist in the tokenizer's vocabulary; only their arrangement changes.
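As a concrete illustration, here is a hedged sketch of what a single SFT record and a toy chat template might look like. The field names conversations / role / content and the ChatML-style markers are my own assumptions for illustration, not necessarily the exact format MiniMind's dataset or tokenizer uses.

# A minimal sketch of one SFT sample; the real dataset's field names may differ.
sample = {
    "conversations": [
        {"role": "user", "content": "What is the capital of France?"},
        {"role": "assistant", "content": "The capital of France is Paris."},
    ]
}

# Toy ChatML-style rendering: roles and contents are flattened into plain text
# built entirely from characters the tokenizer already knows.
def render_chat(conversations):
    text = ""
    for turn in conversations:
        text += f"<|im_start|>{turn['role']}\n{turn['content']}<|im_end|>\n"
    return text

print(render_chat(sample["conversations"]))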
The main difference between fine-tuning and pretraining is therefore the dataset, which teaches the model how to carry out a dialogue. Fine-tuning also does not have to update every parameter: only a subset can be trained, and techniques such as LoRA (a product of low-rank matrices) and FSDP (sharding the model across GPUs) can further reduce the memory required on each card.
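To make "a product of low-rank matrices" concrete, here is a generic LoRA sketch (my own illustration under standard LoRA assumptions, not MiniMind's LoRA code): the pretrained weight stays frozen, and only two small matrices A and B, whose product has rank r, are trained.

import torch
import torch.nn as nn

class LoRALinear(nn.Module):
    """Generic LoRA sketch: y = W0 x + (alpha / r) * B A x, with W0 frozen."""
    def __init__(self, in_features, out_features, r=8, alpha=16):
        super().__init__()
        self.base = nn.Linear(in_features, out_features, bias=False)
        self.base.weight.requires_grad = False                           # frozen pretrained weight
        self.lora_A = nn.Parameter(torch.randn(r, in_features) * 0.01)   # r x d_in
        self.lora_B = nn.Parameter(torch.zeros(out_features, r))         # d_out x r
        self.scaling = alpha / r

    def forward(self, x):
        return self.base(x) + (x @ self.lora_A.T @ self.lora_B.T) * self.scaling

layer = LoRALinear(512, 512, r=8)
trainable = sum(p.numel() for p in layer.parameters() if p.requires_grad)
print(f"trainable: {trainable} vs full weight: {512 * 512}")             # 8192 vs 262144

With r = 8 only about 3% of the parameters of the full 512 x 512 weight are trainable, which is where the per-card memory saving comes from.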
Lines of code: 202. Time spent on this walkthrough: X day(s).
import os
import sys
__package__ = "trainer"
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import argparse
import time
import math
import warnings
import torch
import torch.distributed as dist
from contextlib import nullcontext
from torch import optim, nn
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader, DistributedSampler
from transformers import AutoTokenizer, AutoModelForCausalLM
from model.model_minimind import MiniMindConfig, MiniMindForCausalLM
from dataset.lm_dataset import SFTDataset
warnings.filterwarnings('ignore')
def Logger(content):
if not ddp or dist.get_rank() == 0:
print(content)
def get_lr(current_step, total_steps, lr):
return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))
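    # cosine decay: the learning rate starts at 1.1 * lr (step 0) and falls to lr / 10 at the last step,
    # so it never decays all the way to zero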
# almost identical to the pretraining training loop
def train_epoch(epoch, wandb):
loss_fct = nn.CrossEntropyLoss(reduction='none')
start_time = time.time()
for step, (X, Y, loss_mask) in enumerate(train_loader):
X = X.to(args.device)
Y = Y.to(args.device)
loss_mask = loss_mask.to(args.device)
lr = get_lr(epoch * iter_per_epoch + step, args.epochs * iter_per_epoch, args.learning_rate)
for param_group in optimizer.param_groups:
param_group['lr'] = lr
with ctx:
res = model(X)
loss = loss_fct(
res.logits.view(-1, res.logits.size(-1)),
Y.view(-1)
).view(Y.size())
loss = (loss * loss_mask).sum() / loss_mask.sum()
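            # loss_mask keeps only the response tokens: prompt and padding positions
            # contribute nothing to the SFT loss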
loss += res.aux_loss
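            # aux_loss is the MoE load-balancing auxiliary loss (zero when use_moe is False)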
loss = loss / args.accumulation_steps
scaler.scale(loss).backward()
if (step + 1) % args.accumulation_steps == 0:
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad(set_to_none=True)
if step % args.log_interval == 0:
spend_time = time.time() - start_time
Logger(
'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} epoch_Time:{}min:'.format(
epoch + 1,
args.epochs,
step,
iter_per_epoch,
loss.item() * args.accumulation_steps,
optimizer.param_groups[-1]['lr'],
spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))
if (wandb is not None) and (not ddp or dist.get_rank() == 0):
wandb.log({"loss": loss * args.accumulation_steps,
"lr": optimizer.param_groups[-1]['lr'],
"epoch_Time": spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60})
if (step + 1) % args.save_interval == 0 and (not ddp or dist.get_rank() == 0):
model.eval()
moe_path = '_moe' if lm_config.use_moe else ''
ckp = f'{args.save_dir}/full_sft_{lm_config.hidden_size}{moe_path}.pth'
if isinstance(model, torch.nn.parallel.DistributedDataParallel):
state_dict = model.module.state_dict()
else:
state_dict = model.state_dict()
            state_dict = {k: v.half() for k, v in state_dict.items()}  # save weights in half precision
torch.save(state_dict, ckp)
model.train()
def init_model(lm_config):
    tokenizer = AutoTokenizer.from_pretrained('../model')
    # initialize the tokenizer
    model = MiniMindForCausalLM(lm_config)
    # initialize the model
    moe_path = '_moe' if lm_config.use_moe else ''
    # pick the matching weights depending on whether the MoE architecture is used
    ckp = f'{args.save_dir}/pretrain_{lm_config.hidden_size}{moe_path}.pth'
    # locate the pretraining checkpoint (full SFT starts from the pretrained weights)
    state_dict = torch.load(ckp, map_location=args.device)
    model.load_state_dict(state_dict, strict=False)
    # load the pretrained parameters
    Logger(f'Total trainable LLM parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} million')
    # log the parameter count
    model = model.to(args.device)
    # move the model to the target device
    return model, tokenizer
def init_distributed_mode():
    if not ddp: return
    # do nothing when this is not a multi-GPU (DDP) run
    global ddp_local_rank, DEVICE
    # in DDP mode, use globals to record this process's local rank and device
    dist.init_process_group(backend="nccl")
    # use NCCL as the inter-GPU communication backend
    ddp_rank = int(os.environ["RANK"])
    ddp_local_rank = int(os.environ["LOCAL_RANK"])
    # global rank and local rank (GPU index on this node), set as environment variables by the torchrun launcher
    ddp_world_size = int(os.environ["WORLD_SIZE"])
    DEVICE = f"cuda:{ddp_local_rank}"
    # the CUDA device this process will use
    torch.cuda.set_device(DEVICE)
    # bind this process to that GPU
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="MiniMind Full SFT")
    # full-parameter supervised fine-tuning (all weights are updated)
    parser.add_argument("--out_dir", type=str, default="../out")
    # output directory
    parser.add_argument("--epochs", type=int, default=2)
    # number of training epochs
    parser.add_argument("--batch_size", type=int, default=16)
    # batch size per step
    parser.add_argument("--learning_rate", type=float, default=5e-7)
    # learning rate
    parser.add_argument("--device", type=str, default="cuda:0" if torch.cuda.is_available() else "cpu")
    # device
    parser.add_argument("--dtype", type=str, default="bfloat16")
    # data type for mixed-precision training
    parser.add_argument("--use_wandb", action="store_true")
    # whether to log to wandb
    parser.add_argument("--wandb_project", type=str, default="MiniMind-Full-SFT")
    # project name to use if logging is enabled
    parser.add_argument("--num_workers", type=int, default=1)
    # number of DataLoader workers
    parser.add_argument("--ddp", action="store_true")
    # whether to train on multiple GPUs
    parser.add_argument("--accumulation_steps", type=int, default=1)
    # gradient accumulation steps
    parser.add_argument("--grad_clip", type=float, default=1.0)
    # gradient clipping threshold
    parser.add_argument("--warmup_iters", type=int, default=0)
    # number of learning-rate warmup iterations (not used by get_lr above)
    parser.add_argument("--log_interval", type=int, default=100)
    # logging interval
    parser.add_argument("--save_interval", type=int, default=100)
    # checkpoint-saving interval
    parser.add_argument('--local_rank', type=int, default=-1)
    # local rank marker for multi-GPU training
    parser.add_argument('--hidden_size', default=512, type=int)
    # hidden dimension of the model
    parser.add_argument('--num_hidden_layers', default=8, type=int)
    # number of transformer layers
    parser.add_argument('--max_seq_len', default=512, type=int)
    # maximum sequence length in tokens
    parser.add_argument('--use_moe', default=False, type=bool)
    # whether to use the MoE architecture
    parser.add_argument("--data_path", type=str, default="../dataset/sft_mini_512.jsonl")
    # input SFT dataset
    args = parser.parse_args()
    # parse the command-line hyperparameters
    lm_config = MiniMindConfig(hidden_size=args.hidden_size, num_hidden_layers=args.num_hidden_layers,
                               use_moe=args.use_moe)
    # build the model config
    args.save_dir = os.path.join(args.out_dir)
    # set the checkpoint directory
    os.makedirs(args.save_dir, exist_ok=True)
    os.makedirs(args.out_dir, exist_ok=True)
    # create the checkpoint and output directories
    tokens_per_iter = args.batch_size * args.max_seq_len
    # tokens processed per iteration
    device_type = "cuda" if "cuda" in args.device else "cpu"
    # device type
    args.wandb_run_name = f"MiniMind-Full-SFT-Epoch-{args.epochs}-BatchSize-{args.batch_size}-LearningRate-{args.learning_rate}"
    # wandb run name
    # the code from here on is interpreted the same way as in the pretraining script
ctx = nullcontext() if device_type == "cpu" else torch.cuda.amp.autocast()
ddp = int(os.environ.get("RANK", -1)) != -1 # is this a ddp run?
ddp_local_rank, DEVICE = 0, "cuda:0"
base_seed = 1337
torch.manual_seed(base_seed)
torch.cuda.manual_seed(base_seed)
if ddp:
init_distributed_mode()
args.device = torch.device(DEVICE)
rank = dist.get_rank()
torch.manual_seed(base_seed + rank)
        # also set CUDA's random seed (offset by rank)
torch.cuda.manual_seed(base_seed + rank)
if args.use_wandb and (not ddp or ddp_local_rank == 0):
import wandb
wandb.init(project=args.wandb_project, name=args.wandb_run_name)
else:
wandb = None
model, tokenizer = init_model(lm_config)
train_ds = SFTDataset(args.data_path, tokenizer, max_length=args.max_seq_len)
train_sampler = DistributedSampler(train_ds) if ddp else None
train_loader = DataLoader(
train_ds,
batch_size=args.batch_size,
pin_memory=True,
drop_last=False,
shuffle=False,
num_workers=args.num_workers,
sampler=train_sampler
)
scaler = torch.cuda.amp.GradScaler(enabled=(args.dtype in ['float16', 'bfloat16']))
optimizer = optim.AdamW(model.parameters(), lr=args.learning_rate)
if ddp:
model._ddp_params_and_buffers_to_ignore = {"pos_cis"}
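        # exclude the rotary-embedding buffer "pos_cis" from DDP's parameter/buffer synchronization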
model = DistributedDataParallel(model, device_ids=[ddp_local_rank])
iter_per_epoch = len(train_loader)
for epoch in range(args.epochs):
train_epoch(epoch, wandb)
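To see exactly what loss_mask is doing in train_epoch above, here is a small self-contained sketch with toy tensors (random stand-ins, not real model outputs). Only positions where the mask is 1 contribute to the averaged cross-entropy, mirroring the (loss * loss_mask).sum() / loss_mask.sum() pattern in the training loop.

import torch
import torch.nn as nn

vocab, seq = 10, 5
logits = torch.randn(1, seq, vocab)              # stand-in for res.logits
Y = torch.randint(0, vocab, (1, seq))            # target token ids
loss_mask = torch.tensor([[0, 0, 1, 1, 1]])      # 0 = prompt/padding, 1 = response tokens

loss_fct = nn.CrossEntropyLoss(reduction='none')
per_token = loss_fct(logits.view(-1, vocab), Y.view(-1)).view(Y.size())
loss = (per_token * loss_mask).sum() / loss_mask.sum()   # average over response tokens only
print(loss.item())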