Administrator
发布于 2024-05-06 / 31 阅读
0
0

学习记录Let's build GPT: from scratch, in code, spelled out.

尝试实现GPT

这是根据大佬Andrej Karpathy

https://www.youtube.com/watch?v=kCc8FmEb1nY

代码仓库:

https://github.com/karpathy/ng-video-lecture

做的实现和笔记。该视频分为几个部分:一是讲述GPT的基本原理;二是根据GPT基本原理,从头实现一个简单的GPT——可以学习莎士比亚的写作风格,对已有的戏剧文本进行续写

最简单的神经网络

在最开始的部分,Andrej首先带领我们构造一个最基本的网络,使得大家对文本生成模型有一个基本的认识和理解。(有助于神经网络小白大致了解神经网络的工作过程)。可以看到这个流程分为几个部分:

  1. 定义了最简单的encoder,用于把文本转换为数字类型,进而转换为张量,便于神经网络进行学习。decoder则是用于把神经网络的输出转换为数字

  2. 定义简单的BigramLanguageModel,它的两个核心函数:

  • forward函数(定义了数据在神经网络中各个层的传播方式)可以看到这里的实现很简单:通过词嵌入层将输入的词汇索引转换为对应的嵌入向量。这些向量作为模型的输出,并将这个向量作为下一个词的概率分布的直接预测。(具体咋预测,看下面的生成函数)

  • generate函数,传入的idx参数(是个 (B,T) array of indices in the current context),代表一系列词汇的起始序列;max_new_tokens表示要生成的最大新词的长度。可以看到generate是如何进行“预测”的:首先通过self(idx)自动调用forward方法,得到一个logit;然后,focus only on the last time step,只选择对于当前序列中最后一个词的预测输出logits = logits[:, -1, :] ;之后使用softmax得到概率 probs = F.softmax(logits, dim=1) ,输出的形状是(B, T+1) ,每行就是一个词对应的概率分布;最后,idx_next = torch.multinomial(probs, num_samples=1) ,从每个概率分布中采样一个样本(就是一个词的索引),然后把新的索引加到idx序列中。

# 一个简单的,没有经过严谨训练的模型
import torch
import torch.nn as nn
from torch.nn import functional as F
#####################PreDefine###############################
device = 'cuda' if torch.cuda.is_available() else 'cpu'

with open('input.txt', 'r',encoding='utf-8') as f:
    text = f.read()
chars = sorted(list(set(text)))
vocab_size = len(chars)    
# 创建字符到整数的映射  有许多第三方库可以完成这个任务,现在我们只是使用最简单的方法
stoi = {ch:i for i,ch in enumerate(chars)}  
# enumerate函数返回一个元组,第一个元素是索引,第二个元素是值。
itos = {i:ch for i,ch in enumerate(chars)} 

encoder = lambda s: [stoi[c] for c in s]
decoder = lambda l: ''.join([itos[i] for i in l])

data = torch.tensor(encoder(text), dtype = torch.long) # 将这个列表转换成一个 PyTorch 张量,用于训练模型
batch_size = 4 #  
block_size = 8 # 训练“预测”能力时候的最大长度

n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]


def get_batch(split):
    #生成输入x和目标y
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size, )) # 随即抓取一块
    x = torch.stack([data[i: i+block_size] for i in ix])
    y = torch.stack([data[i+1: i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x,y


xb, yb = get_batch('train')

###################PreDefine###################

class BigramLanguageModel(nn.Module):
    def __init__(self, vocab_size) -> None:
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)  #一个随机的浮点权重矩阵
    def forward(self, idx, targets=None):  
        logits = self.token_embedding_table(idx)  # [B, T, C]  旧版
        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)  # 维度重塑,便于满足交叉熵的计算要求
            targets = targets.view(B*T)
            # 评估预测的质量 Compute the cross entropy loss between input logits and target.
            loss = F.cross_entropy(logits, targets)
        
        return logits, loss
    
    def generate(self, idx, max_new_tokens):  
        for _ in range(max_new_tokens):  # 循环生成新词
            # get predictions
            logits, loss = self(idx)  # 内部调用forward方法
            logits = logits[:, -1, :]  # (B,C)
            # 使用softmax得到概率
            probs = F.softmax(logits, dim=1)  # (B, T+1) 每行就是一个词对应的概率分布
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) 
            # append the sampled index to the sequence
            idx = torch.cat((idx, idx_next), dim=1)  #(B, T+1)    
        print("idx22222----------------", idx)
        return idx
    
m = BigramLanguageModel(vocab_size)
m = m.to(device)

