7 循环神经网络

1 序列模型

1.1 统计工具

1.1.1 自回归模型 (Autoregressive)

假设对于时间序列 {xt}, 我们预测 xtP(xt|xt1,,x1).

Pasted image 20250502154449.png|300

我们假设数据是平稳的 (stationary), 也即动力学不会改变, 这样可以估计整个序列 P(x1,,xT)=t=1TP(xt|xt1,,x1).

1.1.2 Markov 模型

一阶 Markov 模型: P(x1,,xT)=t=1TP(xt|xt1),P(x1|x0)=P(x1).
xt 仅是离散值时, 可以使用类似动态规划的思想. 例如: P(xt+1|xt1)=xtP(xt+1,xt,xt1)P(xt1)=xtP(xt+1|xt,xt1)P(xt,xt1)P(xt1)=xtP(xt+1|xt)P(xt|xt1).
因此, 我们只需要考虑 P(xt+1|xt,xt1)=P(xt+1|xt).

1.1.3 因果关系

基于条件概率公式可以反向书写 P(x1,,xT)=t=T1P(xt|xt+1,,xT).
但根据事件的因果关系, P(xt+1|xt)P(xt|xt+1) 容易解释.

2 文本预处理

2.1 读取数据集

这边使用 H.G.Well 的时光机器作为数据集, 包含 30000 多个单词

def read_time_machine():
    with open(d2l.download('time_machine'), 'r') as f:
        lines = f.readlines()
    return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]

lines = read_time_machine()

2.2 Tokenization

def tokenize(lines, token='word')
    if token == 'word':
        return [line.split() for line in lines]
    elif token == 'char':
        return [list(line) for line in lines]
    else:
        print('错误: 未知词元类型: ' + token)

tokens = tokenize(lines)

2.3 词表 (Vocabulary)

将输入的字符串变为模型可以处理的数字, 将每个单词映射到一个索引.

class Vocab:
    def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
    #只保留频率达到min_freq的词汇
        if tokens is None:
            tokens = []
        if reserved_tokens is None:
            reserved_tokens = []
        counter = count_corpus(tokens)
        self._token_freqs = sorted(counter.items(), key=lambda x:x[1], reverse=True)
        self.idx_to_token = ['<unk>'] + reserved_tokens
        self.token_to_idx = {token: idx for idx, token in enumerate(self.idx_to_token)}
        for token, freq in self._token_freqs:
            if freq < min_freq:
                break
            if token not in self.token_to_idx:
                self.idx_to_token.append(token)
                self.token_to_idx[token] = len(self.idx_to_token) - 1
    
    def __len__(self):
        return len(self.idx_to_token)
    
    def __getitem__(self, tokens):
        if not isinstance(tokens, (list, tuple)):
            return self.token_to_idx.get(tokens, self.unk)
        return [self.__getitem__(token) for token in tokens]
    
    def to_tokens(self, indices):
        if not isinstance(indices, (list, tuple)):
            return self.idx_to_token[indices]
        return [self.idx_to_token[index] for index in indices]
    
    @property
    def unk(self):
        return 0
    
    @property
    def token_freqs(self):
        return self._token_freqs
                

def count_corpus(tokens):
    #展平
    if len(tokens) == 0 or isinstance(tokens[0], list):
        tokens = [token for line in tokens for token in line]
    return collections.Counter(tokens)

这里出现的一些 Python 用法:

用法示例:

for i in [0, 10]:
    print('文本:', tokens[i])
    print('索引:', vocab[tokens[i]])

3 语言模型和数据集

假设词元 x1,,xT, 语言模型希望估计联合概率 P(x1,,xT). 希望抽取一些词后, 生成有意义的对话.

3.1 学习语言模型

根据条件概率公式: P(x1,,xT)=t=1TP(xt|x1,,xt1). 例如: P(deep,learning,is,fun)=P(deep)P(learning|deep)P(is|) 根据词频, 可以估计 P^(learning|deep)=n(deep,learning)n(deep). 但是多个单词的组合可能很难遇到. 为此可以用 Laplace 平滑: m 表示唯一单词的数量, n 表示训练集的单词总数: P^(x)=n(x)+ε1/mn+ε1,P^(x|x)=n(x,x)+ε2P^(x)n(x)+ε2,P^(x|x,x)=n(x,x,x)+ε3P^(x)n(x,x)+ε3. 这样在所有计数中添加了一个小常量.

不过, 这样的模型忽略了单词的意思, 只考虑频率, 效果不佳.

3.2 Markov 模型与 n 元语法

P(x1,x2,x3,x4)=P(x1)P(x2)P(x3)P(x4),P(x1,x2,x3,x4)=P(x1)P(x2x1)P(x3x2)P(x4x3),P(x1,x2,x3,x4)=P(x1)P(x2x1)P(x3x1,x2)P(x4x2,x3).

