在训练神经网络模型时,我们更偏爱大规模的数据集和复杂的网络结构。更大规模的数据集和更复杂的网络结构可以让我们的模型表征能力更强,但同时也对计算的时间和空间提出了挑战。
序言
如果我们只用单机单GPU来跑,则会出现一卡有难(运行时间长、显存不足等),十卡围观的状态。很多神经网络库都提供了分布式训练的API,但是如果不了解内在机理,我们仍然很难得到满意的效率和效果。
首先需要明确的是神经网络模型的训练过程中,哪些部分可以并行,哪些部分不能并行。
对于一个 Batch 中的第 i 个样例xi 来说,通过 forward 求得 lossi 再通过 backward 求得梯度 Gi 的过程是相互独立的,不同样例可以同时进行计算。但是,当所有样例的 backward 完成后,我们需要求再使用 更新训练参数,这个过程依赖所有样例的计算结果 Gi ,不能并行。
Parameter Server
模型的训练可以分成 5 步:
1. 在训练前将数据和初始化参数加载进 GPU0 中,如果无法一次加载进来也可以分片加载;
2. 参数服务器 GPU0 将一个 Batch 的数据切成 3 份,交给工作服务器 GPU1、GPU2、GPU3(Map);
3. 参数服务器 GPU0 将模型(参数)复制 3 份,交给工作服务器 GPU1、GPU2、GPU3(Map);
4. 工作服务器 GPU1、GPU2、GPU3 利用数据和模型求得 loss 和梯度;
5. 将梯度求平均,在参数服务器 GPU0 更新参数(Reducer),并回到第二步(因为 GPU 中已经有数据了,所以不需要再进行第一步);
随着几个工作服务器 GPU 的增多,我们所需要的训练时间会越来越短。在 Pytorch 中,我们只需要这样调用 API,就可以实现 PS 并行:
import torch
import torch.nn as nn
# Define dataset...
train_dataset = ...
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=...)
# 注意这里模型所在的 GPU 应与 device_ids[0] 和 output_device 一致
model = ...
model_dp = nn.DataParallel(model.cuda(), device_ids=[0, 1, 2], output_device=0)
# 注意要通过 module 获取模型实例
optimizer = optim.SGD(model_dp.module.parameters())
for epoch in range(100):
for batch_idx, (data, target) in enumerate(train_loader):
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print('Train Epoch: {} [{}/{}]\tLoss: {}'.format(
epoch, batch_idx * len(data), len(train_dataset), loss.item()))
左右滑动看全部代码
然而,GPU 之间带宽有限、数据传输很耗时。常常跑图 10 分钟,打怪 30 秒。Mapper 和 Reducer 都需要占用大量带宽。如果是 15M/s 的网络传输速度,GPU0 Mapper 完 1G 的数据和模型就要一分多钟。因此,我们需要一种机制分散 GPU0 的传输压力。
Ring Allreduce
Ring Allreduce 包含 5 个平等的工作 GPU(GPU1、GPU2、GPU3、GPU4、GPU5),第 i 个 GPU 可以从第 i-1 个 GPU 接收数据,可以向第 i+1 个 GPU 发送数据,构成一个环(所以叫 ring)。
训练前将数据切片放入 GPU1、GPU2、GPU3、GPU4、GPU5 中,将初始化参数复制到 GPU1、GPU2、GPU3、GPU4、GPU5 中。
GPU 计算模型的梯度 Gi 时,根据模型的层数 j 将 Gi 分成 j 份。我们求也就是在求每一个 。对于一个 5 层的网络,我们可以写成这样:
从 i=1 开始,GPUi 向 GPUi+1 发送第 i 层的梯度 Gii , GPUi+1 在收到梯度后求和(所以叫 all reduce,每一个 GPU 都是一个 reducer)。
从 i=2 开始,GPUi 向 GPUi+1 发送第 i 层的梯度 Gii , GPUi+1 在收到梯度后求和。
从 i=3 开始,GPUi 向 GPUi+1 发送第 i 层的梯度Gii ,GPUi+1 在收到梯度后求和。
从 i=4 开始,GPUi 向 GPUi+1 发送第 i 层的梯度Gii ,GPUi+1 在收到梯度后求和。
上述 reduce 过程完成了求和,我们还需要将求和的结果同步到所有 GPU 中。从 i=1 开始,GPUi 向 GPUi+1 发送第 i 层的梯度 Gii , 在收到梯度后使用收到的梯度覆盖当前梯度(更换了一种 reduce 操作)。
从 i=2 开始,GPUi 向 GPUi+1 发送第 i 层的梯度 Gii , GPUi+1 在收到梯度后使用收到的梯度覆盖当前梯度。
从 i=3 开始,GPUi 向 GPUi+1 发送第 i 层的梯度 Gii , GPUi+1 在收到梯度后使用收到的梯度覆盖当前梯度。
从 i=4 开始,GPUi 向 GPUi+1 发送第 i 层的梯度 Gii ,GPUi+1 在收到梯度后使用收到的梯度覆盖当前梯度。
最后只需要再除以 N(N 为 GPU 个数)得到的结果就是 了。利用 Ring Allreduce ,我们仍然可以像 Parameter Server 一样在 2 x(N-1)次传输后完成参数的更新过程。
但是,每次传输不再是针对中心节点,而是分散在各节点的两两之间,大大减小了对带宽的压力,实现了带宽并行。
在 Pytorch 中,我们只需要这样调用 Horovod API 就可以实现 Ring Allreduce:
import torch
import horovod.torch as hvd
# 初始化
hvd.init()
# 设置可用 GPU
torch.cuda.set_device(hvd.local_rank())
# Define dataset...
train_dataset = ...
# 数据切片分给几个 GPU
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
model = ...
model.cuda()
# 初始化参数复制给几个 GPU
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
optimizer = optim.SGD(model.parameters())
# 每个 GPU 完成前馈和反馈、all reduce 完成计算平均梯度、更新参数
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
for epoch in range(100):
for batch_idx, (data, target) in enumerate(train_loader):
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print('Train Epoch: {} [{}/{}]\tLoss: {}'.format(
epoch, batch_idx * len(data), len(train_sampler), loss.item()))
但在实际的应用过程中,我们可能还需要对 Embedding 或是 BatchNormalize 的并行进行定制化处理,否则会对模型的梯度更新有所影响,进而导致 train 和 val 的准确之间有较大的 gap。