机灵的小熊猫 · invalid number of ...· 9 月前 · |
气宇轩昂的铁链 · PHP 很有用的一个函数 ...· 1 年前 · |
果断的小虾米 · [Abp vNext 源码分析] - ...· 1 年前 · |
译者: 飞龙
本文来自 【ApacheCN 深度学习 译文集】 ,采用 译后编辑(MTPE) 流程来尽可能提升效率。
不要担心自己的形象,只关心如何实现目标。——《原则》,生活原则 2.3.c
生成网络得到了加州理工学院理工学院本科物理学教授理查德·费曼(Richard Feynman)和诺贝尔奖获得者的名言的支持:“我无法创造,就无法理解”。 生成网络是拥有可以理解世界并在其中存储知识的系统的最有前途的方法之一。 顾名思义,生成网络学习真实数据分布的模式,并尝试生成看起来像来自此真实数据分布的样本的新样本。
生成模型是无监督学习的子类别,因为它们通过尝试生成样本来学习基本模式。 他们通过推送低维潜向量和参数向量来了解生成图像所需的重要特征,从而实现了这一目的。 网络在生成图像时获得的知识本质上是关于系统和环境的知识。 从某种意义上说,我们通过要求网络做某事来欺骗网络,但是网络必须在不了解自己正在学习的情况下学习我们的需求。
生成网络已经在不同的深度学习领域,特别是在计算机视觉领域显示出了可喜的成果。 去模糊或提高图像的分辨率,图像修补以填充缺失的片段,对音频片段进行降噪,从文本生成语音,自动回复消息以及从文本生成图像/视频是一些研究的活跃领域。
在本章中,我们将讨论一些主要的生成网络架构。 更准确地说,我们将看到一个自回归模型和一个 生成对抗网络 ( GAN )。 首先,我们将了解这两种架构的基本组成部分是什么,以及它们之间的区别。 除此说明外,我们还将介绍一些示例和 PyTorch 代码。
生成网络现今主要用于艺术应用中。 样式迁移,图像优化,去模糊,分辨率改善以及其他一些示例。 以下是计算机视觉中使用的生成模型的两个示例。
图 6.1:生成模型应用示例,例如超分辨率和图像修复
来源:《具有上下文注意的生成图像修复》,余佳辉等人;《使用生成对抗网络的照片级逼真的单图像超分辨率》,Christian Ledig 等人
GAN 的创建者 Ian Goodfellow 描述了几类生成网络:
图 6.2 生成网络的层次结构
我们将讨论这两个主要类别,它们在过去已经讨论过很多并且仍然是活跃的研究领域:
自回归模型是从先前的值推断当前值的模型,正如我们在第 5 章,“序列数据处理”中使用 RNN 所讨论的那样。 变分自编码器 ( VAE )是自编码器的一种变体,由编码器和解码器组成,其中编码器将输入编码为低维潜在空间向量, 解码器解码潜向量以生成类似于输入的输出。
整个研究界都同意,GAN 是人工智能世界中的下一个重要事物之一。 GAN 具有生成网络和对抗网络,并且两者相互竞争以生成高质量的输出图像。 GAN 和自回归模型都基于不同的原理工作,但是每种方法都有其自身的优缺点。 在本章中,我们将使用这两种方法开发一个基本示例。
自回归模型使用先前步骤中的信息并创建下一个输出。 RNN 为语言建模任务生成文本是自回归模型的典型示例。
图 6.3:用于 RNN 语言建模的自回归模型
自回归模型独立生成第一个输入,或者我们将其提供给网络。 例如,对于 RNN,我们将第一个单词提供给网络,而网络使用我们提供的第一个单词来假设第二个单词是什么。 然后,它使用第一个和第二个单词来预测第三个单词,依此类推。
尽管大多数生成任务都是在图像上完成的,但我们的自回归生成是在音频上。 我们将构建 WaveNet,它是 Google DeepMind 的研究成果,它是当前音频生成的最新实现,尤其是用于文本到语音处理。 通过这一过程,我们将探索什么是用于音频处理的 PyTorch API。 但是在查看 WaveNet 之前,我们需要实现 WaveNet 的基础模块 PixelCNN,它基于自回归 卷积神经网络 ( CNN )构建。
自回归模型已经被使用和探索了很多,因为每种流行的方法都有其自身的缺点。 自回归模型的主要缺点是它们的速度,因为它们顺序生成输出。 由于正向传播也是顺序的,因此在 PixelRNN 中情况变得更糟。
图 6.4:从 PixelCNN 生成的图像
资料来源:《使用 PixelCNN 解码器的条件图像生成》,Aäronvan den Oord 和其他人
PixelCNN 由 DeepMind 引入,并且是 DeepMind 引入的三种自回归模型之一。 在首次引入 PixelCNN 之后,已经进行了多次迭代以提高速度和效率,但是我们将学习基本的 PixelCNN,这是构建 WaveNet 所需要的。
PixelCNN 一次生成一个像素,并使用该像素生成下一个像素,然后使用前两个像素生成下一个像素。 在 PixelCNN 中,有一个概率密度模型,该模型可以学习所有图像的密度分布并从该分布生成图像。 但是在这里,我们试图通过采用所有先前预测的联合概率来限制在所有先前生成的像素上生成的每个像素。
与 PixelRNN 不同,PixelCNN 使用卷积层作为接收场,从而缩短了输入的读取时间。 考虑一下图像被某些东西遮挡了; 假设我们只有一半的图像。 因此,我们有一半的图像,并且我们的算法需要生成后半部分。 在 PixelRNN 中,网络需要像图像中的单词序列一样逐个获取每个像素,并生成一半的图像,而 PixelCNN 则通过卷积层一次获取图像。 但是,无论如何,PixelCNN 的生成都必须是顺序的。 您可能想知道只有一半的图像会进行卷积。 答案是遮罩卷积,我们将在后面解释。
“图 6.5”显示了如何对像素集应用卷积运算以预测中心像素。 与其他模型相比,自回归模型的主要优点是联合概率学习技术易于处理,可以使用梯度下降进行学习。 没有近似值,也没有解决方法。 我们只是尝试在给定所有先前像素值的情况下预测每个像素值,并且训练完全由反向传播支持。 但是,由于生成始终是顺序的,因此我们很难使用自回归模型来实现可伸缩性。 PixelCNN 是一个结构良好的模型,在生成新像素的同时,将各个概率的乘积作为所有先前像素的联合概率。 在 RNN 模型中,这是默认行为,但是 CNN 模型通过使用巧妙设计的遮罩来实现此目的,如前所述。
PixelCNN 捕获参数中像素之间的依存关系分布,这与其他方法不同。 VAE 通过生成隐藏的潜在向量来学习此分布,该向量引入了独立的假设。 在 PixelCNN 中,学习的依赖性不仅在先前的像素之间,而且在不同的通道之间; 在正常的彩色图像中,它是红色,绿色和蓝色(RGB)。
图 6.5:从周围像素预测像素值
有一个基本问题:如果 CNN 尝试使用当前像素或将来的像素来学习当前像素怎么办? 这也由掩码管理,掩码将 自身 的粒度也提高到了通道级别。 例如,当前像素的红色通道不会从当前像素中学习,但会从先前的像素中学习。 但是绿色通道现在可以使用当前红色通道和所有先前的像素。 同样,蓝色通道可以从当前像素的绿色和红色通道以及所有先前的像素中学习。
整个网络中使用两种类型的掩码,但是后面的层不需要具有这种安全性,尽管它们在进行并行卷积操作时仍需要模拟顺序学习。 因此,PixelCNN 论文[1]引入了两种类型的蒙版:类型 A 和类型 B。
使 PixelCNN 与其他传统 CNN 模型脱颖而出的主要架构差异之一是缺少池化层。 由于 PixelCNN 的目的不是以缩小尺寸的形式捕获图像的本质,并且我们不能承担通过合并丢失上下文的风险,因此作者故意删除了合并层。
fm = 64
net = nn.Sequential(
MaskedConv2d('A', 1, fm, 7, 1, 3, bias=False),
nn.BatchNorm2d(fm), nn.ReLU(True),
MaskedConv2d('B', fm, fm, 7, 1, 3, bias=False),
nn.BatchNorm2d(fm), nn.ReLU(True),
MaskedConv2d('B', fm, fm, 7, 1, 3, bias=False),
nn.BatchNorm2d(fm), nn.ReLU(True),
MaskedConv2d('B', fm, fm, 7, 1, 3, bias=False),
nn.BatchNorm2d(fm), nn.ReLU(True),
MaskedConv2d('B', fm, fm, 7, 1, 3, bias=False),
nn.BatchNorm2d(fm), nn.ReLU(True),
MaskedConv2d('B', fm, fm, 7, 1, 3, bias=False),
nn.BatchNorm2d(fm), nn.ReLU(True),
MaskedConv2d('B', fm, fm, 7, 1, 3, bias=False),
nn.BatchNorm2d(fm), nn.ReLU(True),
MaskedConv2d('B', fm, fm, 7, 1, 3, bias=False),
nn.BatchNorm2d(fm), nn.ReLU(True),
nn.Conv2d(fm, 256, 1))
前面的代码段是完整的 PixelCNN 模型,该模型包装在顺序单元中。 它由一堆MaskedConv2d
实例组成,这些实例继承自torch.nn.Conv2d
,并使用了torch.nn
中Conv2d
的所有*args
和**kwargs
。 每个卷积单元之后是批量规范层和 ReLU 层,这是与卷积层成功组合的。 作者决定不在普通层上使用线性层,而是决定使用普通的二维卷积,事实证明,该方法比线性层更好。
PixelCNN 中使用了遮罩卷积,以防止在训练网络时信息从将来的像素和当前的像素流向生成任务。 这很重要,因为在生成像素时,我们无法访问将来的像素或当前像素。 但是,有一个例外,之前已描述过。 当前绿色通道值的生成可以使用红色通道的预测,而当前蓝色通道的生成可以使用绿色和红色通道的预测。
通过将所有不需要的像素清零来完成屏蔽。 将创建一个与张量相等的掩码张量,其值为 1 和 0,对于所有不必要的像素,其值为 0。 然后,在进行卷积运算之前,此掩码张量与权重张量相乘。
图 6.6:左侧是遮罩,右侧是 PixelCNN 中的上下文
由于 PixelCNN 不使用池化层和反卷积层,因此随着流的进行,通道大小应保持恒定。 遮罩 A 专门负责阻止网络从当前像素学习值,而遮罩 B 将通道大小保持为三(RGB),并通过允许当前像素值取决于本身的值来允许网络具有更大的灵活性。
图 6.7:遮罩 A 和遮罩 B
class MaskedConv2d(nn.Conv2d):
def __init__(self, mask_type, *args, **kwargs):
super(MaskedConv2d, self).__init__(*args, **kwargs)
assert mask_type in ('A', 'B')
self.register_buffer('mask', self.weight.data.clone())
_, _, kH, kW = self.weight.size()
self.mask.fill_(1)
self.mask[:, :, kH // 2, kW // 2 + (mask_type == 'B'):] = 0
self.mask[:, :, kH // 2 + 1:] = 0
def forward(self, x):
self.weight.data *= self.mask
return super(MaskedConv2d, self).forward(x)
先前的类MaskedConv2d
从torch.nn.Conv2d
继承,而不是从torch.nn.Module
继承。 即使我们从torch.nn.Module
继承来正常创建自定义模型类,但由于我们试图使Conv2d
增强带掩码的操作,我们还是从torch.nn.Conv2D
继承,而torch.nn.Conv2D
则从torch.nn.Conv2D
继承 torch.nn.Module
。 类方法register_buffer
是 PyTorch 提供的方便的 API 之一,可以将任何张量添加到state_dict
字典对象,如果尝试将模型保存到磁盘,则该对象随模型一起保存到磁盘。
添加有状态变量(然后可以在forward
函数中重用)的明显方法是将其添加为对象属性:
self.mask = self.weight.data.clone()
但这绝不会成为state_dict
的一部分,也永远不会保存到磁盘。 使用register_buffer
,我们可以确保我们创建的新张量将成为state_dict
的一部分。 然后使用原地fill_
操作将掩码张量填充为 1s,然后向其添加 0 以得到类似于“图 6.6”的张量,尽管该图仅显示了二维张量, 实际权重张量是三维的。 forward
函数仅用于通过乘以遮罩张量来遮罩权重张量。 乘法将保留与掩码具有 1 的索引对应的所有值,同时删除与掩码具有 0 的索引对应的所有值。然后,对父级Conv2d
层的常规调用使用权重张量,并执行二维卷积操作。
网络的最后一层是 softmax 层,该层可预测像素的 256 个可能值中的值,从而离散化网络的输出生成,而先前使用的最先进的自回归模型将在网络的最后一层上继续生成值。
optimizer = optim.Adam(net.parameters())
for epoch in range(25):
net.train(True)
for input, _ in tr:
target = (input[:,0] * 255).long()
out = net(input)
loss = F.cross_entropy(out, target)
optimizer.zero_grad()
loss.backward()
optimizer.step()
训练使用具有默认动量速率的Adam
优化器。 另外,损失函数是从 PyTorch 的Functional
模块创建的。 除了创建target
变量以外,其他所有操作均与常规训练操作相同。
到目前为止,我们一直在有监督的学习中工作,其中明确给出了标签,但是在这种情况下,目标与输入相同,因为我们试图重新创建相同的输出。 torchvision
包对像素应用了转换和归一化,并将像素值范围从 0 到 255 转换为 -1 到 1。我们需要转换回 0 到 255 的范围,因为我们在最后一层使用了 softmax,并且会在 0 到 255 之间生成概率分布。
门控 PixelCNN
DeepMind 在 PixelCNN 的一篇迭代论文中成功地使用了门控 PixelCNN ,该方法通过用 Sigmoid 和 tanh 构建的门代替 ReLU 激活函数。 PixelCNN [1]的介绍性论文提供了三种用于解决同一代网络的不同方法,其中具有 RNN 的模型优于其他两种。 DeepMind 仍引入了基于 CNN 的模型来显示与 PixelRNN 相比的速度增益。 但是,随着 PixelCNN 中门控激活的引入,作者能够将表现与 RNN 变体相匹配,从而获得更大的表现增益。 同一篇论文介绍了一种避免盲点并在生成时增加全局和局部条件的机制,这超出了本书的范围,因为对于 WaveNet 模型而言这不是必需的。
WaveNet
DeepMind 在另一篇针对其自回归生成网络的迭代论文[2]中引入了 WaveNet,其中包括 PixelCNN。 实际上,WaveNet 架构是基于 PixelCNN 的构建的,与 PixelRNN 相比,WaveNet 架构使网络能够以相对更快的方式生成输出。 借助 WaveNet,我们在书中首次探索了针对音频信号的神经网络实现。 我们对音频信号使用一维卷积,这与 PixelCNN 的二维卷积不同,对于初学者而言,这是相当复杂的。
WaveNet 取代了对音频信号使用傅里叶变换的传统方法。 它通过使神经网络找出要执行的转换来做到这一点。 因此,转换可以反向传播,原始音频数据可以使用一些技术来处理,例如膨胀卷积,8 位量化等。 但是人们一直在研究将 WaveNet 方法与传统方法相结合,尽管该方法将损失函数转换为多元回归,而不是 WaveNet 使用的分类。
PyTorch 向后公开了此类传统方法的 API。 以下是对傅立叶变换的结果进行快速傅立叶变换和傅立叶逆变换以获取实际输入的示例。 两种操作都在二维张量上,最后一个维为 2,表示复数的实部和虚部。
PyTorch 提供了用于快速傅里叶变换(torch.fft
),快速傅里叶逆变换(torch.ifft
),实数到复杂傅里叶变换(torch.rfft
),实数到复杂傅里叶变换(torch.irfft
)的 API。 ),短时傅立叶变换(torch.stft
)和几个窗口函数,例如 Hann 窗口,Hamming 窗口和 Bartlett 窗口。
>>> x = torch.ones(3,2)
[torch.FloatTensor of size (3,2)]
>>> torch.fft(x, 1)
[torch.FloatTensor of size (3,2)]
>>> fft_x = torch.fft(x, 1)
>>> torch.ifft(fft_x, 1)
[torch.FloatTensor of size (3,2)]
WaveNet 并不是第一个引入序列数据卷积网络或扩张的卷积网络以加快操作速度的架构。 但是 WaveNet 成功地将两者结合使用,从而产生了可区分的音频。 第一波 WaveNet 的作者发布了另一篇迭代论文,该论文极大地加速了的产生,称为并行 WaveNet。 但是,在本章中,我们将重点关注普通的 WaveNet,这在很大程度上受到了戈尔宾的资料库的启发[3]。
WaveNet 的基本构件是膨胀卷积,它取代了 RNN 的功能来获取上下文信息。
图 6.8:没有卷积卷积的 WaveNet 架构
来源: 《WaveNet:原始音频的生成模型》,Aaron van den Oord 等
“图 6.8”显示了 WaveNet 在进行新值预测时如何提取有关上下文的信息。 输入以蓝色(图片的底部)给出,它是原始音频样本。 例如,一个 16 kHz 的音频样本具有一秒钟音频的 16,000 个数据点,如果与自然语言的序列长度(每个单词将是一个数据点)相比,这是巨大的。 这些长序列是为什么 RNN 对原始音频样本不太有效的一个很好的原因。
LSTM 网络可以记住上下文信息的实际序列长度为 50 到 100。上图具有三个隐藏层,这些隐藏层使用来自上一层的信息。 第一层输入经过一维卷积层以生成第二层的数据。 卷积可以并行完成,这与 RNN 的情况不同,在卷积中,每个数据点都需要先前的输入顺序地传递。 为了使收集更多上下文,我们可以增加层数。 在“图 6.8”中,位于第四层的输出将从输入层中的五个节点获取上下文信息。 因此,每一层将另外一个输入节点添加到上下文中。 也就是说,如果我们有 10 个隐藏层,则最后一层将从 12 个输入节点获取上下文信息。
图 6.9:膨胀卷积
来源: 《WaveNet:原始音频的生成模型》,Aaron van den Oord 等
到目前为止,应该很明显,要达到 LSTM 网络的上下文保持能力为 50 到 100 的实际限制,该网络需要 98 层,这在计算上是昂贵的。 这是我们使用膨胀卷积的地方。 使用膨胀卷积,我们将为每个层都有一个膨胀因子,并且以指数方式增加该膨胀因子将以对数形式减少任何特定上下文窗口宽度所需的层数。
图 6.10:膨胀为 0、2 和 4 的卷积
资料来源:通过扩散卷积进行的多尺度上下文聚合,Fisher Yu 和 Vladlen Koltun
“图 6.9”显示了 WaveNet 中使用的膨胀卷积方案(尽管为了更好地理解膨胀卷积,我们在这里使用的是二维图片; WaveNet 使用一维卷积)。 尽管该实现方案跳过了中参数的日志,但最终节点仍然可以通过这种巧妙设计的方案从上下文中的所有节点获取信息。 在具有扩张卷积和三个隐藏层的情况下,先前的实现覆盖了 16 个输入节点,而先前没有扩张卷积的实现仅覆盖了五个输入节点。
dilatedcausalconv = torch.nn.Conv1d(
res_channels,
res_channels,
kernel_size=2,
dilation=dilation,
padding=0,
bias=False)
可以用“图 6.10”中给出的二维图片直观地解释膨胀卷积的实现。 所有这三个示例均使用大小为 3x3 的核,其中最左边的块显示的是正常卷积或膨胀卷积,其膨胀因子等于零。 中间块具有相同的核,但膨胀因子为 2,最后一个块的膨胀因子为 4。 扩张卷积的实现技巧是在核之间添加零以扩展核的大小,如图“图 6.11”所示:
图 6.11:带有核扩展的膨胀卷积
PyTorch 通过使用户能够将膨胀作为关键字参数传递,从而使进行膨胀卷积变得容易,如先前代码块中的DilatedCausalConv1d
节点中所给出的。 如前所述,每一层具有不同的扩张因子,并且可以为每一层的扩张卷积节点创建传递该因子。 由于跨步为 1,所以填充保持为 0,目的不是上采样或下采样。 init_weights_for_test
是通过将权重矩阵填充 1 来进行测试的便捷函数。
PyTorch 提供的灵活性使用户可以在线调整参数,这对于调试网络更加有用。 forward
传递仅调用 PyTorch conv1d
对象,该对象是可调用的并保存在self.conv
变量中:
causalconv = torch.nn.Conv1d(
in_channels,
res_channels,
kernel_size=2,
padding=1,
bias=False)
WaveNet 的完整架构建立在膨胀卷积网络和卷积后门控激活的基础之上。 WaveNet 中的数据流从因果卷积运算开始,这是一种正常的一维卷积,然后传递到膨胀的卷积节点。 WaveNet 图片中的每个白色圆圈(“图 6.9”)是一个扩展的卷积节点。 然后,将正常卷积的数据点传递到膨胀的卷积节点,然后将其独立地通过 Sigmoid 门和 tanh 激活。 然后,两个运算的输出通过逐点乘法运算符和1x1
卷积进行。 WaveNet 使用剩余连接和跳跃连接来平滑数据流。 与主流程并行运行的剩余线程通过加法运算与1x1
卷积的输出合并。
图 6.12:WaveNet 架构
来源: 《WaveNet:原始音频的生成模型》,Aaron van den Oord 等
“图 6.12”中提供的 WaveNet 的结构图显示了所有这些小组件以及它们如何连接在一起。 跳跃连接之后的部分在程序中称为密集层,尽管它不是上一章介绍的密集层。 通常,密集层表示全连接层,以将非线性引入网络并获得所有数据的概览。 但是 WaveNet 的作者发现,正常的密集层可以由一串 ReLU 代替,并且1x1
卷积可以通过最后的 softmax 层实现更高的精度,该层可以展开为 256 个单元(巨大扇出的 8 位µ
律量化) 音频)。
class WaveNetModule(torch.nn.Module):
def __init__(self, layer_size, stack_size,
in_channels, res_channels):
super().__init__()
self.causal = CausalConv1d(in_channels, res_channels)
self.res_stack = ResidualStack(layer_size,
stack_size,
res_channels,
in_channels)
self.convdensnet = ConvDensNet(in_channels)
def forward(self, x):
output = self.causal(output)
skip_connections = self.res_stack(output, output_size)
output = torch.sum(skip_connections, dim=0)
output = self.convdensnet(output)
return output.contiguous()
前面的代码块中给出的程序是主要的父 WaveNet 模块,该模块使用所有子组件来创建图。 init
定义了三个主要成分,其中是第一个普通卷积,然后是res_stack
(它是由所有膨胀卷积和 Sigmoid 正切门组成的残差连接块)。 然后,最后的convdensnet
在1x1
卷积的顶部进行。 forward
引入一个求和节点,依次执行这些模块。 然后,将convdensnet
创建的输出通过contiguous()
移动到存储器的单个块。 这是其余网络所必需的。
ResidualStack
是需要更多说明的模块,它是 WaveNet 架构的核心。 ResidualStack
是ResidualBlock
的层的栈。 WaveNet 图片中的每个小圆圈都是一个残差块。 在正常卷积之后,数据到达ResidualBlock
,如前所述。 ResidualBlock
从膨胀的卷积开始,并且期望得到膨胀。 因此,ResidualBlock
决定了架构中每个小圆节点的膨胀因子。 如前所述,膨胀卷积的输出然后通过类似于我们在 PixelCNN 中看到的门的门。
在那之后,它必须经历两个单独的卷积以进行跳跃连接和残差连接。 尽管作者并未将其解释为两个单独的卷积,但使用两个单独的卷积更容易理解。
class ResidualBlock(torch.nn.Module):
def __init__(self, res_channels, skip_channels, dilation=1):
super().__init__()
self.dilatedcausalconv = torch.nn.Conv1d(
res_channels, res_channels, kernel_size=2,
dilation=dilation,
padding=0, bias=False)
self.conv_res = torch.nn.Conv1d(res_channels, res_channels, 1)
self.conv_skip = torch.nn.Conv1d(res_channels, skip_channels, 1)
self.gate_tanh = torch.nn.Tanh()
self.gate_sigmoid = torch.nn.Sigmoid()
def forward(self, x, skip_size):
x = self.dilatedcausalconv(x)
# PixelCNN Gate
# ---------------------------
gated_tanh = self.gate_tanh(x)
gated_sigmoid = self.gate_sigmoid(x)
gated = gated_tanh * gated_sigmoid
# ---------------------------
x = self.conv_res(gated)
x += x[:, :, -x.size(2):]
skip = self.conv_skip(gated)[:, :, -skip_size:]
return x, skip
ResidualStack
使用层数和栈数来创建膨胀因子。 通常,每个层具有2 ^ l
作为膨胀因子,其中l
是层数。 从1
到2 ^ l
开始,每个栈都具有相同数量的层和相同样式的膨胀因子列表。
方法stack_res_block
使用我们前面介绍的ResidualBlock
为每个栈和每个层中的每个节点创建一个残差块。 该程序引入了一个新的 PyTorch API,称为torch.nn.DataParallel
。 如果有多个 GPU,则DataParallel
API 会引入并行性。 将模型制作为数据并行模型可以使 PyTorch 知道用户可以使用更多 GPU,并且 PyTorch 从那里获取了它,而没有给用户带来任何障碍。 PyTorch 将数据划分为尽可能多的 GPU,并在每个 GPU 中并行执行模型。
它还负责从每个 GPU 收集回结果,并将其合并在一起,然后再继续进行。
class ResidualStack(torch.nn.Module):
def __init__(self, layer_size, stack_size, res_channels,
skip_channels):
super().__init__()
self.res_blocks = torch.nn.ModuleList()
for s in range(stack_size):
for l in range(layer_size):
dilation = 2 ** l
block = ResidualBlock(res_channels, skip_channels,
dilation)
self.res_blocks.append(block)
def forward(self, x, skip_size):
skip_connections = []
for res_block in self.res_blocks:
x, skip = res_block(x, skip_size)
skip_connections.append(skip)
return torch.stack(skip_connections)
在许多深度学习研究人员看来,GAN 是过去十年的主要发明之一。 它们在本质上不同于其他生成网络,尤其是在训练方式上。 Ian Goodfellow 撰写的第一篇有关对抗网络生成数据的论文于 2014 年发表。 GAN 被认为是一种无监督学习算法,其中有监督学习算法学习使用标记数据y
来推理函数y' = f(x)
。
这种监督学习算法本质上是判别式的,这意味着它学会对条件概率分布函数进行建模,在此条件函数中,它说明了某事物的概率被赋予了另一事物的状态。 例如,如果购买房屋的价格为 100,000 美元,那么房屋位置的概率是多少? GAN 从随机分布生成输出,因此随机输入的变化使输出不同。
GAN 从随机分布中获取样本,然后由网络将其转换为输出。 GAN 在学习输入分布的模式时不受监督,并且与其他生成网络不同,GAN 不会尝试明确地学习密度分布。 相反,他们使用博弈论方法来找到两个参与者之间的纳什均衡。 GAN 实现将始终拥有一个生成网络和一个对抗网络,这被视为两个试图击败的参与者。 GAN 的核心思想在于从统一或高斯等数据分布中采样,然后让网络将采样转换为真正的数据分布样。 我们将实现一个简单的 GAN,以了解 GAN 的工作原理,然后转向名为 CycleGAN 的高级 GAN 实现。
简单的 GAN
了解 GAN 的直观方法是从博弈论的角度了解它。 简而言之,GAN 由两个参与者组成,一个生成器和一个判别器,每一个都试图击败对方。 生成器从分布中获取一些随机噪声,并尝试从中生成一些输出分布。 生成器总是尝试创建与真实分布没有区别的分布; 也就是说,伪造的输出应该看起来像是真实的图像。
Figure 6.13: GAN architecture
但是,如果没有明确的训练或标签,生成器将无法确定真实图像的外观,并且其唯一的来源就是随机浮点数的张量。 然后,GAN 将第二个玩家介绍给游戏,这是一个判别器。 判别器仅负责通知生成器生成的输出看起来不像真实图像,以便生成器更改其生成图像的方式以使判别器确信它是真实图像。 但是判别器总是可以告诉生成器图像不是真实的,因为判别器知道图像是从生成器生成的。 这就是事情变得有趣的地方。 GAN 将真实,真实的图像引入游戏中,并将判别器与生成器隔离。 现在,判别器从一组真实图像中获取一个图像,并从生成器中获取一个伪图像,并且判别器必须找出每个图像的来源。 最初,判别器什么都不知道,只能预测随机结果。
class DiscriminatorNet(torch.nn.Module):
A three hidden-layer discriminative neural network
def __init__(self):
super().__init__()
n_features = 784
n_out = 1
self.hidden0 = nn.Sequential(
nn.Linear(n_features, 1024),
nn.LeakyReLU(0.2),
nn.Dropout(0.3)
self.hidden1 = nn.Sequential(
nn.Linear(1024, 512),
nn.LeakyReLU(0.2),
nn.Dropout(0.3)
self.hidden2 = nn.Sequential(
nn.Linear(512, 256),
nn.LeakyReLU(0.2),
nn.Dropout(0.3)
self.out = nn.Sequential(
torch.nn.Linear(256, n_out),
torch.nn.Sigmoid()
def forward(self, x):
x = self.hidden0(x)
x = self.hidden1(x)
x = self.hidden2(x)
x = self.out(x)
return x
但是,可以将辨别器的任务修改为分类任务。 判别器可以将输入图像分类为原始或生成的,这是二分类。 同样,我们训练判别器网络正确地对图像进行分类,最终,通过反向传播,判别器学会了区分真实图像和生成的图像。
该会话中使用的示例将生成类似 MNIST 的输出。 前面的代码显示了 MNIST 上的鉴别播放器,该播放器总是从真实源数据集或生成器中获取图像。 GAN 众所周知非常不稳定,因此使用LeakyReLU
是研究人员发现比常规ReLU
更好工作的黑客之一。 现在,LeakyReLU
通过它泄漏了负极,而不是将所有内容限制为零到零。 与正常的ReLU
相比,这有助于使梯度更好地流过网络,对于小于零的值,梯度为零。
图 6.14:ReLU 和泄漏的 ReLU
我们开发的的简单判别器具有三个连续层。 每个层都有一个线性层,泄漏的 ReLU 和一个夹在中间的漏失层,然后是一个线性层和一个 Sigmoid 门。 通常,概率预测网络使用 softmax 层作为最后一层; 像这样的简单 GAN 最适合 Sigmoid 曲面。
def train_discriminator(optimizer, real_data, fake_data):
optimizer.zero_grad()
# 1.1 Train on Real Data
prediction_real = discriminator(real_data)
# Calculate error and backpropagate
error_real = loss(prediction_real,real_data_target(real_data.size(0)))
error_real.backward()
# 1.2 Train on Fake Data
prediction_fake = discriminator(fake_data)
# Calculate error and backpropagate
error_fake = loss(prediction_fake,fake_data_target(real_data.size(0)))
error_fake.backward()
# 1.3 Update weights with gradients
optimizer.step()
# Return error
return error_real + error_fake, prediction_real, prediction_fake
在前面的代码块中定义的函数train_generator
接受optimizer
对象,伪数据和实数据,然后将它们传递给判别器。 函数fake_data_target
(在下面的代码块中提供)创建一个零张量,该张量的大小与预测大小相同,其中预测是从判别器返回的值。 判别器的训练策略是使任何真实数据被归类为真实分布的概率最大化,并使任何数据点被归类为真实分布的概率最小化。 在实践中,使用了来自判别器或生成器的结果的日志,因为这会严重损害网络的分类错误。 然后在应用optimizer.step
函数之前将误差反向传播,该函数将通过学习率以梯度更新权重。
接下来给出用于获得真实数据目标和伪数据目标的函数,这与前面讨论的最小化或最大化概率的概念基本一致。 实际数据生成器返回一个张量为 1s 的张量,该张量是我们作为输入传递的形状。 在训练生成器时,我们正在尝试通过生成图像来最大程度地提高其概率,该图像看起来应该是从真实数据分布中获取的。 这意味着判别器应将 1 预测为图像来自真实分布的置信度分数。
def real_data_target(size):
Tensor containing ones, with shape = size
return torch.ones(size, 1).to(device)
def fake_data_target(size):
Tensor containing zeros, with shape = size
return torch.zeros(size, 1).to(device)
因此,判别器的实现很容易实现,因为它本质上只是分类任务。 生成器网络将涉及所有卷积上采样/下采样,因此有点复杂。 但是对于当前示例,由于我们希望它尽可能简单,因此我们将在全连接网络而不是卷积网络上进行工作。
def noise(size):
n = torch.randn(size, 100)
return n.to(device)
可以定义一个噪声生成函数,该函数可以生成随机样本(事实证明,这种采样在高斯分布而非随机分布下是有效的,但为简单起见,此处使用随机分布)。 如果 CUDA 可用,我们会将随机产生的噪声从 CPU 内存传输到 GPU 内存,并返回张量,其输出大小为100
。 因此,生成网络期望输入噪声的特征数量为 100,而我们知道 MNIST 数据集中有 784 个数据点(28x28
)。
对于生成器,我们具有与判别器类似的结构,但是在最后一层具有 tanh 层,而不是 Sigmoid。 进行此更改是为了与我们对 MNIST 数据进行的归一化同步,以将其转换为 -1 到 1 的范围,以便判别器始终获得具有相同范围内数据点的数据集。 生成器中的三层中的每一层都将输入噪声上采样到 784 的输出大小,就像我们在判别器中下采样以进行分类一样。
class GeneratorNet(torch.nn.Module):
A three hidden-layer generative neural network
def __init__(self):
super().__init__()
n_features = 100
n_out = 784
self.hidden0 = nn.Sequential(
nn.Linear(n_features, 256),
nn.LeakyReLU(0.2)
self.hidden1 = nn.Sequential(
nn.Linear(256, 512),
nn.LeakyReLU(0.2)
self.hidden2 = nn.Sequential(
nn.Linear(512, 1024),
nn.LeakyReLU(0.2)
self.out = nn.Sequential(
nn.Linear(1024, n_out),
nn.Tanh()
def forward(self, x):
x = self.hidden0(x)
x = self.hidden1(x)
x = self.hidden2(x)
x = self.out(x)
return x
生成器训练器函数比判别器训练器函数简单得多,因为它不需要从两个来源获取输入,也不必针对不同的目的进行训练,而判别器则必须最大化将真实图像分类为真实图像的可能性。 图像,并最小化将噪声图像分类为真实图像的可能性。 此函数仅接受伪图像数据和优化器,其中伪图像是生成器生成的图像。 生成器训练器函数代码可以在 GitHub 存储库中找到。
我们分别创建判别器和生成器网络的实例。 到目前为止,我们所有的网络实现都具有单个模型或单个神经网络,但第一次,我们有两个单独的网络在同一个数据集上工作,并具有不同的优化目标。 对于两个单独的网络,我们还需要创建两个单独的优化器。 从历史上看,Adam
优化器最适合学习速度非常慢的 GAN。
两个网络都使用判别器的输出进行训练。 唯一的区别是,在训练判别器时,我们尝试使伪造图像被分类为真实图像的可能性最小,而在训练生成器时,我们试图使伪造图像被分类为真实图像的可能性最大。 由于它始终是试图预测 0 和 1 的二分类器,因此我们使用torch.nn
中的BCELoss
来尝试预测 0 或 1:
discriminator = DiscriminatorNet().to(device)
generator = GeneratorNet().to(device)
d_optimizer = optim.Adam(discriminator.parameters(), lr=0.0002)
g_optimizer = optim.Adam(generator.parameters(), lr=0.0002)
loss = nn.BCELoss()
接下来是简单 GAN 在不同周期生成的输出,该图显示了网络如何学会将输入随机分布映射到输出真实分布。
图 6.15:100 个周期后的输出
图 6.16:200 个周期后的输出
图 6.17:300 个周期后的输出
CycleGAN
图 6.18:实践中的 CycleGAN
资料来源:《使用周期一致的对抗性网络的不成对图像翻译》,朱俊彦等
CycleGAN 是 GAN 类型的智能变体之一。 在同一架构中,两个 GAN 之间巧妙设计的循环流可教导两个不同分布之间的映射。 先前的方法需要来自不同分布的成对图像,以便网络学习映射。 对于示例,如果目标是建立一个可以将黑白图像转换为彩色图像的网络,则数据集在训练集中需要将同一图像的黑白和彩色版本作为一对。 尽管很难,但在一定程度上这是可能的。 但是,如果要使冬天拍摄的图像看起来像夏天拍摄的图像,则训练集中的这对图像必须是在冬天和夏天拍摄的具有相同对象和相同帧的完全相同的图像。 这是完全不可能的,而那正是 CycleGAN 可以提供帮助的地方。
CycleGAN 学习每种分布的模式,并尝试将图像从一种分布映射到另一种分布。 “图 6.19”中给出了 CycleGAN 的简单架构图。 上面的图显示了如何训练一个 GAN,下面的图显示了如何使用正在工作的 CycleGAN 典型示例:马和斑马来训练另一个。
在 CycleGAN 中,我们不是从分布中随机采样的数据开始,而是使用来自集合 A(在本例中为一组马)的真实图像。 委托生成器 A 到 B(我们称为 A2B)将同一匹马转换为斑马,但没有将成对的马匹转换为斑马的配对图像。 训练开始时,A2B 会生成无意义的图像。 判别器 B 从 A2B 生成的图像或从集合 B(斑马的集合)中获取真实图像。 与其他任何判别器一样,它负责预测图像是生成的还是真实的。 这个过程是正常的 GAN,它永远不能保证同一匹马转换为斑马。 而是将马的图像转换为斑马的任何图像,因为损失只是为了确保图像看起来像集合 B 的分布; 它不需要与集合 A 相关。为了强加这种相关性,CycleGAN 引入了循环。
然后,从 A2B 生成的图像会通过另一个生成器 B2A,以获得Cyclic_A
。 施加到Cyclic_A
的损失是 CycleGAN 的关键部分。 在这里,我们尝试减小Cyclic_A
和Input_A
之间的距离。 第二个损失背后的想法是,第二个生成器必须能够生成马,因为我们开始时的分布是马。 如果 A2B 知道如何将马匹映射到斑马而不改变图片中的任何其他内容,并且如果 B2A 知道如何将斑马线映射到匹马而不改变图片中的其他任何东西,那么我们对损失所做的假设应该是正确的。
图 6.19:CycleGAN 架构
当判别器 A 获得马的真实图像时,判别器 B 从 A2B 获得斑马的生成图像,当判别器 B 获得斑马的真实图像时,判别器 A 从 B2A 获得马的生成图像。 要注意的一点是,判别器 A 总是能够预测图像是否来自马具,而判别器 B 总是能够预测图像是否来自斑马具。 同样,A2B 始终负责将马集合映射到斑马分布,而 B2A 始终负责将斑马集合映射到马分布。
生成器和判别器的这种周期性训练可确保网络学会使用模式变化来映射图像,但图像的所有其他特征均保持不变。
Generator(
(model): Sequential(
(0): ReflectionPad2d((3, 3, 3, 3))
(1): Conv2d(3, 64, kernel_size=(7, 7), stride=(1, 1))
(2): InstanceNorm2d(64, eps=1e-05, momentum=0.1, affine=False,track_running_stats=False)
(3): ReLU(inplace)
(4): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2),padding=(1, 1))
(5): InstanceNorm2d(128, eps=1e-05, momentum=0.1,affine=False, track_running_stats=False)
(6): ReLU(inplace)
(7): Conv2d(128, 256, kernel_size=(3, 3), stride=(2, 2),padding=(1, 1))
(8): InstanceNorm2d(256, eps=1e-05, momentum=0.1,affine=False, track_running_stats=False)
(9): ReLU(inplace)
(10): ResidualBlock()
(11): ResidualBlock()
(12): ResidualBlock()
(13): ResidualBlock()
(14): ResidualBlock()
(15): ResidualBlock()
(16): ResidualBlock()
(17): ResidualBlock()
(18): ResidualBlock()
(19): ConvTranspose2d(256, 128, kernel_size=(3, 3), stride=(2,2), padding=(1, 1), output_padding=(1, 1))
(20): InstanceNorm2d(128, eps=1e-05, momentum=0.1,affine=False, track_running_stats=False)
(21): ReLU(inplace)
(22): ConvTranspose2d(128, 64, kernel_size=(3, 3), stride=(2,2), padding=(1, 1), output_padding=(1, 1))
(23): InstanceNorm2d(64, eps=1e-05, momentum=0.1,affine=False, track_running_stats=False)
(24): ReLU(inplace)
(25): ReflectionPad2d((3, 3, 3, 3))
(26): Conv2d(64, 3, kernel_size=(7, 7), stride=(1, 1))
(27): Tanh()
PyTorch 为用户提供了进入网络并进行操作的完全灵活性。 其中一部分是将模型打印到终端上,以显示其中包含所有模块的地形排序图。
之前我们在 CycleGAN 中看到了生成器的图。 与我们探讨的第一个简单 GAN 不同,A2B 和 B2A 都具有相同的内部结构,内部具有卷积。 整个生成器都包装在以ReflectionPad2D
开头的单个序列模块中。
反射填充涉及填充输入的边界,跳过批量尺寸和通道尺寸。 填充之后是典型的卷积模块布置,即二维卷积。
实例归一化分别对每个输出批量进行归一化,而不是像“批量归一化”中那样对整个集合进行归一化。 二维实例归一化确实在 4D 输入上实例化归一化,且批量尺寸和通道尺寸为第一维和第二维。 PyTorch 通过传递affine=True
允许实例规范化层可训练。 参数track_running_stats
决定是否存储训练循环的运行平均值和方差,以用于评估模式(例如归一化)。 默认情况下,它设置为False
; 也就是说,它在训练和评估模式下都使用从输入中收集的统计信息。
下图给出了批量规范化和实例规范化的直观比较。 在图像中,数据表示为三维张量,其中C
是通道,N
是批量,D
是其他维,为简单起见,在一个维中表示。 如图中所示,批量归一化对整个批量中的数据进行归一化,而实例归一化则在两个维度上对一个数据实例进行归一化,从而使批量之间的差异保持完整。
图 6.20:
Source: Group Normalization, Yuxin Wu and Kaiming He
原始 CycleGAN 的生成器在三个卷积块之后使用九个残差块,其中每个卷积块由卷积层,归一化层和激活层组成。 残差块之后是几个转置卷积,然后是最后一层具有 tanh 函数的一个卷积层。 如简单 GAN 中所述,tanh 输出的范围是 -1 至 1,这是所有图像的归一化值范围。
残余块的内部是按顺序排列的另一组填充,卷积,归一化和激活单元。 但是forward
方法与residueNet
中的求和操作建立了残余连接。 在以下示例中,所有内部块的顺序包装都保存到变量conv_block
中。 然后,将经过此块的数据与加法运算符一起输入到网络x
。 此残留连接通过允许信息更容易地双向流动来帮助网络变得稳定:
class ResidualBlock(nn.Module):
def __init__(self, in_features):
super().__init__()
conv_block = [nn.ReflectionPad2d(1),
nn.Conv2d(in_features, in_features, 3),
nn.InstanceNorm2d(in_features),
nn.ReLU(inplace=True),
nn.ReflectionPad2d(1),
nn.Conv2d(in_features, in_features, 3),
nn.InstanceNorm2d(in_features)]
self.conv_block = nn.Sequential(*conv_block)
def forward(self, x):
return x + self.conv_block(x)
在本章中,我们学习了一系列全新的神经网络,这些神经网络使人工智能世界发生了翻天覆地的变化。 生成网络对我们始终很重要,但是直到最近我们才能达到人类无法比拟的准确率。 尽管有一些成功的生成网络架构,但在本章中我们仅讨论了两个最受欢迎的网络。
生成网络使用 CNN 或 RNN 之类的基本架构作为整个网络的构建块,但是使用一些不错的技术来确保网络正在学习生成一些输出。 到目前为止,生成网络已在艺术中得到广泛使用,并且由于模型必须学习数据分布以生成输出,因此我们可以轻松地预测生成网络将成为许多复杂网络的基础。 生成网络最有前途的用途可能不是生成,而是通过生成学习数据分发并将该信息用于其他目的。
在下一章中,我们将研究最受关注的网络:强化学习算法。
《使用 PixelCNN 解码器的条件图像生成》,Oord,Aäronvan den,Nal Kalchbrenner,Oriol Vinyals,Lasse Espeholt,Alex Graves 和 Koray Kavukcuoglu,NIPS,2016 年
《并行 WaveNet:快速高保真语音合成》,Oord,Aäronvan den,Yazhe Li,Igor Babuschkin,Karen Simonyan,Oriol Vinyals,Koray Kavukcuoglu,George van den Driessche,Edward Lockhart,Luis C. Cobo, Florian Stimberg,Norman Casagrande,Dominik Grewe,Seb Noury,Sander Dieleman,Erich Elsen,Nal Kalchbrenner,Heiga Zen,Alex Graves,Helen King,Tom Walters,Dan Belov 和 Demis Hassabis,ICML,2018
戈尔宾的 WaveNet 存储库
七、强化学习
让我们谈谈学习的本质。 我们不是天生就知道这个世界。 通过与世界互动,我们了解了行动的效果。 一旦我们了解了世界的运转方式,我们就可以利用这些知识来做出可以将我们引向特定目标的决策。
在本章中,我们将使用一种称为强化学习的方法来制定这种计算学习方法。 它与本书中介绍的其他类型的深度学习算法非常不同,并且本身就是一个广阔的领域。
强化学习的应用范围从在数字环境中玩游戏到在现实环境中控制机器人的动作。 它也恰好是您用来训练狗和其他动物的技术。 如今,强化学习已被用于驾驶自动驾驶汽车,这是一个非常受欢迎的领域。
当计算机(AlphaGo)击败世界围棋冠军 Lee Sedol [1]时,发生了最近的重大突破之一。 这是一个突破,因为围棋一直以来被认为是让计算机掌握很长时间的游戏圣杯。 这是因为据说围棋游戏中的配置数量大于我们宇宙中的原子数量。
在世界冠军输给 AlphaGo 之后,甚至有人说他已经从计算机中学到了一些东西。 这听起来很疯狂,但这是事实。 听起来更疯狂的是,算法的输入只不过是棋盘游戏当前状态的图像,而 AlphaGo 则一遍又一遍地对自己进行训练。 但在此之前,它从观看世界冠军的视频中学习了数小时。
如今,强化学习已被用于使机器人学习如何走路。 在这种情况下,输入将是机器人可以施加到其关节的力以及机器人将要行走的地面状态。 强化学习也被用于预测股价,并且在该领域引起了很多关注。
这些现实问题似乎非常复杂。 我们将需要对所有这些事情进行数学公式化,以便计算机可以解决它们。 为此,我们需要简化环境和决策过程以实现特定目标。
在强化学习的整个范式中,我们仅关注从交互中学习,而学习器或决策者则被视为智能体。 在自动驾驶汽车中,智能体是汽车,而在乒乓球中,智能体是球拍。 当智能体最初进入世界时,它将对世界一无所知。 智能体将必须观察其环境并根据其做出决策或采取行动。 它从环境中返回的响应称为奖励,可以是肯定的也可以是否定的。 最初,智能体将随机采取行动,直到获得正面奖励为止,并告诉他们这些决定可能对其有利。
这似乎很简单,因为智能体程序要做的就是考虑环境的当前状态进行决策,但是我们还想要更多。 通常,座席的目标是在其一生中最大化其累积奖励,重点是“累积”一词。 智能体不仅关心在下一步中获得的报酬,而且还关心将来可能获得的报酬。 这需要有远见,并将使智能体学习得更好。
这个元素使问题变得更加复杂,因为我们必须权衡两个因素:探索与利用。 探索将意味着做出随机决策并对其进行测试,而利用则意味着做出智能体已经知道的决策将给其带来积极的结果,因此智能体现在需要找到一种方法来平衡这两个因素以获得最大的累积结果。 。 这是强化学习中非常重要的概念。 这个概念催生了各种算法来平衡这两个因素,并且是一个广泛的研究领域。
在本章中,我们将使用 OpenAI 名为 Gym 的库。 这是一个开放源代码库,为强化学习算法的训练和基准测试设定了标准。 体育馆提供了许多研究人员用来训练强化学习算法的环境。 它包括许多 Atari 游戏,用于拾取物品的机器人仿真,用于步行和跑步的各种机器人仿真以及驾驶仿真。 该库提供了智能体程序和环境之间相互交互所必需的参数。
现在,我们已经准备好用数学公式来表达强化学习问题,因此让我们开始吧。
图 7.1:强化学习框架
在上图中,您可以看到任何强化学习问题的设置。 通常,强化学习问题的特征在于,智能体试图学习有关其环境的信息,如前所述。
假设时间以不连续的时间步长演化,则在时间步长 0 处,智能体查看环境。 您可以将这种观察视为环境呈现给智能体的情况。 这也称为观察环境状态。 然后,智能体必须为该特定状态选择适当的操作。 接下来,环境根据智能体采取的行动向智能体提出了新的情况。 在同一时间步长中,环境会给智能体提供奖励,从而可以指示智能体是否做出了适当的响应。 然后该过程继续。 环境为坐席提供状态和奖励,然后坐席采取行动。
图 7.2:每个时间步骤都有一个状态,动作和奖励
因此,状态,动作和奖励的顺序现在随着时间而流动,在这个过程中,对智能体而言最重要的是其奖励。 话虽如此,智能体的目标是使累积奖励最大化。 换句话说,智能体需要制定一项策略,以帮助其采取使累积奖励最大化的行动。 这只能通过与环境交互来完成。
这是因为环境决定了对每个动作给予智能体多少奖励。 为了用数学公式表述,我们需要指定状态,动作和奖励,以及环境规则。
情景任务与连续任务
在现实世界中,我们指定的许多任务都有明确定义的终点。 例如,如果智能体正在玩游戏,则当智能体获胜或失败或死亡时,剧集或任务便会结束。
在无人驾驶汽车的情况下,任务在汽车到达目的地或撞车时结束。 这些具有明确终点的任务称为剧集任务。 智能体在每个剧集的结尾都会获得奖励,这是智能体决定自己在环境中做得如何的时候。 然后,智能体从头开始但继续拥有下一个剧集的先验信息,然后继续执行下一个剧集,因此效果更好。
随着时间的流逝,在一段剧集中,智能体将学会玩游戏或将汽车开到特定的目的地,因此将受到训练。 您会记得,智能体的目标是在剧集结束时最大限度地提高累积奖励。
但是,有些任务可能永远持续下去。 例如,在股票市场上交易股票的机器人没有明确的终点,必须在每个时间步骤中学习和提高自己。 这些任务称为连续任务。 因此,在那种情况下,奖励是在特定的时间间隔提供给业务代表的,但任务没有尽头,因此业务代表必须从环境中学习并同时进行预测。
在本章中,我们将只关注情景任务,但为连续任务制定问题陈述并不会有太大不同。
累积折扣奖励
为了使智能体最大化累积奖励,可以考虑的一种方法是在每个时间步长上最大化奖励。 这样做可能会产生负面影响,因为在初始时间步长中最大化回报可能会导致智能体在将来很快失败。 让我们以步行机器人为例。 假定机器人的速度是奖励的一个因素,如果机器人在每个时间步长上都最大化其速度,则可能会使其不稳定并使其更快落下。
我们正在训练机器人走路; 因此,我们可以得出结论,智能体不能仅仅专注于当前时间步长来最大化报酬。 它需要考虑所有时间步骤。 所有强化学习问题都会是这种情况。 动作可能具有短期或长期影响,智能体需要了解动作的复杂性以及环境带来的影响。
在前述情况下,如果智能体将了解到其移动速度不能超过某个可能会使它不稳定并对其产生长期影响的极限,则它将自行学习阈值速度。 因此,智能体将在每个时间步长处获得较低的报酬,但会避免将来跌倒,从而使累积报酬最大化。
假设在所有未来时间步长处的奖励都由R[t]
,R[t + 1]
,R[t + 2]
表示,依此类推:
由于这些时间步伐是在将来,智能体无法确定地知道将来的回报是什么。 它只能估计或预测它们。 未来奖励的总和也称为回报。 我们可以更明确地指定智能体的目标是使期望收益最大化。
让我们还考虑一下,未来回报中的所有回报并不那么重要。 为了说明这一点,假设您想训练一只狗。 您给它命令,如果它正确地遵循了它们,则给它一种奖赏。 您能期望狗像称重从现在起数年可能获得的奖励一样,来权衡明天可能获得的奖励吗? 这似乎不可行。
为了让狗决定现在需要采取什么行动,它需要更加重视可能早日获得的奖励,而不再重视可能会从现在开始获得的奖励。 这也被认为是合乎逻辑的,因为狗不确定未来的把握,特别是当狗仍在学习环境并改变其从环境中获得最大回报的策略时。 因为与未来成千上万步长的奖励相比,未来数个时间步长的奖励更可预测,所以折扣收益的概念应运而生。
可以看到,我们在Goal
方程中引入了可变伽玛。 接近 1 的Gamma
表示您将来对每个奖励的重视程度相同。 接近 0 的Gamma
表示只有最近的奖励才具有很高的权重。
一个良好的做法是将Gamma = 0.9
,因为您希望智能体对未来有足够的关注,但又不是无限远。 您可以在训练时设置Gamma
,并且Gamma
会保持固定,直到实验结束。 重要的是要注意,折扣在连续任务中非常有用,因为它们没有尽头。 但是,继续执行的任务不在本章范围之内。
马尔可夫决策过程
让我们通过学习称为马尔可夫决策过程(MDP)的数学框架来完成对强化学习问题的定义。
MDP 定义有五件事:
有限状态集
有限动作集
有限奖励集
环境的单步动态
我们已经了解了如何指定状态,操作,奖励和折扣率。 让我们找出如何指定环境的一步式动态。
下图描述了垃圾收集机器人的 MDP。 机器人的目标是收集垃圾桶。 机器人将继续寻找垃圾桶,并不断收集垃圾桶,直到电池用完,然后再回到扩展坞为电池充电。 可以将机器人的状态定义为高和低,以表示其电池电量。 机器人可以执行的一组操作是搜索垃圾桶,在自己的位置等待,然后返回对接站为电池充电。
图 7.3:垃圾收集机器人的 MDP
例如,假设机器人处于高电量状态。 如果决定搜索垃圾桶,则状态保持高状态的概率为 70%,状态变为低状态的概率为 30%,每种状态获得的奖励为 4。
同样,如果电池处于高电量状态,则决定在其当前位置等待,电池处于高电量状态的可能性为 100%,但是获得的奖励也很低。
花一点时间浏览所有动作和状态,以更好地了解它们。 通过详细说明智能体可以处于的所有状态以及智能体在其所有状态下可以执行的所有操作,并确定每个操作的概率,可以指定环境。 一旦指定了所有这些,就可以指定环境的一站式动态。
在任何 MDP 中,智能体都会知道状态,操作和折扣率,而不会知道环境的回报和一步动态。
现在,您了解了制定任何实际问题(通过强化学习解决)的所有知识。
既然我们已经学习了如何使用 MDP 来指定问题,那么智能体需要制定解决方案。 此策略也可以称为策略。
策略和值函数
策略定义学习智能体在给定时间的行为方式。 保单用希腊字母Pi
表示。 该策略不能用公式定义; 它更多是基于直觉的概念。
让我们举个例子。 对于需要在房间外寻找出路的机器人,它可能具有以下策略:
沿着墙壁走
找到通往门的最短路径
为了使我们能够数学地预测在特定状态下要采取的行动,我们需要一个函数。 让我们定义一个函数,该函数将设为当前状态,并输出一个数字,该数字表示该状态的值。例如,如果您要越过河流,那么靠近桥梁的位置的值将比远离目标位置更大。 此函数称为值函数,也用V
表示。
我们可以使用另一个函数来帮助我们度量事物:一个函数,该函数为我们提供由所有可以采取的行动所导致的所有未来状态的值。
图 7.4:MDP 中的状态和动作
让我们举个例子。 让我们考虑通用状态S0
。 现在我们需要预测在a1
,a2
和a3
之间要采取什么行动才能获得最大的回报(累积折扣奖励)。 我们将此函数命名为Q
。 我们的函数Q
,将预测每个操作的预期收益(值(V
))。 此Q
函数也称为动作值函数,因为它考虑了状态和动作,并预测了它们各自的组合的预期收益。
我们通常会选择最大值。 因此,这些最高限额将指导智能体到最后,这将是我们的策略。 请注意,我大部分时间都在说。 通常,在选择非最大动作值对时,我们会保持很小的随机机会。 我们这样做是为了提高模型的可探索性。 该随机探索机会的百分比称为ε
,该策略称为 ε 贪婪策略。 这是人们用来解决强化学习问题的最常见策略。 如果我们一直都只选择最大值,而不进行任何探索,则该策略简称为贪婪策略。 我们将在实现过程中同时使用这两种策略。
但是起初,我们可能不知道最佳作用值函数。 因此,由此产生的策略也将不是最佳策略。 我们将需要遍历动作值函数,并找到提供最佳回报的函数。 一旦找到它,我们将获得最优的Q
。 最佳Q
也称为Q*
。 因此,我们将能够找到最优的Pi
,也称为Pi*
。
此Q
函数是智能体必须学习的函数。 我们将使用神经网络来学习此函数,因为神经网络也是通用函数逼近器。 一旦有了行动值函数,座席就可以了解问题的最佳策略,我们就可以完成目标。
贝尔曼方程
如果我们使用最近定义的 Q 函数重新定义目标方程,则可以编写:
现在让我们递归定义相同的方程式。 我们将提出贝尔曼方程:
简而言之,Bellman 等式指出,每个点的收益等于下一时间步长的估计报酬加上随后状态的折扣报酬。 可以肯定地说,某些策略的任何值函数都遵循贝尔曼方程。
寻找最佳 Q 函数
现在我们知道,如果我们具有最优 Q 函数,则可以通过选择收益最高的操作来找到最优策略。
深度 Q 学习
深度 Q 学习算法使用神经网络来解决 Q 学习问题。 它对于连续空间的强化学习问题非常有效。 也就是说,任务不会结束。
前面我们讨论了值函数(V
)和操作值函数(Q
)。 由于神经网络是通用函数逼近器,因此我们可以假设它们中的任何一个都是神经网络,具有可以训练的权重。
因此,值函数现在将接受网络的状态和权重,并输出当前状态的值。 我们将需要计算某种误差并将其反向传播到网络,然后使用梯度下降进行训练。 我们需要将网络的输出(值函数)与我们认为最佳的值进行比较。
根据贝尔曼方程:
我们可以通过考虑下一个状态的值来计算预期的Q
。 我们可以通过考虑到目前为止的累积奖励来计算当前的Q
。 在这些 Q 函数之间的差上使用均方误差(MSE)可能是我们的损失。 研究人员建议的一项改进是,当误差较大时,使用平均绝对误差代替 MSE。 当 Q 函数的估计值非常嘈杂时,这使它对异常值更加健壮。 这种损失称为胡贝尔损失。
我们的代码的训练循环如下所示:
随机初始化w, π <- ε
对于所有剧集:
虽然S
并非在每个时间步都是终端:
使用π, Q
从S
中选择A
观察R
和S'
S <- S'
这里要注意的一件事是,我们将使用相同的 ε 贪婪策略在“步骤 6”中选择动作,并在“步骤 8”中更新相同的策略。 这种算法称为策略上算法。 从某种意义上讲,这是很好的,因为在我们观察和更新同一策略时,将更快地学习该策略。 它收敛非常快。 它也有一些缺点,即所学习的策略和用于决策的策略彼此紧密地联系在一起。 如果我们想要一个更具探索性的策略,以便在“步骤 6”中选择观察结果,并在“步骤 8”中更新更优化的策略,该怎么办? 这样的算法被称为非策略算法。
Q 学习是一种非策略算法,因此,在 Q 学习中,我们将有两个策略。 我们用来推断动作的策略将是 ε 贪婪策略,并且我们将其称为策略网络。 我们将使用更新步骤更新的网络将是我们的目标网络。 那只能由一个贪婪的策略来控制,这意味着我们将始终选择ε
等于零的最大值。 我们不会对此策略采取随机措施。 我们这样做是为了使我们更快地朝着更高的值前进。 我们将通过不时复制策略网的权重(例如每隔一集一次)来更新目标网的权重。
其背后的想法是不追逐一个移动的目标。 让我们举个例子:假设您想训练一头驴走路。 如果您坐在驴上并在其嘴前悬挂胡萝卜,驴可能会向前走,胡萝卜仍与驴保持相同的距离。 但是,与普遍的看法相反,这并不那么有效。 胡萝卜可能会随机反弹,并可能使驴远离其路径。 取而代之的是,通过从驴上下来并站在要驴来的地方使驴和胡萝卜脱钩,这似乎是一个更好的选择。 它提供了一个更稳定的学习环境。
我们可以对算法进行的另一项改进是添加有限的经验和已保存交易记录。 每笔交易都包含学习某些东西所需的所有相关信息。 它是状态,执行的动作,随后的下一个状态以及对该动作给予的奖励的元组。
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))
我们将随机采样一些经验或交易,并在优化模型时向他们学习。
class ReplayMemory(object):
def __init__(self, capacity):
self.capacity = capacity
self.memory = []
self.position = 0
def push(self, *args):
if len(self.memory) < self.capacity:
self.memory.append(None)
self.memory[self.position] = Transition(*args)
self.position = (self.position + 1) % self.capacity
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
memory = ReplayMemory(10000)
在这里,我们为交易定义了一个存储库。 有一个称为push
的函数可将事务推送到内存中。 还有另一个函数可以从内存中随机采样。
我们将使用 OpenAI 的 Gym 从环境env
中获取参数。 环境变量很多,例如智能体的速度和位置。 我们将训练一个平衡点来平衡自己。
图 7.5:卡特彼勒平衡环境
图 7.6:Gym 暴露的环境变量
在环境中的每个观察值或状态在 Cartpole 环境(env
)中都有四个值。 上面的屏幕快照来自于 Cartpole 环境的 Gym 代码。 每个观测值在尖端都有位置,速度,极角和极速度。 您可以采取的行动是向左或向右移动。
env = gym.make('CartPole-v0').unwrapped
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
screen_width = 600
def get_screen():
screen = env.render(mode='rgb_array').transpose((2, 0, 1)) # transpose into torch order (CHW)
screen = screen[:, 160:320] # Strip off the top and bottom of the screen
# Get cart location
world_width = env.x_threshold * 2
scale = screen_width / world_width
cart_location = int(env.state[0] * scale + screen_width / 2.0) # MIDDLE OF CART
# Decide how much to strip
view_width = 320
if cart_location < view_width // 2:
slice_range = slice(view_width)
elif cart_location > (screen_width - view_width // 2):
slice_range = slice(-view_width, None)
else:
slice_range = slice(cart_location - view_width // 2,
cart_location + view_width // 2)
# Strip off the edges, so that we have a square image centered on a cart
screen = screen[:, :, slice_range]
screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
screen = torch.from_numpy(screen)
resize = T.Compose([T.ToPILImage(),
T.Resize(40, interpolation=Image.CUBIC),
T.ToTensor()])
return resize(screen).unsqueeze(0).to(device) # Resize, and add a batch dimension (BCHW)
在这里,我们定义了get_screen
函数。 柱状环境渲染并返回一个屏幕(3D 像素数组)。 我们将要剪裁一个正方形的图像,其中心是小刀。 我们从env.state[0]
获得了位置。 根据文档,第一个参数是推车位置。 然后我们去掉顶部,底部,左侧和右侧,以使小柱位于中心。 接下来,我们将其转换为张量,进行一些转换,添加另一个尺寸,然后返回图像。
class DQN(nn.Module):
def __init__(self):
super(DQN, self).__init__()
self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
self.bn1 = nn.BatchNorm2d(16)
self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
self.bn2 = nn.BatchNorm2d(32)
self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
self.bn3 = nn.BatchNorm2d(32)
self.head = nn.Linear(448, 2)
def forward(self, x):
x = F.relu(self.bn1(self.conv1(x)))
x = F.relu(self.bn2(self.conv2(x)))
x = F.relu(self.bn3(self.conv3(x)))
return self.head(x.view(x.size(0), -1))
policy_net = DQN().to(device)
target_net = DQN().to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()
接下来,我们定义我们的网络。 网络采用当前状态,对其进行一些卷积运算,最后收敛到线性层,并给出当前状态值的输出,和表示在该状态下有多大好处的值。
我们定义了两个网络policy_net
和target_net
。 我们将policy_net
的权重复制到target_net
,以便它们代表相同的网络。 我们将target_net
设为评估模式,以便在反向传播时不更新网络的权重。 我们将在每个步骤中推断policy_net
,但会不时更新target_net
。
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
steps_done = 0
def select_action(state):
global steps_done
eps_threshold = EPS_END + (EPS_START - EPS_END) * \
math.exp(-1\. * steps_done / EPS_DECAY)
steps_done += 1
sample = random.random()
if sample > eps_threshold:
# freeze the network and get predictions
with torch.no_grad():
return policy_net(state).max(1)[1].view(1, 1)
else:
# select random action
return torch.tensor([[random.randrange(2)]], device=device, dtype=torch.long)
接下来,我们定义一种使用 ε 贪婪策略为我们采取行动的方法。 我们可以从策略网中推断出一定时间百分比,但是也有eps_threshold
的机会,这意味着我们将随机选择操作。
num_episodes = 20
TARGET_UPDATE = 5
for i_episode in range(num_episodes):
env.reset()
last_screen = get_screen()
current_screen = get_screen()
state = current_screen - last_screen
for t in count(): # for each timestep in an episode
# Select action for the given state and get rewards
action = select_action(state)
_, reward, done, _ = env.step(action.item())
reward = torch.tensor([reward], device=device)
# Observe new state
last_screen = current_screen
current_screen = get_screen()
if not done:
next_state = current_screen - last_screen
else:
next_state = None
# Store the transition in memory
memory.push(state, action, next_state, reward)
# Move to the next state
state = next_state
# Perform one step of the optimization (on the target network)
optimize_model()
if done:
break
# Update the target network every TARGET_UPDATE episodes
if i_episode % TARGET_UPDATE == 0:
target_net.load_state_dict(policy_net.state_dict())
env.close()
让我们看看我们的训练循环。 对于每个剧集,我们都会重置环境。 我们从环境中获得了两个屏幕,将当前状态定义为两个屏幕之间的差异。 然后,对于剧集中的每个时间步,我们使用select_action
函数选择一个动作。 我们要求环境采取该行动,并将奖励和done
标志归还(它告诉我们剧集是否结束,也就是卡塔普尔跌倒了)。 我们观察到已经提出的新状态。 然后,我们将刚刚经历的事务推入存储体,并移至下一个状态。 下一步是优化模型。 我们将很快介绍该函数。
我们还将每五集使用policy_net
权重的副本更新target_net
。
BATCH_SIZE = 64
GAMMA = 0.999
optimizer = optim.RMSprop(policy_net.parameters())
def optimize_model():
# Dont optimize till atleast BATCH_SIZE memories are filled
if len(memory) < BATCH_SIZE:
return
transitions = memory.sample(BATCH_SIZE)
batch = Transition(*zip(*transitions))
# Get the actual Q
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
state_values = policy_net(state_batch) # Values of States for all actions
# Values of states for the selected action
state_action_values = state_values.gather(1, action_batch)
# Get the expected Q
# # Mask to identify if next state is final
non_final_mask = torch.tensor(tuple(map
(lambda s: s is not None,
batch.next_state)),
device=device,
dtype=torch.uint8)
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
next_state_values = torch.zeros(BATCH_SIZE, device=device) # init to zeros
# predict next non final state values from target_net using next states
next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
reward_batch = torch.cat(batch.reward)
# calculate the predicted values of states for actions
expected_state_action_values = (next_state_values * GAMMA) + reward_batch
# Compute Huber loss
loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))
# Optimize the model
optimizer.zero_grad()
loss.backward()
for param in policy_net.parameters():
param.grad.data.clamp_(-1, 1)
optimizer.step()
然后是主要部分:优化器步骤。 这是我们使用RMSProp
找出损失和反向传播的地方。 我们从存储库中提取了一些经验。 然后,我们将所有状态,动作和奖励转换为批量。 我们通过policy_net
传递状态并获得相应的值。
然后,我们收集与操作批量相对应的值。
现在我们有了状态动作对,以及与之相关的值。 这对应于实际的 Q 函数。
接下来,我们需要找到期望的 Q 函数。 我们创建一个由 0 和 1 组成的掩码,将非 0 状态映射为 1,将 0 状态(终端状态)映射为 0。通过算法的设计,我们知道终端状态将始终具有值 0。 状态的值为正,但终端状态的值为 0。掩码如下所示:
在那批状态中,置于 0 的 1 是终端状态。 所有其他均为非最终状态。 我们将所有非最终的下一个状态连接到non_final_next_states
中。 之后,我们将next_state_values
初始化为全 0。 然后,我们将non_final_next_states
传递给target_network
,从中获得最大值的操作值,并将其应用于next_state_values[non_final_mask]
。 我们将从非最终状态预测的所有值都放入非最终next_state_values
数组。 next_state_values
的外观如下:
最后,我们计算期望的 Q 函数。 根据我们先前的讨论,它将是R + Gamma
(下一个状态值)。 然后,我们根据实际 Q 函数和预期 Q 函数计算损失,然后将误差反向传播到策略网络(请记住target_net
处于eval
模式)。 我们还使用梯度钳制来确保梯度较小且不会转移得太远。
训练神经网络将花费一些时间,因为该过程将渲染每个帧并计算该误差。 我们本可以使用一种更简单的方法,直接获取速度和位置来表示损失函数,并且由于不需要渲染每一帧,因此可以花费更少的时间进行训练。 它只会直接从env.state
接受输入。
此算法有许多改进,例如为智能体增加了想象力,以便可以更好地探索和想象其脑海中的动作,并做出更好的预测。
在本章中,我们学习了无监督学习的一个全新领域:强化学习。 这是一个完全不同的领域,我们在本章中仅涉及了这个主题。 我们学习了如何对问题进行措辞以进行强化学习,然后我们训练了一个模型,该模型可以看到环境提供的一些测量结果,并且可以学习如何平衡赤字。 您可以应用相同的知识来教机器人走路,驾驶汽车以及玩游戏。 这是深度学习的更多物理应用之一。
在下一章和最后一章中,我们将着眼于生产我们的 PyTorch 模型,以便您可以在任何框架或语言上运行它们,并扩展您的深度学习应用。
Google DeepMind 挑战赛:Lee Sedol 与 AlphaGo
本章由 Sudhanshu Passi 贡献。
八、生产中的 PyTorch
2017 年,当 PyTorch 发布其可用版本时,它的承诺是成为研究人员的 Python 优先框架。 PyTorch 社区对此严格了一年,但随后看到了大量的生产要求,并决定将生产能力与 PyTorch 的第一个稳定版本 1.0 合并,但又不影响其创建的可用性和灵活性。
PyTorch 以其干净的框架而闻名,因此要获得研究所需的生产能力和灵活性是一项艰巨的任务。 我认为,将生产支持推向核心的主要障碍是摆脱 Python 的境界,并将 PyTorch 模型转移到具有多线程功能的更快的线程安全语言中。 但是随后,这违反了 PyTorch 当时所遵循的 Python 优先原则。
解决此问题的第一步是使开放式神经网络交换(ONNX)格式稳定,并与所有流行的框架兼容(至少与具有良好功能的框架兼容) 模块)。 ONNX 定义了深度学习图所需的基本运算符和标准数据类型。 这引导了 ONNX 进入 PyTorch 核心的道路,并且它与 ONNX 转换器一起为流行的深度学习框架(例如 CNTK,MXNet,TensorFlow 等)构建。
ONNX 很棒,并且每个人都喜欢它,但是 ONNX 的主要缺点之一是其脚本模式。 也就是说,ONNX 运行一次图以获取有关图的信息,然后将其转换为 ONNX 格式。 因此,ONNX 无法迁移模型中的控制流(将for
循环用于循环神经网络(RNN)模型的不同序列长度)。
生产 PyTorch 的第二种方法是在 PyTorch 本身中构建高性能后端。 Caffe2 的核心与 PyTorch 核心合并在一起,而不是从头开始构建一个,但 Python API 保持不变。 但是,这并不能解决 Python 语言所具有的问题。
接下来是 TorchScript 的引入,它可以将本机 Python 模型转换为可以在高性能 Universe 中加载的序列化形式,例如 C++ 线程。 PyTorch 的后端 LibTorch 可以读取 TorchScript,这使 PyTorch 高效。 有了它,开发人员可以对模型进行原型设计,甚至可以使用 Python 本身对其进行训练。 训练后,可以将模型转换为到中间表示(IR)。 目前,仅开发了 C++ 后端,因此可以将 IR 作为 C++ 对象加载,然后可以从 PyTorch 的 C++ API 中读取。 TorchScript 甚至可以在 Python 程序中转换控制流,这在生产支持的情况下使其优于 ONNX 方法。 TorchScript 本身是 Python 语言中可能的操作的子集,因此不允许任何 Python 操作用 TorchScript 编写。 官方文档本身提供了非常详细的说明,并讨论了可能的情况和不可能的情况,以及许多示例[1]。
在本章中,我们将从使用 Flask(流行的 Python Web 框架)提供普通的 Python PyTorch 模型开始。 这样的设置通常就足够了,特别是如果您要设置示例 Web 应用或满足您个人需求或类似用例的东西。 然后,我们将探索 ONNX 并将 PyTorch 模型转换为 MXNet,然后可以使用 MXNet 模型服务器提供服务。 从那里,我们将转到 TorchScript,这是 PyTorch 社区的新东西。 使用 TorchScript,我们将制作 C++ 可执行文件,然后可以在 LibTorch 的帮助下从 C++ 执行该可执行文件。 然后,可以从稳定,高性能的 C++ 服务器甚至使用 cgo 的 Go 服务器提供高效的 C++ 可执行文件。 对于所有份量,我们将使用在第 2 章,“简单神经网络”中构建的 fizzbuzz 网络。
与 Flask 一起使用
在 Python 本身中提供 PyTorch 模型是在生产环境中提供模型的最简单方法。 但是在解释如何完成之前,让我们快速看一下 Flask 是什么。 完全解释 Flask 不在本章的讨论范围内,但我们仍将介绍 Flask 的最基本概念。
Flask 简介
Flask 是的微框架,已被 Python 领域的多家大公司用于生产。 即使 Flask 提供了可用于将 UI 推送到客户端的模板引擎,我们也没有使用它。 相反,我们将制作一个提供 API 的 RESTful 后端。
可以使用pip
来安装 Flask ,就像其他任何 Python 包一样:
pip install Flask
这将安装其他依赖项 Werkzeug(应用和服务器之间的 Python 接口),Jinga(作为模板引擎),其危险(用于安全签名数据)和 Click(作为 CLI 构建器)。
安装后,用户将可以访问 CLI,并使用flask run
调用我们的脚本将启动服务器:
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello():
return "Hello World!"
该示例包含四个部分:
第一行是我们导入 Flask 包的位置。
我们创建一个 Flask 对象,这是我们的大型 Web 应用对象,Flask 服务器将使用该对象来运行我们的服务器。
有了应用对象后,我们需要存储有关对象应对其执行操作的 URL 的信息。 为此,应用对象带有route
方法,该方法接受所需的 URL 并返回装饰器。 这是我们希望应用现在提供的 URL。
由应用对象返回的装饰器对一个函数进行装饰,当 URL 命中时,将触发该函数。 我们将其命名为hello
。 函数的名称在这里并不重要。 在前面的示例中,它只是检查输入并做出相应的响应。 但是对于我们的模型服务器,我们使此函数稍微复杂一点,以便它可以接受输入并将该输入提供给我们构建的模型。 然后,我们模型的返回值将作为 HTTP 响应推回给用户。
我们通过建立flask_trial
目录开始实现,并将该文件另存为app.py
在该目录中:
mkdir flask_trial
cd flask_trial
然后,我们执行 Flask 随附的 CLI 命令来启动服务器。 执行后,如果未提供自定义参数,您将看到http://127.0.0.1:5000
正在为服务器提供服务。
flask run
我们可以通过向服务器位置发出 HTTP 请求来测试简单的 Flask 应用。 如果一切正常,我们应该得到一个“你好,世界!” 来自服务器的消息。
-> curl "http://127.0.0.1:5000"
-> Hello World!
我们已经建立了简单的 Flask 应用。 现在,将 fizzbuzz 模型引入我们的应用。 以下代码片段显示了与第 2 章和“简单神经网络”相同的模型,供您参考。 该模型将从路由函数中调用。 我们已经在第 2 章和“一个简单的神经网络”中对模型进行了训练,因此,我们将在这里加载训练后的模型,而不是再次对其进行训练:
import torch.nn as nn
import torch
class FizBuzNet(nn.Module):
2 layer network for predicting fiz or buz
param: input_size -> int
param: output_size -> int
def __init__(self, input_size, hidden_size, output_size):
super(FizBuzNet, self).__init__()
self.hidden = nn.Linear(input_size, hidden_size)
self.out = nn.Linear(hidden_size, output_size)
def forward(self, batch):
hidden = self.hidden(batch)
activated = torch.sigmoid(hidden)
out = self.out(activated)
return out
用于 Flask 的模型
下面的屏幕快照给出了我们应用的目录结构。 assets
文件夹具有训练好的模型,在加载模型时,controller.py
文件将使用该模型。 根目录中的app.py
是 Flask 应用的入口。 Flask 首选app.py
作为入口点文件的默认名称。
当您执行flask run
时,Flask 将在当前目录中查找app.py
文件并执行该文件。 controller.py
文件是我们从model.py
文件加载模型的地方。 然后,加载的模型将等待用户通过 HTTP 端点输入。 app.py
将用户输入重定向到controller
,然后将其转换为 Torch 张量。
张量对象将通过神经网络传递,并且controller
将神经网络的结果传递给后处理操作后,从神经网络返回结果。
图 8.1:当前目录
目录中有四个组件用于制作 Flask 应用。 assets
文件夹是我们保留模型的地方。 其他三个文件是代码所在的位置。 让我们研究一下每个。 我们将从入口文件app.py
开始。 它是先前提供的简单 Flask 应用的扩展版本。 该文件教我们如何定义 URL 端点,以及如何将 URL 端点映射到 Python 函数。 我们的扩展app.py
文件显示在以下代码块中:
import json
from flask import Flask
from flask import request
import controller
app = Flask('FizBuzAPI')
@app.route('/predictions/fizbuz_package', methods=['POST'])
def predict():
which = request.get_json().get('input.1')
if not which:
return "InvalidData"
number = int(which) + 1
prediction = controller.run(number)
out = json.dumps({'NextNumber': prediction})
except ValueError:
out = json.dumps({'NextNumber': 'WooHooo!!!'})
return out
Flask 为我们提供了request
工具,它是一个全局变量,但对于存储有关当前请求信息的当前线程而言是局部的。 我们使用request
对象的get_json
函数从request
对象获取主体POST
参数。 然后,将通过 HTTP 传入的字符串数据转换为整数。 这个整数是我们从前端传递的数字。 我们应用的任务是预测下一个数字的状态。 那将是下一个数字本身还是嘶嘶声,嗡嗡声或嘶嘶声? 但是,如果您还记得,我们会训练我们的网络来预测我们通过的号码的状态。 但是,我们需要下一个号码的状态。 因此,我们将一个加到当前数上,然后将结果传递给我们的模型。
我们的下一个导入是controller
,我们在其中加载了模型文件。 我们正在调用run
方法并将数字传递给模型。 然后,将controller
的预测值作为字典传递回。 Flask 会将其转换为响应正文并将其发送回用户。
在继续之前,我们可以从以前的简单 Flask 应用的扩展版本中看到两个主要差异。 一种是 URL 路由:/predictions/fizbuz_package
。 如前所述,Flask 允许您将任何 URL 端点映射到您选择的函数。
其次,我们在装饰器中使用了另一个关键字参数:methods
。 这样,我们告诉 Flask,不仅需要通过 URL 规则来调用此函数,而且还需要在对该 URL 的POST
方法调用上进行调用。 因此,我们像以前一样使用flask run
运行该应用,并使用curl
命令对其进行测试。
-> curl -X POST http://127.0.0.1:5000/predictions/fizbuz_package \
-H "Content-Type: application/json" \
-d '{"input.1": 14}'
-> {"NextNumber": "FizBuz"}
在 HTTP POST
请求中,我们传递了输入数字为14
的 JSON 对象,我们的服务器返回了下一个数字FizBuz
。 所有这些魔术都发生在我们的app.py
调用的controller.run()
方法中。 现在,让我们看看该函数在做什么。
接下来是使用run()
方法的controller
文件。 在这里,我们将输入数字转换为 10 位二进制数(请记住,在第 2 章,“简单神经网络”中,这是我们作为输入传递给 fizzbuzz 网络的东西),将其变为 Torch 张量。 然后将二进制张量传递给我们模型的正向函数,以得到具有预测的1 x 4
张量。
通过从加载了保存的.pth
文件的模型文件中调用FizBuz
类来创建我们的模型。 我们使用 Torch 的load_state_dict
方法将参数加载到初始化的模型中。 之后,我们将模型转换为eval()
模式,这将模型设置为评估模式(它在评估模式下关闭了batchnorm
丢弃层)。 模型的输出是运行max
并确定哪个索引具有最大值,然后将其转换为可读输出的概率分布。
为生产准备的服务器
这是关于如何使用 Flask 将 PyTorch 模型部署到服务器的非常基本的演练。 但是 Flask 的内置服务器尚未投入生产,只能用于开发目的。 开发完成后,我们应该使用其他服务器包在生产中为 Flask 应用提供服务。
Gunicorn 是 Python 开发人员使用的最受欢迎的服务器包之一,将其与 Flask 应用绑定非常容易。 您可以使用pip
安装 Gunicorn,就像我们安装 Flask 一样:
pip install gunicorn
Gunicorn 需要我们传递模块名称,以便它能够拾取模块并运行服务器。 但是 Gunicorn 希望应用对象具有名称application
,而我们的项目则不是这样。 因此,我们需要显式传递应用对象名称和模块名称。 Gunicorn 的命令行工具有很多选择,但是我们正在尝试使其尽可能简单:
gunicorn app:app
import torch
from model import FizBuzNet
input_size = 10
output_size = 4
hidden_size = 100
def binary_encoder():
def wrapper(num):
ret = [int(i) for i in '{0:b}'.format(num)]
return [0] * (input_size - len(ret)) + ret
return wrapper
net = FizBuzNet(input_size, hidden_size, output_size)
net.load_state_dict(torch.load('assets/fizbuz_model.pth'))
net.eval()
encoder = binary_encoder()
def run(number):
with torch.no_grad():
binary = torch.Tensor([encoder(number)])
out = net(binary)[0].max(0)[1].item()
return get_readable_output(number, out)
建立 ONNX 协议是为了创建不同框架之间的互操作性。 这可以帮助 AI 开发人员和组织选择合适的框架来开发他们花费大部分时间的 AI 模型。 一旦开发和训练阶段结束,他们便可以将模型迁移到他们选择的任何框架中,以在生产中提供服务。
可以针对不同目的优化不同的框架,例如移动部署,可读性和灵活性,生产部署等。 有时将模型转换为不同的框架是不可避免的,手动转换很耗时。 这是 ONNX 试图通过互操作性解决的另一个用例。
让我们以任何框架示例为例,看看 ONNX 适合什么地方。框架将具有语言 API(供开发人员使用),然后是由他们开发的模型的图形表示。 然后,该 IR 进入高度优化的运行时以执行。 ONNX 为此 IR 提供了统一的标准,并使所有框架都了解 ONNX 的 IR。 借助 ONNX,开发人员可以使用 API制作模型,然后将其转换为框架的 IR。 ONNX 转换器可以将该 IR 转换为 ONNX 的标准 IR,然后可以将其转换为其他框架的 IR。
这是 PyTorch 的 Fizzbuzz 网络的 IR 的可读表示:
graph(%input.1 : Float(1, 10)
%weight.1 : Float(100, 10)
%bias.1 : Float(100)
%weight : Float(4, 100)
%bias : Float(4)) {
%5 : Float(10!, 100!) = aten::t(%weight.1),scope: FizBuzNet/Linear[hidden]
%6 : int = prim::Constant[value=1](),scope: FizBuzNet/Linear[hidden]
%7 : int = prim::Constant[value=1](),scope: FizBuzNet/Linear[hidden]
%hidden : Float(1, 100) = aten::addmm(%bias.1, %input.1, %5, %6,%7), scope: FizBuzNet/Linear [hidden]
%input : Float(1, 100) = aten::sigmoid(%hidden),scope: FizBuzNet
%10 : Float(100!, 4!) = aten::t(%weight),scope: FizBuzNet/Linear[out]
%11 : int = prim::Constant[value=1](),scope: FizBuzNet/Linear[out]
%12 : int = prim::Constant[value=1](),scope: FizBuzNet/Linear[out]
%13 : Float(1, 4) = aten::addmm(%bias, %input, %10, %11, %12),scope: FizBuzNet/Linear[out]
return (%13);
表示清楚地表明了整个网络的结构。 前五行显示参数和输入张量,并为每一个标记一个名称。 例如,整个网络将输入张量定为input.i
,它是形状为1 x 10
的浮点张量。然后,它显示了我们第一层和第二层的权重和偏差张量。
从第六行开始,显示了图的结构。 每行的第一部分(以%
符号开头的全冒号之前的字符)是每行的标识符,这是其他行中用来引用这些行的标识符。 例如,以%5
作为标识符的线对aten::t(%weight.i)
表示的第一层的权重进行转置,从而输出形状为10 x 100
的浮点张量。
图 8.2:另一个 IR 转换为 ONNX 的 IR,然后又转换为另一个 IR
PyTorch 具有内置的 ONNX 导出器,它可以帮助我们创建 ONNX IR,而无需离开 PyTorch。 在此处给出的示例中,我们将 fizbuzz 网络导出到 ONNX,然后由 MXNet 模型服务器提供服务。 在以下代码段中,我们使用 PyTorch 的内置export
模块将 fizzbuzz 网络转换为 ONNX 的 IR:
>>> import torch
>>> dummy_input = torch.Tensor([[0, 0, 0, 0, 0, 0, 0, 0, 1, 0]])
>>> dummy_inputtensor([[O., 0., 0., 0., 0., 0., 0., O., 1., 0.]])
>>> net = FizBuzNet(input_size, hidden_size, output_size)
>>> net.load_state_dict(torch.load('assets/fizbuz_model.pth'))
>>> dummy_input = torch.Tensor([[0, 0, 0, 0, 0, 0, 0, 0, 1, 0]])
>>> torch.onnx.export(net, dummy_input, "fizbuz.onnx", verbose=True)
在最后一行,我们调用export
模块,并传递 PyTorch 的net
,虚拟输入和输出文件名。 ONNX 通过跟踪图进行转换; 也就是说,它使用我们提供的虚拟输入执行一次图。
在执行图时,它会跟踪我们执行的 PyTorch 操作,然后将每个操作转换为 ONNX 格式。 键值参数verbose=True
在导出时将输出写入到终端屏幕。 它为我们提供了 ONNX 中相同图的 IR 表示:
graph(%input.1 : Float(1, 10)
%1 : Float(100, 10)
%2 : Float(100)
%3 : Float(4, 100)
%4 : Float(4)) {
%5 : Float(1, 100) = onnx::Gemm[alpha=1, beta=1,transB=1](%input.1, %1, %2),scope: FizBuzNet/Linear[hidden]
%6 : Float(1, 100) = onnx::Sigmoid(%5), scope: FizBuzNet
%7 : Float(1, 4) = onnx::Gemm[alpha=1, beta=1,transB=1](%6, %3, %4),scope: FizBuzNet/Linear[out]
return (%7);
它还显示了图执行所需的所有操作,但比 PyTorch 的图形表示要小。 虽然 PyTorch 向我们显示了每个操作(包括转置操作),但 ONNX 会在高级功能(例如onnx:Gemm
)下抽象该粒度信息,前提是其他框架的import
模块可以读取这些抽象。
PyTorch 的export
模块将 ONNX 模型保存在fizbuz.onnx
文件中。 可以从 ONNX 本身或其他框架中内置的 ONNX 导入程序中加载。 在这里,我们将 ONNX 模型加载到 ONNX 本身并进行模型检查。 ONNX 还具有由 Microsoft 管理的高性能运行时,这超出了本书的解释范围,但可在这个页面上获得。
由于 ONNX 已成为框架之间互操作性的规范,因此围绕它构建了其他工具。 最常用/最有用的工具可能是 Netron,它是 ONNX 模型的可视化工具。 尽管 Netron 不像 TensorBoard 那样具有交互性,但 Netron 足以用于基本可视化。
拥有.onnx
文件后,您可以将文件位置作为参数传递给 Netron 命令行工具,该工具将构建服务器并在浏览器中显示该图:
pip install netron
netron -b fizbuz.onnx
前面的命令将使用 Fizzbuzz 网络的图可视化来启动 Netron 服务器,如下图所示。 除了可缩放的图外,Netron 还可以可视化其他基本信息,例如版本,生成器,图的生成方式等等。 另外,每个节点都是可单击的,它将显示有关该特定节点的信息。 当然,这还不够复杂,无法满足可视化工具所需的所有要求,但足以让我们对整个网络有所了解。
图 8.3:Fizzbuzz 网络的 Netron 可视化
从成为 ONNX 可视化工具开始,Netron 逐渐接受所有流行框架的导出模型。 目前,根据官方文件,Netron 接受 ONNX,Keras,CoreML,Caffe2,MXNet,TensorFlow Lite,TensorFlow.js,TensorFlow,Caffe,PyTorch,Torch,CNTK,PaddlePaddle,Darknet 和 scikit-learn 的模型。
MXNet 模型服务器
现在我们离开了 PyTorch 世界。 我们现在有不同的模型服务器,但我们选择了 MXNet 模型服务器。 MXNet 模型服务器由社区维护,由亚马逊团队领导,也称为 MMS。 从这里开始,我将交替使用 MMS 和 MXNet 模型服务器。
MXNet 比其他服务模块更好。 在撰写本文时,TensorFlow 与 Python 3.7 不兼容,并且 MXNet 的服务模块已与内置的 ONNX 模型集成,这使开发人员可以轻松地以很少的命令行为模型提供服务,而无需了解分布式或高度可扩展的部署的复杂性。
其他模型服务器,例如 TensorRT 和 Clipper,不像 MXNet 服务器那样易于设置和管理。 而且,MXNet 附带了另一个名为 MXNet 存档器的工具,该工具将所有必需的文件打包成一个捆绑包,这些文件可以独立部署,而不必担心其他依赖项。 除了 MXNet 模型服务器具备的所有这些很酷的功能之外,最大的好处是能够自定义预处理和后处理步骤。 我们将在接下来的部分中介绍如何完成所有这些操作。
整个过程的流程从我们尝试使用模型存档器创建具有.mar
格式的单个存档文件的位置开始。 单个捆绑包文件需要 ONNX 模型文件signature.json
,该文件提供有关输入大小,名称等的信息。 认为它是可以随时更改的配置文件。 如果您决定将所有值硬编码到代码中,而不是从配置中读取,则它甚至不必成为存档的一部分。 然后,您需要服务文件,您可以在其中定义预处理,推理功能,后处理功能和其他工具函数。
制作完模型档案后,我们可以调用模型服务器,并将位置作为输入传递给我们的模型档案。 而已; 您现在可以从超级性能模型服务器提供模型。
MXNet 模型存档器
我们将通过安装 MXNet 模型存档器开始我们的旅程。 MXNet 模型服务器随附的默认模型存档器不支持 ONNX,因此我们需要单独安装。 ONNX 的模型存档器依赖于协议缓冲区和 MXNet 包本身。 官方文档中提供了为每个操作系统安装 protobuf 编译器的指南。 可以通过pip
来安装 MXNet 包,就像我们已经安装了其他包一样(对于 GPU,MXNet 还有另一个包,但是这里我们正在安装 MXNet 的基本版本):
pip install mxnet
pip install model-archiver[onnx]
现在,我们可以安装 MXNet 模型服务器。 它基于 Java 虚拟机(JVM)构建,因此从 JVM 调用了运行有我们模型实例的多个线程。 利用 JVM 支持的复杂性,可以将 MXNet 服务器扩展为处理数千个请求的多个进程。
MXNet 服务器带有管理 API,该 API 通过 HTTP 提供。 这有助于生产团队根据需要增加/减少资源。 除了处理工作器规模之外,管理 API 还具有其他选项。 但是我们不会在这里深入探讨。 由于模型服务器在 JVM 上运行,因此我们需要安装 Java8。此外,MXNet 模型服务器在 Windows 上仍处于试验模式,但在 Linux 风味和 Mac 上稳定。
pip install mxnet-model-server
现在,在安装了所有前提条件之后,我们可以开始使用 MXNet 模型服务器对可用于生产的 PyTorch 模型进行编码。 首先,我们创建一个新目录,以保存所有需要的文件以供模型存档器创建捆绑文件。 然后,我们移动在上一步中创建的.onnx
文件。
MMS 的一项强制性要求是其中包含服务类的服务文件。 MMS 执行服务文件中唯一可用类的initialize()
和handle()
函数。 在下一节中,我们将逐一进行介绍,但这是我们可以用来制作服务文件的框架。
图 8.4:fizbuz_package
的目录结构
class MXNetModelService(object):
def __init__(self):
def initialize(self, context):
def preprocess(self, batch):
def inference(self, model_input):
def postprocess(self, inference_output):
def handle(self, data, context):
然后,我们需要一个签名文件。 正如我们之前所看到的,签名文件只是配置文件。 我们可以通过将值硬编码到脚本本身来避免发生这种情况,但是 MMS 人士也建议这样做。 我们为 fizzbuzz 网络制作了最小的签名文件,如下所示:
"inputs": [
"data_name": "input.1",
"data_shape": [
"input_type": "application/json"
在签名文件中,我们描述了数据名称,输入形状和输入类型。 当通过 HTTP 读取数据流时,这就是我们的服务器假定的数据信息。 通常,我们可以通过在签名文件中进行配置来使我们的 API 接受任何类型的数据。 但是然后我们的脚本也应该能够处理这些类型。 让我们完成服务文件,然后将其与 MMS 捆绑在一起。
如您先前所见,MMS 调用服务文件中唯一可用的单个类的initialize()
方法。 如果服务文件中存在更多类,那就完全是另一回事了,但是让我们足够简单地理解它。 顾名思义,initialize()
文件初始化所需的属性和方法:
def initialize(self, context):
properties = context.system_properties
model_dir = properties.get("model_dir")
gpu_id = properties.get("gpu_id")
self._batch_size = properties.get('batch_size')
signature_file_path = os.path.join(
model_dir, "signature.json")
if not os.path.isfile(signature_file_path):
raise RuntimeError("Missing signature.json file.")
with open(signature_file_path) as f:
self.signature = json.load(f)
data_names = []
data_shapes = []
input_data = self.signature["inputs"][0]
data_name = input_data["data_name"]
data_shape = input_data["data_shape"]
data_shape[0] = self._batch_size
data_names.append(data_name)
data_shapes.append((data_name, tuple(data_shape)))
self.mxnet_ctx = mx.cpu() if gpu_id is None elsemx.gpu(gpu_id)
sym, arg_params, aux_params = mx.model.load_checkpoint(checkpoint_prefix, self.epoch)
self.mx_model = mx.mod.Module(
symbol=sym, context=self.mxnet_ctx,
data_names=data_names, label_names=None)
self.mx_model.bind(
for_training=False, data_shapes=data_shapes)
self.mx_model.set_params(
arg_params, aux_params,
allow_missing=True, allow_extra=True)
self.has_initialized = True
MMS 在调用initialize()
时传递上下文参数,该参数具有在解压缩存档文件时获取的信息。 当首先使用存档文件路径作为参数调用 MMS 时,在调用服务文件之前,MMS 解压缩存档文件并安装模型,并收集信息,其中存储模型,MMS 可以使用多少个内核,它是否具有 GPU 等。 所有这些信息都作为上下文参数传递给initialize()
。
initialize()
的第一部分是收集此信息以及来自签名 JSON 文件的信息。 函数的第二部分从第一部分中收集的信息中获取与输入有关的数据。 然后,该函数的第三部分是创建 MXNet 模型并将训练后的参数加载到模型中。 最后,我们将self.has_initialized
变量设置为True
,然后将其用于检查服务文件其他部分的初始化状态:
def handle(self, data, context):
if not self.has_initialized:
self.initialize()
preprocess_start = time.time()
data = self.preprocess(data)
inference_start = time.time()
data = self.inference(data)
postprocess_start = time.time()
data = self.postprocess(data)
end_time = time.time()
metrics = context.metrics
metrics.add_time(self.add_first())
metrics.add_time(self.add_second())
metrics.add_time(self.add_third())
return data
except Exception as e:
request_processor = context.request_processor
request_processor.report_status(
500, "Unknown inference error")
return [str(e)] * self._batch_size
MMS 被编程为在每个请求上调用相同类的handle()
方法,这是我们控制流程的地方。 initialize()
函数只会在启动线程时被调用一次; 每个用户请求都将调用handle()
函数。 由于handle()
函数是针对每个用户请求被调用的,以及上下文信息,因此它也将在参数中获取当前数据。 但是,为了使程序模块化,我们没有在handle()
中进行任何操作; 取而代之的是,我们正在调用其他仅指定做一件事的函数:该函数应该做什么。
我们将整个流分为四个部分:预处理,推理,后处理和矩阵记录。 在handle()
的第一行中,我们验证是否正在使用上下文和数据信息初始化线程。 完成后,我们将进入流程。 现在,我们将逐步完成流程。
我们首先使用data
作为参数调用self.preprocess()
函数,其中data
将是 HTTP 请求的POST
正文内容。 preprocess
函数以与我们在signature.json
文件中配置的名称相同的名称获取传递的数据。 一旦有了数据,这就是我们需要系统预测下一个数字的整数。 由于我们已经训练了模型来预测当前号码的嘶嘶声状态,因此我们将在数据中为号码添加一个嗡嗡声,然后在新号码的二进制文件上创建一个 MXNet 数组:
def preprocess(self, batch):
param_name = self.signature['inputs'][0]['data_name']
data = batch[0].get('body').get(param_name)
if data:
self.input = data + 1
tensor = mx.nd.array(
[self.binary_encoder(self.input, input_size=10)])
return tensor
self.error = 'InvalidData'
handle()
函数获取已处理的数据,并将其传递给inference()
函数,该函数将使用已处理的数据调用保存在initialize()
函数上的 MXNet 模型。 inference()
函数返回大小为1 x 4
的输出张量,然后将其返回到handle()
函数。
def inference(self, model_input):
if self.error is not None:
return None
self.mx_model.forward(DataBatch([model_input]))
model_output = self.mx_model.get_outputs()
return model_output
然后将张量传递给postprocess()
函数,以将其转换为人类可读的输出。 我们具有self.get_readable_output()
函数,可根据需要将模型的输出转换为嘶嘶声,嗡嗡声,嘶嘶声嗡嗡声或下一个数字。
然后,后处理的数据返回到handle()
函数,在其中进行矩阵创建。 之后,数据将返回到handle()
函数的被调用方,该函数是 MMS 的一部分。 MMS 将该数据转换为 HTTP 响应,并将其返回给用户。 MMS 还记录矩阵的输出,以便操作可以实时查看矩阵并基于此做出决策:
def postprocess(self, inference_output):
if self.error is not None:
return [self.error] * self._batch_size
prediction = self.get_readable_output(
self.input,
int(inference_output[0].argmax(1).asscalar()))
out = [{'next_number': prediction}]
return out
一旦将所有文件包含在前面给出的目录中,就可以创建.mar
存档文件:
model-archiver \
--model-name fizbuz_package \
--model-path fizbuz_package \
--handler fizbuz_service -f
这将在当前目录中创建一个fizbuz_package.mar
文件。 然后可以将其作为 CLI 参数传递给 MMS:
mxnet-model-server \
--start \
--model-store FizBuz_with_ONNX \
--models fizbuz_package.mar
现在,我们的模型服务器已启动并在端口 8080 上运行(如果您尚未更改端口)。 我们可以尝试执行与 Flask 应用相同的curl
命令(显然,我们必须更改端口号)并检查模型。 我们应该获得与 Flask 应用完全相同的结果,但是现在我们可以根据需要动态地动态扩展或缩减工作器的数量。 MMS 为此提供了管理 API。 管理 API 带有几个可配置的选项,但是这里我们只关注于增加或减少工作器的数量。
除了在端口 8080 上运行的服务器之外,还将在 8081 上运行管理 API 服务,我们可以对其进行调用和控制配置。 使用简单的GET
请求命中该端点将为您提供服务器的状态。 但是在探究这一点之前,我们将工作器数量设为 1(默认情况下为 4)。 API 端点是适当的 REST 端点; 我们在路径中指定模型名称,并传递参数max_worker=1
以使工作器数为 1。 我们也可以通过min_worker=<number>
来增加工作器数量。 官方文档[2]中详细介绍了管理 API 上可能的配置。
-> curl -v -X PUT "http://localhost:8081/models/fizbuz_package?max_worker=1"
"status": "Processing worker updates..."
一旦减少了工作器的数量,我们就可以命中端点来确定服务器的状态。 示例输出(在我们减少了工作器数量之后)如下:
-> curl "http://localhost:8081/models/fizbuz_package"
"modelName": "fizbuz_package",
"modelUrl": "fizbuz_package.mar",
"runtime": "python",
"minWorkers": 1,
"maxWorkers": 1,
"batchSize": 1,
"maxBatchDelay": 100,
"workers": [
"id": "9000",
"startTime": "2019-02-11T19:03:41.763Z",
"status": "READY",
"gpu": false,
"memoryUsage": 0
我们已经设置了模型服务器,现在我们知道如何根据比例配置服务器。 让我们使用 Locust 对服务器进行负载测试,并检查服务器的负载情况,以及根据我们的需求增加/减少资源有多容易。 将 AI 模型部署到生产环境并非易事。
随后是示例蝗虫脚本,应将其另存为locust.py
在当前目录中。 如果已安装 Locust(可以使用pip
进行安装),则调用locust
将打开 Locust 服务器并打开 UI,我们可以在其中输入要测试的比例尺。 我们可以逐步提高规模,并检查服务器在什么时候开始崩溃,然后点击管理 API 以增加工作量并确保我们的服务器可以容纳规模:
import random
from locust import HttpLocust, TaskSet, task
class UserBehavior(TaskSet):
def on_start(self):
self.url = "/predictions/fizbuz_package"
self.headers = {"Content-Type": "application/json"}
@task(1)
def success(self):
data = {'input.1': random.randint(0, 1000)}
self.client.post(self.url, headers=self.headers, json=data)
class WebsiteUser(HttpLocust):
task_set = UserBehavior
host = "http://localhost: 8081"
图 8.5:Locust UI,我们可以在其中配置用户数量以模拟生产负载
TorchScript 的效率
我们已经设置了简单的 Flask 应用服务器来为我们的模型提供服务,并且已经使用 MXNet 模型服务器实现了相同的模型,但是如果我们需要摆脱 Python 的世界,并使用 C++ 或 Go 创建高效的服务器 ,或使用其他有效的语言,PyTorch 提出了 TorchScript,它可以生成模型中最有效的形式,并且可以在 C++ 中读取。
现在的问题是:这不是我们对 ONNX 所做的吗? 也就是说,从 PyTorch 模型创建另一个 IR? 是的,过程相似,但区别在于 ONNX 使用跟踪创建了优化的 IR; 也就是说,它通过模型传递虚拟输入,并在执行模型时记录 PyTorch 操作,然后将这些操作转换为中间 IR。
这种方法有一个问题:如果模型是数据相关的,例如 RNN 中的循环,或者if
/else
条件是基于输入的,那么跟踪就不能真正做到这一点。 跟踪将仅发现在特定执行周期中发生的情况,而忽略其他情况。 例如,如果我们的虚拟输入是 10 个单词的句子,而我们的模型是基于循环的 RNN,则跟踪的图将对 RNN 单元的 10 次执行进行硬编码,如果句子的长度大于 10,或者较短的句子带有更少的单词,则它将中断。 考虑到这一点引入了 TorchScript。
TorchScript 支持此类 Python 控制流的一个子集,唯一要做的就是将现有程序转换为所有控制流都是 TorchScript 支持的控制流的阶段。 LibTorch 可以读取 TorchScript 创建的中间阶段。 在此会话中,我们将创建 TorchScript 输出并编写一个 C++ 模块以使用 LibTorch 加载它。
即使 TorchScript 是 PyTorch 早期版本的 JIT 包的一部分,它仍在 PyTorch 1.0 中引入了可用且稳定的 TorchScript 版本。 TorchScript 可以序列化和优化用 PyTorch 编写的模型。
与 ONNX 一样,TorchScripts 可以作为 IR 保存到磁盘中,但是与 ONNX 不同,该 IR 经过优化可在生产环境中运行。 保存的 TorchScript 模型可以在不依赖 Python 的环境中加载。 由于性能和多线程原因,Python 一直是生产部署的瓶颈,即使 Python 可以带给您的扩展能力足以满足现实世界中的大多数使用情况。
避免这种基本的瓶颈是所有可用于生产环境的框架的主要任务,这就是为什么静态计算图统治框架世界的原因。 PyTorch 通过引入具有高级 API 的基于 C++ 的运行库来解决此问题,如果开发人员希望使用 C++ 进行编程,则可以使用这些 API。
通过将 TorchScript 推到核心,PyTorch 可以投入生产了。 TorchScript 可以将用 Python 编写的模型转换为高度优化的 IR,然后可由 LibTorch 读取。 然后,可以将 LibTorch 加载的模型保存为 C++ 对象,并可以在 C++ 程序或其他高效编程语言(例如 Go)中运行。
PyTorch 允许您通过两种方法制作 TorchScript IR。 最简单的是通过跟踪,就像 ONNX 一样。 您可以通过虚拟输入将模型(甚至函数)传递给torch.jit.trace
。 PyTorch 通过模型/函数运行虚拟输入,并在运行输入时跟踪操作。
然后,可以将跟踪的函数(PyTorch 操作)转换为优化的 IR,也称为静态单分配 IR。 像 ONNX 图一样,该图中的指令也具有张量库(ATen,PyTorch 的后端)可以理解的原始运算符。
这确实很容易,但是要付出代价。 基于跟踪的推理具有 ONNX 的基本问题:它无法处理依赖于数据的模型结构更改,即if
/else
条件检查或循环(序列数据)。 为了处理这种情况,PyTorch 引入了脚本模式。
可以通过使用torch.jit.script
装饰器(用于常规函数)和torch.jit.script_method
(用于 PyTorch 模型上的方法)来启用脚本模式。 通过此装饰器,函数/方法中的内容将直接转换为 TorchScript。 在对模型类使用torch.jit.script_method
时要记住的另一件重要事情是关于父类。 通常,我们从torch.nn.Module
继承,但是为了制作 TorchScript,我们从torch.jit.ScriptModule
继承。 这有助于 PyTorch 避免使用无法转换为 TorchScript 的纯 Python 方法。 目前,TorchScript 不支持所有 Python 函数,但具有支持数据相关张量操作的所有必需函数。
我们将首先将模型导出到ScriptModule
IR,以此开始 fizzbuzz 模型的 C++ 实现,就像我们对 ONNX 导出所做的一样:
net = FizBuzNet(input_size, hidden_size, output_size)
traced = torch.jit.trace(net, dummy_input)
traced.save('fizbuz.pt')
可以通过torch.load()
方法将保存的模型加载回 Python,但是我们将使用 C++ 中引入的类似 API LibTorch 将模型加载到 C++。 在讨论逻辑之前,让我们将所需的标头导入当前作用域:
#include <torch/script.h>
#include <iostream>
#include <memory>
#include <string>
最重要的头是torch/script.h
,它带来了 LibTorch 所需的所有方法和函数。 我们决定将模型名称和示例输入作为命令行参数传递。 因此,主程序的第一部分是读取命令行参数并将其解析为程序的其余部分:
std::string arg = argv[2];
int x = std::stoi(arg);
float array[10];
int i;
int j = 9;
for (i = 0; i < 10; ++i) {
array[j] = (x >> i) & 1;
程序读取第二个命令行参数,这是用户给出的用于获取预测的编号。 从命令行读取时,该数字为string
类型。 我们将其转换为int
。 对于string
到int
转换后的循环,我们需要将其转换为二进制数组。 这是 LibTorch 执行开始的地方:
std::shared_ptr<torch::jit::script::Module> module = torch::jit::load(argv[1]);
auto options = torch::TensorOptions().dtype(torch::kFloat32);
torch::Tensor tensor_in = torch::from_blob(array, {1, 10},options);
std::vector<torch::jit::IValue> inputs;
inputs.push_back(tensor_in);
at::Tensor output = module->forward(inputs).toTensor();
在第一行中,我们从路径加载模型,该路径作为第一个命令行参数传递(我们将变量声明为ScriptModule
)。 在第三行,我们使用from_blob
方法将二进制数组转换为二维 LibTorch 张量。 在最后一行,我们使用我们制作的张量执行模型的forward
方法,并将输出返回给用户。 这可能是我们可以实现以展示 TorchScript 实际操作的最基本示例。 官方文档中有许多示例,它们显示了脚本模式(与跟踪模式不同)的功能,可以理解 Python 控制流并将模型推向 C++ 世界。
探索 RedisAI
我们已经看到可以通过 TorchScript 获得的优化,但是优化的二进制文件将如何处理? 是的,我们可以在 C++ 世界中加载它,并制作 Go 服务器,然后在其中加载它,但这仍然很痛苦。
Redis Labs 和 Orobix 为我们带来了另一个名为 RedisAI 的解决方案。 它是基于 LibTorch 构建的高度优化的运行时,可以接受已编译的 TorchScript 二进制文件,以通过 Redis 协议提供服务。 对于没有 Redis 经验的人, 这里有很好的文档,那里的介绍文档[3]应该是一个好的开始。
RedisAI 带有三个选项来配置三个后端:PyTorch,TensorFlow 和 ONNX 运行时。 它并不仅限于此:RedisAI 在后端使用 DLPack 来使张量能够通过不同的框架,而无需花费很多转换成本。
那有什么意思? 假设您有一个 TensorFlow 模型,该模型将人脸转换为 128 维嵌入(这是 FaceNet 所做的)。 现在,您可以使 PyTorch 模型使用此 128 维嵌入进行分类。 在正常情况下,将张量从 TensorFlow 传递到 PyTorch 需要深入了解事物在幕后的工作方式,但是使用 RedisAI,您可以使用几个命令来完成。
RedisAI 是作为 Redis 服务器(loadmodule
开关)的模块构建的。 通过 RedisAI 提供模型的好处不仅在于拥有多个运行时以及它们之间的互操作性。 实际上,这对于生产部署来说是最不重要的。 RedisAI 附带的最重要的功能是故障转移和分布式部署选项已经嵌入到 Redis 服务器中。
借助 Redis Sentinel 和 Redis Cluster,我们可以在多集群,高可用性设置中部署 RedisAI,而无需对 DevOps 或基础架构建设有足够的了解。 另外,由于 Redis 拥有所有流行语言的客户端,因此,通过 RedisAI 部署 TorchScript 模型后,您基本上可以使用 Redis 的任何语言客户端与服务器通信以运行模型,将输入传递给模型,从模型获取输出,以及更多。
使用 RedisAI 的下一个亮点是 Redis 整个大型生态系统的可用性,例如 RedisGears(可将任何 Python 函数作为管道的一部分运行),RedisTimeSeries,Redis Streams 等。
让我们开始将使用 TorchScript 编译的 fizzbuzz 网络模型加载到 RedisAI。 首先,我们需要安装 Redis 服务器和 RedisAI 来设置环境。 installation.sh
文件包含三个部分来执行此操作:
sudo apt update
sudo apt install -y build-essential tcl libjemalloc-dev
sudo apt install -y git cmake unzip
curl -O http://download.redis.io/redis-stable.tar.gz
tar xzvf redis-stable.tar.gz
cd redis-stable
sudo make install
rm redis-stable.tar.gz
git clone https://github.com/RedisAI/RedisAI.git
cd RedisAl
bash get_deps.sh cpu
mkdir build
cd build
cmake -DDEPS_PATH=../deps/install ..
第一部分是我们安装所需依赖项的位置。 第二部分是我们下载 Redis 服务器二进制文件并进行安装的地方。 第三部分是克隆 RedisAI 服务器并使用make
进行构建。 安装完成后,我们可以运行run_server.sh
文件以将 RedisAI 作为已加载的模块来构建 Redis 服务器。
cd redis-stable
redis-server redis.conf --loadmodule ../RedisAI/build/redisai.so
现在,我们的 Redis 服务器已全部就绪。 设置 RedisAI 服务器就这么简单。 现在,使用 Sentinel 或 Cluster 对其进行扩展也并不可怕。 官方文档具有足够的信息供您入门。
在这里,我们从最小的 Python 脚本开始,以使用 RedisAI 运行 fizzbuzz 示例。 我们正在使用 Python 包Redis
与 Redis 服务器通信。 RedisAI 已经建立了一个正式的客户端,但是在撰写本文时还不能使用它。
r = redis.Redis()
MODEL_PATH = 'fizbuz_model.pt'
with open(MODEL_PATH,'rb') as f:
model_pt = f.read()
r.execute_command('AI.MODELSET', 'model', 'TORCH', 'CPU',model_pt)
上面的脚本首先打开与本地主机的 Redis 连接。 它读取以前使用 TorchScript 保存的二进制模型,并使用命令AI.MODELSET
在 RedisAI 中设置 Torch 模型。 该命令需要我们为服务器中的模型传递所需的名称,无论是要使用 CPU 还是 GPU,我们都想使用该后端,然后是二进制模型文件本身。 模型设置命令返回一条正常消息,然后循环浏览并等待用户输入。 如前所述,用户输入通过编码器传递,以将其转换为二进制编码格式。
while True:
number = int(input('Enter number, press CTRL+c to exit: ')) + 1
inputs = encoder(number)
r.execute_command('AI. TENSORSET', 'a', 'FLOAT', *inputs.shape, 'BLOB',inputs.tobytes())
r.execute_command('AI.MODELRUN', 'model', 'INPUTS', 'a','OUTPUTS', 'out')
typ, shape, buf = r.execute_command('AI.TENSORGET', 'out','BLOB')
prediction = np.frombuffer(buf, dtype=np.float32).argmax()
print(get_readable_output(number, prediction))
然后,我们使用AI.TENSORSET
来设置张量并将其映射到关键点。 您可能已经看到了我们将输入 NumPy 数组传递给后端的方式。 NumPy 有一个方便的函数tobytes()
,它给出了如何将数据存储在内存中的字符串格式。 我们明确告诉命令我们需要将模型另存为BLOB
。 保存模型的另一个选项是VALUES
,当您要保存更大的数组时,它不是很有用。
我们还必须传递数据类型和输入张量的形状。 做张量集时,我们应该考虑的一件事是数据类型和形状。 由于我们将输入作为缓冲区传递,因此 RedisAI 尝试使用我们传递的形状和数据类型信息将缓冲区转换为 DLPack 张量。 如果这与我们传递的字节串的长度不匹配,RedisAI 将抛出错误。
设置张量后,我们将模型保存在名为model
的键中,并将张量保存在名为a
的键中。 现在,我们可以通过传递模型键名称和张量键名称来运行AI.MODELRUN
命令。
如果有多个输入要传递,我们将使用张量集不止一次,并将所有键作为INPUTS
传递给MODELRUN
命令。 MODELRUN
命令将输出保存到OUTPUTS
下提到的键,然后AI.TENSORGET
可以读取。
在这里,我们像保存了一样将张量读为BLOB
。 张量命令为我们提供类型,形状和自身的缓冲。 然后将缓冲区传递给 NumPy 的frombuffer()
函数,该函数为我们提供了结果的 NumPy 数组。
一旦我们从 RedisAI 中获得了数据,那么其他章节中的内容将相同。 RedisAI 似乎是当前市场上可用于 AI 开发人员的最有前途的生产部署系统。 它甚至还处于早期阶段,并于 4 月在 RedisConf 2019 上发布。 我们可以在不久的将来看到 RedisAI 带来的许多惊人功能,这使其成为大部分 AI 社区事实上的部署机制。
在本章中,我们从最简单但性能最低的方法开始,使用了三种不同的方法将 PyTorch 投入生产:使用 Flask。 然后,我们转移到 MXNet 模型服务器,这是一个预先构建的,优化的服务器实现,可以使用管理 API 进行管理。 MXNet 模型服务器对不需要太多复杂性但需要可以根据需要扩展的高效服务器实现的人很有用。
最后,我们尝试使用 TorchScript 创建模型的最有效版本,并将其导入 C++ 中。 对于那些准备承担构建和维护 C++,Go 或 Rust 等底层语言服务器的复杂性的人,可以采用这种方法并构建自定义服务器,直到我们有可以读取脚本模块的更好的运行时为止,就像 MXNet 在 ONNX 模型上一样。
2018 年是模型服务器的一年; 有许多来自不同组织的模型服务器,它们具有不同的观点。 但是未来是光明的,我们可以看到越来越多的模型服务器每天都在问世,这可能会使所有前面提到的方法过时。
https://pytorch.org/docs/stable/jit.html
https://github.com/awslabs/mxnet-model-server/blob/master/docs/management_api.md
https://redis.io/topics/introduction