logits, loss = m(xb, yb)
print(logits.shape)
print("loss-----------------", loss)
# 以上的步骤构建出来的模型是随机的,所以输出是垃圾;我们要对模型进行训练
#create a pytorch optimizer
optimizer = torch.optim.AdamW(m.parameters(), lr = 1e-3) 
# 一般学习率是3e-4,但是我们的网络小,可以设置高一些
batch_size = 32

for steps in range(4):
    # sample a batch of data
    xb, yb = get_batch('train')
    
    logits, loss = m(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
    
print(loss.item())

context = torch.zeros((1, 1), dtype=torch.long, device=device)

# print(m.generate(idx = torch.zeros((1,1), dtype = torch.long), max_new_tokens = 10))

print(decoder(m.generate(idx=torch.zeros((1,1), dtype=torch.long), max_new_tokens=1000)[0].tolist()))

使用Transformer

加入注意力机制

所谓注意力,就是关注过去的序列,不关注未来的.

首先,我们使用一个权重矩阵的例子,并看看它从版本0到升级2的实现;希望从而说明self-attention中的重要思想。

原版0:
torch.manual_seed(1337)

B,T,C = 4, 8 ,32
x = torch.randn(B, T, C) 
# below: single head perform self-attention
head_size = 16
tril = torch.tril(torch.ones(T, T))
wei = torch.zeros((T, T))
wei = wei.masked_fill(tril == 0, float('-inf'))
wei = F.softmax(wei, dim = -1)

out = wei @ x
升级1:
torch.manual_seed(1337)
B,T,C = 4, 8 ,32
x = torch.randn(B, T, C) 
# below: single head perform self-attention
head_size = 16
key = nn.Linear(C, head_size, bias = False) # 
query = nn.Linear(C, head_size, bias = False) 
k = key(x)
q = query(x)
wei = q @ k.transpose(-2, -1) 
print(wei)
tril = torch.tril(torch.ones(T, T))
wei = wei.masked_fill(tril == 0, float('-inf'))
wei = F.softmax(wei, dim = -1)

out = wei @ x
升级2:
torch.manual_seed(1337)
B,T,C = 4, 8 ,32
x = torch.randn(B, T, C)  
# below: single head perform self-attention
head_size = 16
key = nn.Linear(C, head_size, bias = False) # 
query = nn.Linear(C, head_size, bias = False) 
value = nn.Linear(C, head_size, bias = False)
k = key(x)
q = query(x)
wei = q @ k.transpose(-2, -1) 
print(wei)
tril = torch.tril(torch.ones(T, T))
wei = wei.masked_fill(tril == 0, float('-inf'))
wei = F.softmax(wei, dim = -1)

v = value(x)
out = wei @ v
import torch

x = torch.randn(B, T, C)  
# 通过标准正态分布随机生成

key = nn.Linear(C, head_size, bias = False) 
# 对输入数据进行线性变换,特征维度从 C 映射到 head_size

wei = q @ k.transpose(-2, -1) 
# 结合点积的几何意义: 每个点积分数表示对应键(即输入位置)与当前查询的匹配程度。

原版0:

这时候如果我们把wei输出来,它会是一个0,1矩阵(上三角是0,下三角是1,是uniform的);

这样的wei是不好的,显然它无法体现谁更加重要,无法帮助token捕捉互相之间的关系,从而帮助一个token找到它可能感兴趣的另一个token(比如,在我们文本预测的场景下,一个辅音字母后面大概率跟着元音字母,所以它的token会对元音字母的token更加感兴趣)

升级1:

于是我们看到,对于token x,我们还会计算它的query和key。一般来说,query表示token x想要什么;key表示x含有什么。那么想象一下,这时候如果我们有一个token A和token B,那么把query A和key B做点积,就可以知道B是否是A感兴趣的。(记得点积的几何意义吗?他表示两个向量各个维度的相似程度!如果query A和key B相似程度很高,那么我们可以认为B对A是感兴趣的)

之后我们的wei输出来就是这样的:

升级2

像生成query和key的方法一样,为x生成了value,这个value可以视作x的被进一步提取出来的信息,用于告诉其它token:嘿,如果我的key表明我是你在找的人,那么这些,在value里面,是我要告诉你的!

总结

  1. 正如我们刚刚看到的,所谓注意力机制,其实是“沟通机制”,它使得token之间不是孤立的,而是会发生交流,找到更加适合自己的“那些人”。

  2. “注意力”是没有空间的概念的(很多时候,我们用到向量,和向量联系的概念之一就是空间对吧,但是在这里,没有空间的概念,这也是为什么,我们还会为Transformer加入空间编码。)(这和其它一些框架,比如CNN,是不一样的)

  3. 在本篇教程的学习案例里面,我们使用下三角矩阵屏蔽了未来的特征,但其实这不是必要的。我们是在实现一个decoder模块;如果是要实现一个encoder模块,那么我们就不想要屏蔽未来的特征值了。

  4. 自注意力和交叉注意力。我们这里是自注意力机制。相信你也发现了,在“升级”代码中,query和key是来源于同一个x,而不是分别来源于x和y。交叉注意机制就是不同的token之间进行交流。(自交流也没有什么问题,自己也可以从自己身上学到东西嘛,一个token的不同维度不同数值也可以互相交流)

实现单头注意力机制的代码如下;多头注意力的实现也很简单:

class Head(nn.Module):
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embed, head_size, bias = False) # 对输入数据进行线性变换,特征维度从 C 映射到 head_size
        self.query = nn.Linear(n_embed, head_size, bias = False) 
        self.value = nn.Linear(n_embed, head_size, bias = False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        B,T,C = x.shape
        k = self.key(x)
        q = self.key(x)
        # compute attention scores(亲和性,相似度)
        wei = q @ k.transpose(-2, -1) * k.shape[-1]**-0.5
        wei = wei.masked_fill(self.tril[:T, :T]==0, float('-inf'))
        wei = F.softmax(wei, dim = -1)
        wei = self.dropout(wei)
        # perform the weighted aggregation
        v = self.value(x)
        out = wei @ v
        return out

class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super.__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        out = torc.cat([h(x) for h in self.heads], dim = -1)
        out = self.dropout(self.proj(out))
        return out

FeedForward_多层感知机

想象一下,通过刚刚引入的注意力机制,我们的某个token拿到了他感兴趣的“那个人”的信息,那么接下来,他是不是要处理这些信息呢?

也就是先communication,再computation;

我们构造一个简单的前向传播层来做这个事情。

class FeedForward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLu(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout)
        )
        
    def forward(self, x):
        self.net(x)

