【从零构建大模型】一、文本数据处理

概览

构建大模型的全景图如下,本文介绍了最开始的数据处理。

数据处理的全景图如下所示,大致流程为:

  1. 将原文本,一般为一个string,进行分割。

  2. 对分割后的词转化为id。

  3. 生成一个embeddings层,然后id作为序号去embeddings层中取对应的行作为自己的表征。

如此转化后就将原本深度学习模型不能处理的原始数据转化为了可以处理的矩阵,同时我们希望这最后表示原始数据的矩阵中也能表示词之间的关系,例如词意相似的单词在可以空间上较为接近。

介绍

Tokenizing text

拆分的规则可以有多种,最简单的就是直接按单词粒度进行拆分,例如直接以空格作为分隔符,但是需要注意拆分过程中需要注意标点符合与单词之间没有空格。

简单的代码实现:

1
2
3
4
5
text = "Hello, world. Is this-- a test?"

result = re.split(r'([,.:;?_!"()\']|--|\s)', text)
result = [item.strip() for item in result if item.strip()]
print(result)

Converting tokens into token IDs

在进行拆分后需要对所有的单词、符合进行去重,建立一张可以单词与id互相映射的词汇表,最简单的方法就是直接按照字母顺序对其进行排序。然后这张词汇表在代码中的显示就是一个支持encode和decode的类,如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
all_words = sorted(set(preprocessed))
vocab = {token:integer for integer,token in enumerate(all_words)}

class SimpleTokenizerV1:
def __init__(self, vocab):
self.str_to_int = vocab
self.int_to_str = {i:s for s,i in vocab.items()}

def encode(self, text):
preprocessed = re.split(r'([,.:;?_!"()\']|--|\s)', text)

preprocessed = [
item.strip() for item in preprocessed if item.strip()
]
ids = [self.str_to_int[s] for s in preprocessed]
return ids

def decode(self, ids):
text = " ".join([self.int_to_str[i] for i in ids])
# Replace spaces before the specified punctuations
text = re.sub(r'\s+([,.?!"()\'])', r'\1', text)
return text

Adding special context tokens

常见的特殊标识符有:

  • [BOS] 一个文本序列的起始标识

  • [EOS] 一个文本序列的结束标识

  • [PAD] 如果batch size大于1,那么就需要用它来填充那些较短的文本,以做到长度统一

  • [UNK] 用于表示在词汇表之外的单词

GPT、GPT-2 只使用了<|endoftext|> ,当做结束的标识符号,也当做填充的标识符。GPT-2不需要[UNK] ,因为它使用 byte-pair encoding (BPE)来编码

加入[UNK] 后的词汇表如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class SimpleTokenizerV2:
def __init__(self, vocab):
self.str_to_int = vocab
self.int_to_str = { i:s for s,i in vocab.items()}

def encode(self, text):
preprocessed = re.split(r'([,.:;?_!"()\']|--|\s)', text)
preprocessed = [item.strip() for item in preprocessed if item.strip()]
preprocessed = [
item if item in self.str_to_int
else "<|unk|>" for item in preprocessed
]

ids = [self.str_to_int[s] for s in preprocessed]
return ids

def decode(self, ids):
text = " ".join([self.int_to_str[i] for i in ids])
# Replace spaces before the specified punctuations
text = re.sub(r'\s+([,.:;?!"()\'])', r'\1', text)
return text

BytePair encoding(BPE)

BPE分词器可以将句子分解并转化为id,对于未知词,可以将其分解为子词和单个字符,这样它就可以解析任何单词。

简单使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import importlib
import tiktoken

print("tiktoken version:", importlib.metadata.version("tiktoken"))
tokenizer = tiktoken.get_encoding("gpt2")

text = (
"Hello, do you like tea? <|endoftext|> In the sunlit terraces"
"of someunknownPlace."
)

integers = tokenizer.encode(text, allowed_special={"<|endoftext|>"})

print(integers)

Data sampling with a sliding window

在训练过程中往往会将文本划分为多个块,一个模型进行训练的预测时只会基于这个块中前面的部分词来预测后一个词,简单的表示如下:

因为是基于已有的输入预测后一个,所以对于正确的output实际上就是input往后移一位,如下:

