class PretrainDataset(Dataset):
def __init__(self, data_path, tokenizer, max_length=512):
super().__init__()
self.tokenizer = tokenizer
#存储分词器
self.max_length = max_length
#最大文本长度
self.samples = self.load_data(data_path)
#获得数据初始样本
def load_data(self, path):
samples = []
with open(path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
data = json.loads(line.strip())
# print(type(data))
# example(dict):{'text': '<|im_start|>abc。def。<|im_end|> <|im_start|>esdw。rwqe。<|im_end|>'}
# time.sleep(10)
samples.append(data)
# example(list):[{'text': '<|im_start|>abc。def。<|im_end|> <|im_start|>esdw。rwqe。<|im_end|>'},
# {'text': '<|im_start|>fadsf。wqe。<|im_end|> <|im_start|>ertw。tret。<|im_end|>'}]
# print(samples)
# time.sleep(10)
return samples
def __len__(self):
return len(self.samples)
def __getitem__(self, index):
sample = self.samples[index]
# example(dict):{'text': '<|im_start|>abc。def。<|im_end|> <|im_start|>esdw。rwqe。<|im_end|>'}
# 构建输入文本
encoding = self.tokenizer(
str(sample['text']),
max_length=self.max_length,
padding='max_length',
truncation=True,
return_tensors='pt'
)
# example(str)
# sample['text']:'<|im_start|>abc。def。<|im_end|> <|im_start|>esdw。rwqe。<|im_end|>'
input_ids = encoding.input_ids.squeeze()
# example(list):
# input_ids: tensor([ 1, 2177, ..., 1055, 2, 223, 1, 1083, ...,
# 2, 223, 1, 2104, ..., 2, 0, 0, ..., 0])
# print(type(self.tokenizer.pad_token_id))
# example(int):
# pad_token_id: 0
# print("input_ids:",input_ids)
# print("pad_token_id:",self.tokenizer.pad_token_id)
# time.sleep(10)
#
loss_mask = (input_ids != self.tokenizer.pad_token_id)
# 只标记非填充位置
X = torch.tensor(input_ids[:-1], dtype=torch.long)
# 去掉最后一位
Y = torch.tensor(input_ids[1:], dtype=torch.long)
# 去掉第一位
# print("X:",X)
# time.sleep(10)
# print("Y:",Y)
# time.sleep(10)
loss_mask = torch.tensor(loss_mask[1:], dtype=torch.long)
# 同Y
return X, Y, loss_maskclass SFTDataset(Dataset):
def __init__(self, jsonl_path, tokenizer, max_length=1024):
super().__init__()
self.tokenizer = tokenizer
# 存储分词器
self.max_length = max_length
# 最大文本长度
self.samples = self.load_data(jsonl_path)
# 获得数据初始样本
self.bos_id = tokenizer('<|im_start|>assistant', add_special_tokens=False).input_ids
# 获得开始标识
# example(list):[1, 1078, 538, 501]
# print(self.bos_id)
# time.sleep(10)
self.eos_id = tokenizer('<|im_end|>', add_special_tokens=False).input_ids
# 获得结束标识
# example(list):[2]
# print(self.eos_id)
# time.sleep(10)
def __len__(self):
return len(self.samples)
# 获取整体数据量
def load_data(self, path):
# 加载数据
samples = []
with open(path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
data = json.loads(line.strip())
# example(dict):
# {'conversations': [{'role': 'user', 'content': 'Requirements。'},
# {'role': 'assistant', 'content': 'feedback。'}]}
# print(data)
# time.sleep(10)
samples.append(data)
# examples(list):
# [{'conversations': [{'role': 'user', 'content': 'Requirements1。'},
# {'role': 'assistant', 'content': 'feedback1。'}]},
# {'conversations': [{'role': 'user', 'content': 'Requirements2。'},
# {'role': 'assistant', 'content': 'feedback2。'}]},
# ...
# ]
# print(samples)
# time.sleep(10)
return samples
def _create_chat_prompt(self, conversations):
"""构建符合ChatML格式的对话"""
messages = []
# example(list):
# conversations:[{'role': 'user', 'content': 'Requirements。'},
# {'role': 'assistant', 'content': 'feedback。'}]
# print(conversations)
# time.sleep(10)
for i, turn in enumerate(conversations):
role = 'user' if i % 2 == 0 else 'assistant'
messages.append({"role": role, "content": turn['content']})
# print(messages)
# time.sleep(10)
# example(list):
# messages:[{'role': 'user', 'content': 'Requirements。'},
# {'role': 'assistant', 'content': 'feedback。'}]
return self.tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=False
)
# 把类似 ChatGPT 的对话结构(role + content)转换成模型要求的 ChatML 格式文本(带 <|im_start|> / <|im_end|> 之类标记)。
# tokenize:是否将文本直接转换成 token
# add_generation_prompt:是否提示模型继续生成文本,如:<|im_start|>user你好!<|im_end|><|im_start|>assistant 和
# <|im_start|>user你好!<|im_end|> <|im_start|>assistant 你好,有什么我能帮你的吗?<|im_end|>的区别
def _generate_loss_mask(self, input_ids):
loss_mask = [0] * len(input_ids)
# 创建相同形状的 mask
i = 0
# 定初始下标
while i < len(input_ids):
# 循环到末尾为止
if input_ids[i:i + len(self.bos_id)] == self.bos_id:
# 检测到<|im_start|>assistant时
start = i + len(self.bos_id)
# 从<|im_start|>assistant后面说的话开始计算
end = start
# 启用 end 指针
while end < len(input_ids):
# 循环到末尾前
if input_ids[end:end + len(self.eos_id)] == self.eos_id:
# 找到第一个<|im_end|>之时
break
# 找到这句话的尾了,退出循环
end += 1
# 不停往后找
for j in range(start + 1, min(end + len(self.eos_id) + 1, self.max_length)):
# star+1 应该是为了跳过换行符,min(end + len(self.eos_id) + 1是为了让说完之后再加个换行符?
loss_mask[j] = 1
#
# segment = ''.join(self.tokenizer.decode([tid], skip_special_tokens=False)
# for tid in input_ids[start + 1:min(end + len(self.eos_id) + 1, self.max_length)])
# print(repr(segment))
# time.sleep(10)
# sample-star[star+1:min(end + len(self.eos_id) + 1]:
# '阿里巴巴集团的企业文化以“客户第一、员工第二、股东第三”为核心价值观,强调“让天下没有难做的生意”的使命。公司��导开放、透明、分享、责任的团队合作精神,鼓励员工创新、追求��越,同时注重员工的个人成长和幸福感。阿里巴巴的企业文化还体现在其独特的“����神��”价
# 值观体系中,包括客户第一、拥��变化、团队合作、诚信、激情、专业等��个方面,这些价值观不仅指导着公司的日常运营,也深深影响着每一位阿里人的行为准则。<|im_end|>\n'
i = end + len(self.eos_id) if end < len(input_ids) else len(input_ids)
# 跳到句末
else:
i += 1
# 没找到就继续找
return loss_mask
def __getitem__(self, index):
sample = self.samples[index]
# example(dict):
# {'conversations': [{'role': 'user', 'content': 'Requirements。'},
# {'role': 'assistant', 'content': 'feedback。'}]}
# 构建对话提示
prompt = self._create_chat_prompt(sample['conversations'])
# 进去一个列表,列表里有两个 dict,分别包含问和答的角色以及内容,将其改造成模型能识别的格式
# example(str):
# prompt:"<|im_start|>system\n
# You are a helpful assistant<|im_end|>
# <|im_start|>user
# Requirement。<|im_end|>
# <|im_start|>assistant
# FeedBack。<|im_end|>"
# print(repr(prompt))
# time.sleep(10)
input_ids = self.tokenizer(prompt).input_ids[:self.max_length]
# 长度过长就截断
input_ids += [self.tokenizer.pad_token_id] * (self.max_length - len(input_ids))
# 长度不够就填充
# 生成动态损失掩码
loss_mask = self._generate_loss_mask(input_ids)
# 只需要对 assistant的 feedback 求损失就行了,所以只码住 assistant 的 feedback
# 构建训练数据
X = torch.tensor(input_ids[:-1], dtype=torch.long)
Y = torch.tensor(input_ids[1:], dtype=torch.long)
loss_mask = torch.tensor(loss_mask[1:], dtype=torch.long) # 对齐预测位置
return X, Y, loss_mask
]]>{"text": "<|im_start|>鉴别一组中文文章的风格和特点,例如官方、口语、文言等。需要提供样例文章才能准确鉴别不同的风格和特点。<|im_end|> <|im_start|>好的,现在帮我查一下今天的天气怎么样?今天的天气依据地区而异。请问你需要我帮你查询哪个地区的天气呢?<|im_end|> <|im_start|>打开闹钟功能,定一个明天早上七点的闹钟。好的,我已经帮您打开闹钟功能,闹钟将在明天早上七点准时响起。<|im_end|> <|im_start|>为以下场景写一句话描述:一个孤独的老人坐在公园长椅上看着远处。一位孤独的老人坐在公园长椅上凝视远方。<|im_end|> <|im_start|>非常感谢你的回答。请告诉我,这些数据是关于什么主题的?这些数据是关于不同年龄段的男女人口比例分布的。<|im_end|> <|im_start|>帮我想一个有趣的标题。这个挺有趣的:\"如何成为一名成功的魔术师\" 调皮的标题往往会吸引读者的注意力。<|im_end|> <|im_start|>回答一个问题,地球的半径是多少?地球的平均半径约为6371公里,这是地球自赤道到两极的距离的平均值。<|im_end|> <|im_start|>识别文本中的语气,并将其分类为喜悦、悲伤、惊异等。\n文本:“今天是我的生日!”这个文本的语气是喜悦。<|im_end|>"}
{"text": "<|im_start|>根据输入的内容,编写一个类别标签。\n这是一篇介绍如何阅读心电图的文章类别标签: 医学/心电图阅读指南<|im_end|> <|im_start|>帮我搜索一下最近的天气情况。当然,我可以帮您搜索最新的天气情况。请问您需要查询哪个城市的天气情况呢?<|im_end|> <|im_start|>帮我讲一个令人开心的笑话。好的,我帮您讲一个关于细菌的笑话。为什么细菌不会上网?因为连接总是断开了!<|im_end|> <|im_start|>现在给我生成一首关于大海的五言诗。碧波万顷月满天,海天相接处天地间。波涛滚滚江山美,海鸟翱翔日月闲。<|im_end|> <|im_start|>谢谢你,这篇文章很有用。不客气,我很高兴能够为您提供帮助。如果您还有其他问题或需求,随时可以对我说。<|im_end|> <|im_start|>你好,我想下载一个视频编辑软件,你有什么推荐吗?您好!当然,有很多选择。您想要免费软件还是愿意付费?<|im_end|> <|im_start|>为什么我的程序不输出正确结果?可能是代码逻辑有误,或者输入数据有误,需要仔细调试代码逻辑和输入数据。<|im_end|> <|im_start|>谢谢你的回答。现在我想知道这场比赛的具体时间和地点。这场比赛的时间是北京时间10月4日,地点是上海。<|im_end|>"}
...{"conversations": [{"role": "user", "content": "请告诉我在中国古代的“四大发明”是什么?"}, {"role": "assistant", "content": "中国古代的“四大发明”是指造纸术、印刷术、火药和指南针。这四项发明对世界文明的发展产生了深远的影响:\n\n1. **造纸术**:据史书记载,东汉时期的蔡伦改进了造纸工艺,使得纸张的生产更加便捷、成本更低,质量也更加优良。这一发明极大地促进了文化的传播和保存。\n\n2. **印刷术**:中国古代的印刷术最早可以追溯到唐代的雕版印刷,到了宋代发展出了活字印刷技术。印刷术的发明极大地促进了知识的传播,降低了书籍的成本,对教育和文化的发展起到了重要的推动作用。\n\n3. **火药**:火药最初是在唐代被发现的,最初可能用于医疗或炼金术。到了宋代,人们开始将火药用于军事目的,发明了各种火器。火药的发明改变了战争的面貌,对世界军事技术的发展产生了重大影响。\n\n4. **指南针**:指南针最初在中国被用于风水测量,后来逐渐发展成为航海导航的重要工具。这一发明极大地促进了海上航行技术的发展,对于新航路的开辟和世界地理大发现起到了关键作用。\n\n这四项发明不仅在中国历史上占有重要地位,而且对全世界的科技进步和文明发展都产生了深远的影响。"}]}
{"conversations": [{"role": "user", "content": "请用一段话描述阿里巴巴集团的企业文化。"}, {"role": "assistant", "content": "阿里巴巴集团的企业文化以“客户第一、员工第二、股东第三”为核心价值观,强调“让天下没有难做的生意”的使命。公司倡导开放、透明、分享、责任的团队合作精神,鼓励员工创新、追求卓越,同时注重员工的个人成长和幸福感。阿里巴巴的企业文化还体现在其独特的“六脉神剑”价值观体系中,包括客户第一、拥抱变化、团队合作、诚信、激情、专业等六个方面,这些价值观不仅指导着公司的日常运营,也深深影响着每一位阿里人的行为准则。"}]}
...
]]>[quote color="warning"]预训练完成的模型只会词语接龙,只能根据前面的文本预测下一个词出现的概率,通过微调才能够做到正确的和人对话,微调阶段会把混乱的数据集变成指令+角色+内容的形式,让模型能够知道哪些是人提出的输入的问题,哪些应该由模型输出,但是字符都是分词器中已经存在的字符,只是更改了排列形式。[/quote]
微调和预训练的主要区别是数据集的变化,告诉模型如何进行对话。微调阶段可以不调整所有参数,只调整一部分,通过 lora(低秩矩阵乘积),和 fsdp(模型分卡存储)等技术可以进一步降低每张卡的显存需求。
[acc status="close" title="解析统计"]代码行数:202。解析时间:X Day[/acc]
import os
import sys
__package__ = "trainer"
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import argparse
import time
import math
import warnings
import torch
import torch.distributed as dist
from contextlib import nullcontext
from torch import optim, nn
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader, DistributedSampler
from transformers import AutoTokenizer, AutoModelForCausalLM
from model.model_minimind import MiniMindConfig, MiniMindForCausalLM
from dataset.lm_dataset import SFTDataset
warnings.filterwarnings('ignore')
def Logger(content):
if not ddp or dist.get_rank() == 0:
print(content)
def get_lr(current_step, total_steps, lr):
return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))
#和预训练几乎一样
def train_epoch(epoch, wandb):
loss_fct = nn.CrossEntropyLoss(reduction='none')
start_time = time.time()
for step, (X, Y, loss_mask) in enumerate(train_loader):
X = X.to(args.device)
Y = Y.to(args.device)
loss_mask = loss_mask.to(args.device)
lr = get_lr(epoch * iter_per_epoch + step, args.epochs * iter_per_epoch, args.learning_rate)
for param_group in optimizer.param_groups:
param_group['lr'] = lr
with ctx:
res = model(X)
loss = loss_fct(
res.logits.view(-1, res.logits.size(-1)),
Y.view(-1)
).view(Y.size())
loss = (loss * loss_mask).sum() / loss_mask.sum()
loss += res.aux_loss
loss = loss / args.accumulation_steps
scaler.scale(loss).backward()
if (step + 1) % args.accumulation_steps == 0:
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad(set_to_none=True)
if step % args.log_interval == 0:
spend_time = time.time() - start_time
Logger(
'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} epoch_Time:{}min:'.format(
epoch + 1,
args.epochs,
step,
iter_per_epoch,
loss.item() * args.accumulation_steps,
optimizer.param_groups[-1]['lr'],
spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))
if (wandb is not None) and (not ddp or dist.get_rank() == 0):
wandb.log({"loss": loss * args.accumulation_steps,
"lr": optimizer.param_groups[-1]['lr'],
"epoch_Time": spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60})
if (step + 1) % args.save_interval == 0 and (not ddp or dist.get_rank() == 0):
model.eval()
moe_path = '_moe' if lm_config.use_moe else ''
ckp = f'{args.save_dir}/full_sft_{lm_config.hidden_size}{moe_path}.pth'
if isinstance(model, torch.nn.parallel.DistributedDataParallel):
state_dict = model.module.state_dict()
else:
state_dict = model.state_dict()
state_dict = {k: v.half() for k, v in state_dict.items()} # 半精度保存
torch.save(state_dict, ckp)
model.train()
def init_model(lm_config):
tokenizer = AutoTokenizer.from_pretrained('../model')
# 初始化分词器
model = MiniMindForCausalLM(lm_config)
# 初始化模型
moe_path = '_moe' if lm_config.use_moe else ''
# 判断是否使用了 moe 架构,选择对应权重
ckp = f'{args.save_dir}/pretrain_{lm_config.hidden_size}{moe_path}.pth'
# 确定 ckpt 位置
state_dict = torch.load(ckp, map_location=args.device)
model.load_state_dict(state_dict, strict=False)
# 加载模型参数
Logger(f'LLM可训练总参数量:{sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} 百万')
# 记录参数量
model = model.to(args.device)
# 将模型挪到对应设备
return model, tokenizer
def init_distributed_mode():
if not ddp: return
# 判断是否多卡并行
global ddp_local_rank, DEVICE
# 多卡时设置全局变量记录当前节点编号
dist.init_process_group(backend="nccl")
# 确定多卡间通讯方式
ddp_rank = int(os.environ["RANK"])
ddp_local_rank = int(os.environ["LOCAL_RANK"])
# 确定多卡时当前节点编号
ddp_world_size = int(os.environ["WORLD_SIZE"])
DEVICE = f"cuda:{ddp_local_rank}"
# 确定 GPU 硬件设备编号
torch.cuda.set_device(DEVICE)
# 设置好装载数据的硬件
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="MiniMind Full SFT")
# 全连接微调
parser.add_argument("--out_dir", type=str, default="../out")
# 输出文件夹
parser.add_argument("--epochs", type=int, default=2)
# 训练轮次
parser.add_argument("--batch_size", type=int, default=16)
# 每次输入的批次大小
parser.add_argument("--learning_rate", type=float, default=5e-7)
# 学习率
parser.add_argument("--device", type=str, default="cuda:0" if torch.cuda.is_available() else "cpu")
# 设备
parser.add_argument("--dtype", type=str, default="bfloat16")
# 数据类型
parser.add_argument("--use_wandb", action="store_true")
# 是否记录
parser.add_argument("--wandb_project", type=str, default="MiniMind-Full-SFT")
# 若记录,设置项目名称
parser.add_argument("--num_workers", type=int, default=1)
# worker 个数
parser.add_argument("--ddp", action="store_true")
# 是否多卡
parser.add_argument("--accumulation_steps", type=int, default=1)
# 梯度累计步数
parser.add_argument("--grad_clip", type=float, default=1.0)
# 梯度裁切
parser.add_argument("--warmup_iters", type=int, default=0)
# 学习率是否预热
parser.add_argument("--log_interval", type=int, default=100)
# 记录间隔
parser.add_argument("--save_interval", type=int, default=100)
# 保存间隔
parser.add_argument('--local_rank', type=int, default=-1)
# 多卡训练时,当前节点标记
parser.add_argument('--hidden_size', default=512, type=int)
# 潜码维度
parser.add_argument('--num_hidden_layers', default=8, type=int)
# transformer 层数
parser.add_argument('--max_seq_len', default=512, type=int)
# 最大 token 长度
parser.add_argument('--use_moe', default=False, type=bool)
# 是否使用 moe
parser.add_argument("--data_path", type=str, default="../dataset/sft_mini_512.jsonl")
# 输入数据集
args = parser.parse_args()
# 保存超参数
lm_config = MiniMindConfig(hidden_size=args.hidden_size, num_hidden_layers=args.num_hidden_layers,
use_moe=args.use_moe)
# 记录 config
args.save_dir = os.path.join(args.out_dir)
# 设置保存文件夹
os.makedirs(args.save_dir, exist_ok=True)
os.makedirs(args.out_dir, exist_ok=True)
# 创建保存和输出文件夹,
tokens_per_iter = args.batch_size * args.max_seq_len
# 计算总iter
device_type = "cuda" if "cuda" in args.device else "cpu"
# 设置设备类别
args.wandb_run_name = f"MiniMind-Full-SFT-Epoch-{args.epochs}-BatchSize-{args.batch_size}-LearningRate-{args.learning_rate}"
# 项目记录名
# 后续代码释意皆和预训练流程相同
ctx = nullcontext() if device_type == "cpu" else torch.cuda.amp.autocast()
ddp = int(os.environ.get("RANK", -1)) != -1 # is this a ddp run?
ddp_local_rank, DEVICE = 0, "cuda:0"
base_seed = 1337
torch.manual_seed(base_seed)
torch.cuda.manual_seed(base_seed)
if ddp:
init_distributed_mode()
args.device = torch.device(DEVICE)
rank = dist.get_rank()
torch.manual_seed(base_seed + rank)
# 同时设置 CUDA 的随机种子
torch.cuda.manual_seed(base_seed + rank)
if args.use_wandb and (not ddp or ddp_local_rank == 0):
import wandb
wandb.init(project=args.wandb_project, name=args.wandb_run_name)
else:
wandb = None
model, tokenizer = init_model(lm_config)
train_ds = SFTDataset(args.data_path, tokenizer, max_length=args.max_seq_len)
train_sampler = DistributedSampler(train_ds) if ddp else None
train_loader = DataLoader(
train_ds,
batch_size=args.batch_size,
pin_memory=True,
drop_last=False,
shuffle=False,
num_workers=args.num_workers,
sampler=train_sampler
)
scaler = torch.cuda.amp.GradScaler(enabled=(args.dtype in ['float16', 'bfloat16']))
optimizer = optim.AdamW(model.parameters(), lr=args.learning_rate)
if ddp:
model._ddp_params_and_buffers_to_ignore = {"pos_cis"}
model = DistributedDataParallel(model, device_ids=[ddp_local_rank])
iter_per_epoch = len(train_loader)
for epoch in range(args.epochs):
train_epoch(epoch, wandb)
]]>[quote color="warning"]万物之源,所有微调,强化学习之前必先经此阶段[/quote]
此阶段旨在训练一个文本接龙模型,该模型的唯一功能就是做文本接龙,通过之前输入的文本推测下一个文本是啥。只有先能够做到这个功能才能做到后面的智能对话。
[acc status="close" title="解析统计"]代码行数:198。解析时间:1 Day[/acc]
import os
#
import sys
__package__ = "trainer"
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import argparse
import time
import math
import warnings
import torch
import torch.distributed as dist
from torch import optim, nn
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader, DistributedSampler
from contextlib import nullcontext
from transformers import AutoTokenizer
from model.model_minimind import MiniMindConfig, MiniMindForCausalLM
from dataset.lm_dataset import PretrainDataset
warnings.filterwarnings('ignore')
#忽略警告信息,以防干扰输出
def Logger(content):
if not ddp or dist.get_rank() == 0:
print(content)
def get_lr(current_step, total_steps, lr):
return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))
#学习率逐减
def train_epoch(epoch, wandb):
loss_fct = nn.CrossEntropyLoss(reduction='none')
#交叉熵
start_time = time.time()
#时间计算,记录开始时间
for step, (X, Y, loss_mask) in enumerate(train_loader):
X = X.to(args.device)
Y = Y.to(args.device)
#X,Y为文本token,分别告诉模型输入为什么输出位什么,形状为【batch_size,num_tokens】
#test input data and output data
#if dist.get_rank() == 0:
# print("X_shape=",X.shape)
# print("Y_shape=",Y.shape)
# print("X=",X)
# print("Y=",Y)
# time.sleep(10)
# print("X[0]:", X[0])
# print("Y[0]:", Y[0])
# # print(X[0]==Y[0])
# # print(decoded_input==decoded_output)
# # print("程序已经暂停了 10 秒!")
# #results:[32,511]
# # # 假设 X 是你的输入数据,其中的每个 token ID 需要映射回文字
# decoded_input = tokenizer.decode(X[0], skip_special_tokens=False) # 将 X 中第一个样本的 tokens 解码为文本
# # print()
# # # 对于 Y 也是一样
# decoded_output = tokenizer.decode(Y[0], skip_special_tokens=False) # 将 Y 中第一个样本的 tokens 解码为文本
# # 打印解码后的文本
# print("Decoded Input:", decoded_input)
# print()
# print("Decoded Output:", decoded_output)
# # 测得X文本与Y文本完全相同
# time.sleep(10)
loss_mask = loss_mask.to(args.device)
# print(loss_mask)
#loss_mask的作用是标记tokens里面的有效位,因为tokenizer把text转换成tokens时,要固定总长度
#但是文本长度是变化的,那没到固定长度,只能用padding来填充,填充的这一部分就是无效的\
#会被tokenizer标记,从而得出loss_mask
lr = get_lr(epoch * iter_per_epoch + step, args.epochs * iter_per_epoch, args.learning_rate)
#计算当前学习学习率,输入当前步数和总步数,得到学习率iter_per_epoch=len(train_loader)
for param_group in optimizer.param_groups:
param_group['lr'] = lr
with ctx:
#ctx:精度选择器,通过运行设备来执行不同精度的运算,CPU上使用单精度计算,因为CPU不支持半精度
#自动混合精度计算,反向传播用单精度,前向传播和矩阵乘法用半精度,可以加速计算并减少显存使用
res = model(X)
# start_可视化训练过程中的结果
# 假设 res.logits 是模型输出,形状为 [batch_size, sequence_length, vocab_size]
# logits = res.logits # shape: [32, 511, 6400]
# # 1. 获取每个位置的最大概率的 token 的下标
# predicted_ids = torch.argmax(logits, dim=-1) # shape: [32, 511]
# # 2. 使用 tokenizer 将这些下标转换为文本
# decoded_output = tokenizer.decode(predicted_ids[0], skip_special_tokens=False)
# # 打印结果
# print("res_show:",decoded_output)
# print("res_end")
# time.sleep(10)
#可视化训练过程中输出的结果。
# print("Y.view_shape",Y.view(-1).shape)
#32*511
print(loss_mask.shape)
loss = loss_fct(
res.logits.view(-1, res.logits.size(-1)),
Y.view(-1)
).view(Y.size())
#通过结果的预测值和标准答案Y求交叉熵。优化模型以减少损失
loss = (loss * loss_mask).sum() / loss_mask.sum()
loss += res.aux_loss
#aux_loss只有在MOE架构才会用到,目的是为了使每个专家都能够参与
# print("res.aux_loss",res.aux_loss)
loss = loss / args.accumulation_steps
#梯度累积,默认八步批次算一次梯度,处理显存不足问题
scaler.scale(loss).backward()
#通过损失记录
#当步数达到要求数量,计算一次反向传播
if (step + 1) % args.accumulation_steps == 0:
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad(set_to_none=True)
# 步数达到记录要求,记录一次
if step % args.log_interval == 0:
spend_time = time.time() - start_time
Logger(
'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} epoch_Time:{}min:'.format(
epoch + 1,
args.epochs,
step,
iter_per_epoch,
loss.item() * args.accumulation_steps,
optimizer.param_groups[-1]['lr'],
spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))
#terminal 记录实验参数
if (wandb is not None) and (not ddp or dist.get_rank() == 0):
wandb.log({"loss": loss.item() * args.accumulation_steps,
"lr": optimizer.param_groups[-1]['lr'],
"epoch_Time": spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60})
# wandb记录损失,学习率和花费时间
if (step + 1) % args.save_interval == 0 and (not ddp or dist.get_rank() == 0):
#步数到达保存要求
model.eval()
moe_path = '_moe' if lm_config.use_moe else ''
ckp = f'{args.save_dir}/pretrain_{lm_config.hidden_size}{moe_path}.pth'
#记录路径和文件名
if isinstance(model, torch.nn.parallel.DistributedDataParallel):
#检查是否ddp
state_dict = model.module.state_dict()
else:
state_dict = model.state_dict()
state_dict = {k: v.half() for k, v in state_dict.items()} # 半精度保存
torch.save(state_dict, ckp)
model.train()
def init_model(lm_config):
# 初始化模型,包括计算模型和对应分词器
tokenizer = AutoTokenizer.from_pretrained('../model/')
model = MiniMindForCausalLM(lm_config).to(args.device)
Logger(f'LLM可训练总参数量:{sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} 百万')
#输出记录模型参数总量
return model, tokenizer
def init_distributed_mode():
if not ddp: return
# 判断是否ddp, 否的话直接返回
global ddp_local_rank, DEVICE
# 定义两个全部变量
dist.init_process_group(backend="nccl")
# 初始化分布式训练所需进程组,采用NVIDIA Collective Communications Library(NCCL)作为通讯后端
ddp_rank = int(os.environ["RANK"])
# 获取当前进程在分布式训练中的全局排名,标识在所有进程中的位置
ddp_local_rank = int(os.environ["LOCAL_RANK"])
# 前进程在本地节点中的排名,标识在当前机器上的哪个 GPU 上运行,通常用于单机多卡。
ddp_world_size = int(os.environ["WORLD_SIZE"])
# 获取总进程数比如,如果你使用 2 台机器,每台机器上有 4 张 GPU,那么 WORLD_SIZE 的值就是 8。
DEVICE = f"cuda:{ddp_local_rank}"
torch.cuda.set_device(DEVICE)
# torchrun --nproc_per_node 2 1-pretrain.py
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="MiniMind Pretraining")
parser.add_argument("--out_dir", type=str, default="../out")
#ckpt输出文件夹位置
# 若要以最快速度实现zero则epochs设置为1轮;否则应当利用有限的数据训练2~6个epochs。
parser.add_argument("--epochs", type=int, default=1)
#总训练论数
parser.add_argument("--batch_size", type=int, default=32)
#输入批次数
parser.add_argument("--learning_rate", type=float, default=5e-4)
#学习率
parser.add_argument("--device", type=str, default="cuda:0" if torch.cuda.is_available() else "cpu")
#制定运行设备
parser.add_argument("--dtype", type=str, default="bfloat16")
# 数据类型
parser.add_argument("--use_wandb", action="store_true")
# 是否wandb记录
parser.add_argument("--wandb_project", type=str, default="MiniMind-Pretrain")
# 如果wandb记录,设置记录项目名
parser.add_argument("--num_workers", type=int, default=1)
# 设置进程数量
parser.add_argument("--ddp", action="store_true")
# 是否多卡分布式
parser.add_argument("--accumulation_steps", type=int, default=8)
# 梯度累积步数
parser.add_argument("--grad_clip", type=float, default=1.0)
# 梯度裁剪
parser.add_argument("--warmup_iters", type=int, default=0)
# 学习率预热,决定学习率是0到指定值逐渐变大,还是直接从指定值开始直接训练
parser.add_argument("--log_interval", type=int, default=100)
# 记录间隔
parser.add_argument("--save_interval", type=int, default=100)
# 保存间隔
parser.add_argument('--local_rank', type=int, default=-1)
# ddp本地排名
parser.add_argument('--hidden_size', default=512, type=int)
# 隐藏层维度
parser.add_argument('--num_hidden_layers', default=8, type=int)
# 隐藏层层数
parser.add_argument('--max_seq_len', default=512, type=int)
# 最大文本长度
parser.add_argument('--use_moe', default=False, type=bool)
# 是否使用moe架构
parser.add_argument("--data_path", type=str, default="../dataset/pretrain_hq.jsonl")
# 输入数据路径
args = parser.parse_args()
lm_config = MiniMindConfig(hidden_size=args.hidden_size, num_hidden_layers=args.num_hidden_layers, use_moe=args.use_moe)
# 记录config
args.save_dir = os.path.join(args.out_dir)
os.makedirs(args.save_dir, exist_ok=True)
os.makedirs(args.out_dir, exist_ok=True)
# 创建制定文件夹
tokens_per_iter = args.batch_size * args.max_seq_len
#每轮总tokens数量
device_type = "cuda" if "cuda" in args.device else "cpu"
args.wandb_run_name = f"MiniMind-Pretrain-Epoch-{args.epochs}-BatchSize-{args.batch_size}-LearningRate-{args.learning_rate}"
ctx = nullcontext() if device_type == "cpu" else torch.cuda.amp.autocast()
ddp = int(os.environ.get("RANK", -1)) != -1 # is this a ddp run?
ddp_local_rank, DEVICE = 0, "cuda:0"
base_seed = 1337
torch.manual_seed(base_seed)
torch.cuda.manual_seed(base_seed)
if ddp:
init_distributed_mode()
args.device = torch.device(DEVICE)
rank = dist.get_rank()
torch.manual_seed(base_seed + rank)
# 同时设置 CUDA 的随机种子
torch.cuda.manual_seed(base_seed + rank)
if args.use_wandb and (not ddp or ddp_local_rank == 0):
import wandb
wandb.init(project=args.wandb_project, name=args.wandb_run_name)
else:
wandb = None
model, tokenizer = init_model(lm_config)
train_ds = PretrainDataset(args.data_path, tokenizer, max_length=args.max_seq_len)
train_sampler = DistributedSampler(train_ds) if ddp else None
train_loader = DataLoader(
train_ds,
batch_size=args.batch_size,
pin_memory=True,
drop_last=False,
shuffle=False,
num_workers=args.num_workers,
sampler=train_sampler
)
scaler = torch.cuda.amp.GradScaler(enabled=(args.dtype in ['float16', 'bfloat16']))
optimizer = optim.AdamW(model.parameters(), lr=args.learning_rate)
if ddp:
model._ddp_params_and_buffers_to_ignore = {"pos_cis"}
# 旋转位置编码参数不用共享在ddp中
model = DistributedDataParallel(model, device_ids=[ddp_local_rank])
iter_per_epoch = len(train_loader)
for epoch in range(args.epochs):
train_epoch(epoch, wandb)
[photos]
[/photos]