将涉及一、两、三个变量的概率公式称为一/二/三元语法.

3.3 自然语言统计

最高频的词, 如 the, a, i, 都没有太多实际意义. 称为停用词(stopwords).
从词频衰减图上看出, 可以近似认为第 i 个常用单词的频率为 ni1iα, 也即 logni=αlogi+c. 二元语法中常见的组合有类似 (of, the), (in the) 等.

Pasted image 20250502164906.png|350
从图中看出

3.4 读取长序列数据

假设网络一次处理有 n 个时间步的子序列, 理论上来说它们都会把单词打乱, 因此效果差不多. 但是这个偏移量有更好的处理方式吗?

3.4.1 随机采样

def seq_data_iter_random(corpus, batch_size, num_steps):
    """使用随机抽样生成一个小批量子序列"""
    #从随机偏移量开始对序列进行分区,随机范围包括num_steps-1
    corpus = corpus[random.randint(0, num_steps - 1):]
    #减去1,是因为我们需要考虑标签
    num_subseqs = (len(corpus) - 1) // num_steps
    #长度为num_steps的子序列的起始索引
    initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
    #在随机抽样的迭代过程中,
    #来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻
    random.shuffle(initial_indices)

    def data(pos):
        #返回从pos位置开始的长度为num_steps的序列
        return corpus[pos: pos + num_steps]

    num_batches = num_subseqs // batch_size
    for i in range(0, batch_size * num_batches, batch_size):
        #在这里,initial_indices包含子序列的随机起始索引
        initial_indices_per_batch = initial_indices[i: i + batch_size]
        X = [data(j) for j in initial_indices_per_batch]
        Y = [data(j + 1) for j in initial_indices_per_batch]
        yield torch.tensor(X), torch.tensor(Y)

3.4.2 顺序分区

def seq_data_iter_sequential(corpus, batch_size, num_steps):
    """使用顺序分区生成一个小批量子序列"""
    #从随机偏移量开始划分序列
    offset = random.randint(0, num_steps)
    num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
    Xs = torch.tensor(corpus[offset: offset + num_tokens])
    Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
    Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
    num_batches = Xs.shape[1] // num_steps
    for i in range(0, num_steps * num_batches, num_steps):
        X = Xs[:, i: i + num_steps]
        Y = Ys[:, i: i + num_steps]
        yield X, Y

4 循环神经网络 RNN

考虑之前提到的隐变量模型.
回顾 3 多层感知机: H=ϕ(XWxh+bh),O=HWhq+bq,softmax(O).
现在有小批量输入 XtRn×d (n 个序列样本), 用 HtRn×h 表示 t 时候的隐藏变量. 因此 Ht=ϕ(XtWxh+Ht1Whh+bh) (同步更新 Ht)

Pasted image 20250502172801.png

上述 XtWxh+Ht1Whh 可以认为是 Xt,Ht1 拼接后与 Wxh,Whh 拼接后进行矩阵乘法.

4.0.1 基于 RNN 的字符级语言模型

预测逐个字符的下一个字符. 例如, 数据集为 (m, a), (a, c), (c, h), (h, i), (i, n), (n, e). 实践中, 加上批量大小 n, 而每个词元用 d 维向量表示, 因此 XtRn×d.

4.1 困惑度 Perplexity

困惑度用来衡量模型的质量. 定义 exp(1nt=1nlogP(xt|xt1,,x1)). (下一个词元世纪选择数的调和平均数, 是交叉熵的指数).

5 RNN 的从零实现

5.1 One-hot Encoding

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

每次的小批量数据形状是 (batch_size, num_steps), 通过 one-hot 编码, 变成三维张量(最后一个维度是词表大小).

5.2 初始化模型参数

def get_params(vocab_size, num_hiddens, device):
    num_inputs = num_outputs = vocab_size

    def normal(shape):
        return torch.randn(size=shape, device=device) * 0.01

    #隐藏层参数
    W_xh = normal((num_inputs, num_hiddens))
    W_hh = normal((num_hiddens, num_hiddens))
    b_h = torch.zeros(num_hiddens, device=device)
    #输出层参数
    W_hq = normal((num_hiddens, num_outputs))
    b_q = torch.zeros(num_outputs, device=device)
    #附加梯度
    params = [W_xh, W_hh, b_h, W_hq, b_q]
    for param in params:
        param.requires_grad_(True)
    return params

5.3 RNN 模型

def init_rnn_state(batch_size, num_hiddens, device):
    return (torch.zeros((batch_size, num_hiddens), device=device), )

选取 tanh 作为激活函数(元素在实数上满足均匀分布时, 平均值为 0)