使用Block来封装先communication,再computation的操作,这个Block类,我们后面说完LayerNorm再解释。

一些问题

我们引入位置编码,注意力机制,前向传播之后进行训练,会发现效果依然不好,原因是,我们的神经网络加深了,随之而来了一些问题,比如使得神经网络不能持续优化。

有两种解决办法:

残差连接 Residual

残差连接,这个在我们已经给出的代码里已经体现了一部分。如图所示,Residual Connection(残差连接)是一种在深度学习模型中常用的技术,特别是在深度神经网络中。它通过将网络中某一层的输入直接添加到该层的输出上,来帮助缓解梯度消失的问题,并且可以提高模型的训练效率和准确性。

  • MultiHeadAttention 类的 forward 方法中,每个头(h)处理输入 x 后,所有头的输出被拼接起来,然后通过一个全连接层和dropout层。

Layer Norm

进行完成上面的介绍后,我们把几个组件封装到一起,形成Transformer中基本的Block功能模块。

class Block(nn.Modle):
    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd
        
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedForward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)
	def forward(self, x):
			#自注意力机制处理 归一化,残差连接
	        x = x + self.sa(self.ln1(x))  
			#前馈网络处理 归一化,残差连接
	        x = x + self.ffwd(self.ln2(x))	
	        return x

GPT诞生

在这个教程对应的任务里,只需要decoder,而一些任务,则需要动用完整的TF框架,比如一个把法语翻译为英语的任务。

所以我们的TF是一个没有encoder的框架,也没有encoder和decoder之间的一些交互作用。和GPT一样,我们的model是一个decoder-only-transformer。(其实关于GPT为啥是decoder-only的,有更深层次的原因,争取之后学习并整理成博客)

综合上面我们实现的一些组件,最后的GPT长这样:

注:

self.apply(self._init_weights) 用于遍历 GPTLanguageModel 实例中的所有子模块,并调用 _init_weights 函数对这些子模块的权重进行初始化

class GPTLanguageModel(nn.Modle):
    def __init__(self):
        super().__init__()
        
        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)
        
        self.position_embedding_table = nn.Embedding(vocab_size, vocab_size)
        
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.linear(n_embd, vocab_size)
        
        self.apply(self._init_weight)
        
    def _init_weight(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean =0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Enbedding):
            torch.nn.init.normal_(module.weight, mean =0.0, std=0.02)
            
    def forward(self, idx, targets=None):
        # 取出token编码和对应的位置编码
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        
        x = tok_emb + pos_emb
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)
        
        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss
    
    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)  # 会自动调用forward
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx


评论