上面这个是一个batch,为了产生多组batch可以使用滑动窗口的思想,通过控制步长来生成多个batch:

一个简单的生成数据集的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from torch.utils.data import Dataset, DataLoader


class GPTDatasetV1(Dataset):
def __init__(self, txt, tokenizer, max_length, stride):
self.input_ids = []
self.target_ids = []

# Tokenize the entire text
token_ids = tokenizer.encode(txt, allowed_special={"<|endoftext|>"})
assert len(token_ids) > max_length, "Number of tokenized inputs must at least be equal to max_length+1"

# Use a sliding window to chunk the book into overlapping sequences of max_length
for i in range(0, len(token_ids) - max_length, stride):
input_chunk = token_ids[i:i + max_length]
target_chunk = token_ids[i + 1: i + max_length + 1]
self.input_ids.append(torch.tensor(input_chunk))
self.target_ids.append(torch.tensor(target_chunk))

def __len__(self):
return len(self.input_ids)

def __getitem__(self, idx):
return self.input_ids[idx], self.target_ids[idx]

def create_dataloader_v1(txt, batch_size=4, max_length=256,
stride=128, shuffle=True, drop_last=True,
num_workers=0):

# Initialize the tokenizer
tokenizer = tiktoken.get_encoding("gpt2")

# Create dataset
dataset = GPTDatasetV1(txt, tokenizer, max_length, stride)

# Create dataloader
dataloader = DataLoader(
dataset,
batch_size=batch_size,
shuffle=shuffle,
drop_last=drop_last,
num_workers=num_workers
)

return dataloader

with open("the-verdict.txt", "r", encoding="utf-8") as f:
raw_text = f.read()
dataloader = create_dataloader_v1(
raw_text, batch_size=1, max_length=4, stride=1, shuffle=False
)

data_iter = iter(dataloader)
first_batch = next(data_iter)
print(first_batch)
# [tensor([[ 40, 367, 2885, 1464]]), tensor([[ 367, 2885, 1464, 1807]])]

Creating token embeddings

得到标记 ID 后需要将其转换为连续的向量表示,即所谓的token embeddings。

其首先需要初始化一个embeddings层,其行数为id的数量,列数为用来表示一个token的信息量,也就是输出的维度。

然后根据当前token的id就直接去对应的行取出那一行作为其token的表示,如下:

简单的代码实现:

1
2
3
4
5
6
7
8
9
vocab_size = 6
output_dim = 3

torch.manual_seed(123)
embedding_layer = torch.nn.Embedding(vocab_size, output_dim)

print(embedding_layer.weight)
print(embedding_layer(torch.tensor([3])))
# tensor([[-0.4015, 0.9666, -1.1481]], grad_fn=<EmbeddingBackward0>)

Encoding word positions

一个词在不同的位置其表示的含义可能会有所不同,所以还需要有位置编码的信息。position-aware embeddings有两类:

  • 相对位置嵌入的重点并非关注标记的绝对位置,而是标记之间的相对位置或距离。这意味着模型学习的是“相距多远”而不是“具体在哪个位置”的关系。这样做的好处是,即使在训练过程中没有遇到过这种长度的序列,模型也能更好地泛化到不同长度的序列。

  • OpenAI 的 GPT 模型使用绝对位置嵌入,这些嵌入在训练过程中进行优化,而不是像原始 Transformer 模型中的位置编码那样固定或预定义。此优化过程是模型训练本身的一部分。

简单的位置嵌入编码就是直接生成一个行数为最大位置数的embeddings层,然后各个词按照序列id去取对应的embddings来得到位置嵌入编码,之后需要将Token embeddings和Positional embeddings进行相加最后得到一个完整的Iput embeddings。

简单的实现如下:

1
2
3
4
5
6
7
context_length = max_length
pos_embedding_layer = torch.nn.Embedding(context_length, output_dim)

pos_embeddings = pos_embedding_layer(torch.arange(max_length))

input_embeddings = token_embeddings + pos_embeddings
print(input_embeddings.shape)

参考资料


【从零构建大模型】一、文本数据处理
http://example.com/2025/05/01/LLMFromScratch1/
作者
滑滑蛋
发布于
2025年5月1日
许可协议