def rnn(inputs, state, params):
    #inputs的形状:(时间步数量,批量大小,词表大小)
    W_xh, W_hh, b_h, W_hq, b_q = params
    H, = state
    outputs = []
    #X的形状:(批量大小,词表大小)
    for X in inputs:
        H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
        Y = torch.mm(H, W_hq) + b_q
        outputs.append(Y)
    return torch.cat(outputs, dim=0), (H,)
class RNNModelScratch:
    """从零开始实现的循环神经网络模型"""
    def __init__(self, vocab_size, num_hiddens, device,
                 get_params, init_state, forward_fn):
        self.vocab_size, self.num_hiddens = vocab_size, num_hiddens
        self.params = get_params(vocab_size, num_hiddens, device)
        self.init_state, self.forward_fn = init_state, forward_fn

    def __call__(self, X, state):
        X = F.one_hot(X.T, self.vocab_size).type(torch.float32)
        return self.forward_fn(X, state, self.params)

    def begin_state(self, batch_size, device):
        return self.init_state(batch_size, self.num_hiddens, device)

5.4 预测

def predict_ch8(prefix, num_preds, net, vocab, device):  #@save
    """在prefix后面生成新字符"""
    state = net.begin_state(batch_size=1, device=device)
    outputs = [vocab[prefix[0]]]
    get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
    for y in prefix[1:]:  # 预热期
        _, state = net(get_input(), state)
        outputs.append(vocab[y])
    for _ in range(num_preds):  # 预测num_preds步
        y, state = net(get_input(), state)
        outputs.append(int(y.argmax(dim=1).reshape(1)))
    return ''.join([vocab.idx_to_token[i] for i in outputs])

5.5 梯度裁剪

T 时间步, 反向传播会产生 O(T) 长度的矩阵乘法链, 导致梯度爆炸或者梯度消失.
一般来说, 如果我们假设 f 是 Lipschitz 连续的: |f(x)f(y)|L||xy||, 也即更新中 |f(x)f(xηg)|Lη||g||. 如果需要应对偶然出现的大梯度, 可以考虑裁剪梯度 g: gmin(1,θ||g||)g (将梯度 g 投影回半径 θ 的球).

def grad_clipping(net, theta):  #@save
    """裁剪梯度"""
    if isinstance(net, nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm

5.6 训练

注意

#@save
def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):
    """训练网络一个迭代周期"""
    state, timer = None, d2l.Timer()
    metric = d2l.Accumulator(2)  # 训练损失之和,词元数量
    for X, Y in train_iter:
        if state is None or use_random_iter:
            #在第一次迭代或使用随机抽样时初始化state
            state = net.begin_state(batch_size=X.shape[0], device=device)
        else:
            if isinstance(net, nn.Module) and not isinstance(state, tuple):
                #state对于nn.GRU是个张量
                state.detach_()
            else:
                #state对于nn.LSTM或对于我们从零开始实现的模型是个张量
                for s in state:
                    s.detach_()
        y = Y.T.reshape(-1)
        X, y = X.to(device), y.to(device)
        y_hat, state = net(X, state)
        l = loss(y_hat, y.long()).mean()
        if isinstance(updater, torch.optim.Optimizer):
            updater.zero_grad()
            l.backward()
            grad_clipping(net, 1)
            updater.step()
        else:
            l.backward()
            grad_clipping(net, 1)
            #因为已经调用了mean函数
            updater(batch_size=1)
        metric.add(l * y.numel(), y.numel())
    return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()

6 简洁实现

7 通过时间反向传播

7.1 梯度分析

回顾定义: ht=f(xt,ht1,wh),ot=g(ht,wo). 从而我们有一个链: {,(xt1,ht1,ot1),(xt,ht,ot),}. 前向传播即依次更新, 然后计算损失函数 L(x1,,xT,y1,,yT,wh,wo)=1Tt=1Tl(yt,ot).
对于反向传播, Lwh=1Tt=1Tl(yt,ot)wh=1Tt=1Tl(yt,ot)otg(ht,wo)hthtwh, 而这里 ht/wh 的计算比较麻烦: htwh=f(xt,ht1,wh)wh+f(xt,ht1,wh)ht1ht1wh. 假设我们有三个序列 {at},{bt},{ct}, a0=0,at=bt+ctat1, 这样 at=bt+i=1t1(j=i+1t1cj)bi, 然后再替换 at=htwh,bt=f(xt,ht1,wh)wh,ct=f(xt,ht1,wh)ht1, 这样 htwh=f(xt,ht1,wh)wh+i=1t1(j=i+1tf(xj,hj1,wh)hj1)f(xi,hi1,wh)wh.
完全计算很繁琐, 可以考虑常规截断或者随机截断, 我们采用常规截断.