Auto Byte

专注未来出行及智能汽车科技

微信扫一扫获取更多资讯

Science AI

关注人工智能与其他前沿技术、基础学科的交叉研究与融合发展

微信扫一扫获取更多资讯

张智作者

分布式训练从入门到放弃

在训练神经网络模型时,我们更偏爱大规模的数据集和复杂的网络结构。更大规模的数据集和更复杂的网络结构可以让我们的模型表征能力更强,但同时也对计算的时间和空间提出了挑战。

序言

如果我们只用单机单GPU来跑,则会出现一卡有难(运行时间长、显存不足等),十卡围观的状态。很多神经网络库都提供了分布式训练的API,但是如果不了解内在机理,我们仍然很难得到满意的效率和效果。

首先需要明确的是神经网络模型的训练过程中,哪些部分可以并行,哪些部分不能并行。

对于一个 Batch 中的第 个样例xi 来说,通过 forward 求得 lossi 再通过 backward 求得梯度 Gi 的过程是相互独立的,不同样例可以同时进行计算。但是,当所有样例的 backward 完成后,我们需要求再使用  更新训练参数,这个过程依赖所有样例的计算结果 Gi ,不能并行。


Parameter Server

我们可以模仿 MapReduce 的思路,将上述可以并行的部分作为 Mapper,不能并行的部分作为 Reducer。Parameter Server 包含一个参数服务器(其实不一定是服务器,可以是 GPU0),和几个工作服务器(其实不一定是服务器,可以是 GPU1、GPU2、GPU3)。下图中左侧为参数 GPU(GPU 0),用于存储参数和数据;右侧三个为工作 GPU(GPU1、GPU2、GPU3),用于前馈和反馈计算。

模型的训练可以分成 5 步:

1. 在训练前将数据和初始化参数加载进 GPU0 中,如果无法一次加载进来也可以分片加载;

2. 参数服务器 GPU0 将一个 Batch 的数据切成 3 份,交给工作服务器 GPU1、GPU2、GPU3(Map);

3. 参数服务器 GPU0 将模型(参数)复制 3 份,交给工作服务器 GPU1、GPU2、GPU3(Map);

4. 工作服务器 GPU1、GPU2、GPU3 利用数据和模型求得 loss 和梯度;

5. 将梯度求平均,在参数服务器 GPU0 更新参数(Reducer),并回到第二步(因为 GPU 中已经有数据了,所以不需要再进行第一步);

随着几个工作服务器 GPU 的增多,我们所需要的训练时间会越来越短。在 Pytorch 中,我们只需要这样调用 API,就可以实现 PS 并行:

import torchimport torch.nn as nn
# Define dataset...train_dataset = ...train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=...)
# 注意这里模型所在的 GPU 应与 device_ids[0] 和 output_device 一致model = ...model_dp = nn.DataParallel(model.cuda(), device_ids=[0, 1, 2], output_device=0)
# 注意要通过 module 获取模型实例optimizer = optim.SGD(model_dp.module.parameters())
for epoch in range(100):   for batch_idx, (data, target) in enumerate(train_loader):       optimizer.zero_grad()       output = model(data)       loss = F.nll_loss(output, target)       loss.backward()       optimizer.step()       if batch_idx % args.log_interval == 0:           print('Train Epoch: {} [{}/{}]\tLoss: {}'.format(               epoch, batch_idx * len(data), len(train_dataset), loss.item()))

左右滑动看全部代码


然而,GPU 之间带宽有限、数据传输很耗时。常常跑图 10 分钟,打怪 30 秒。Mapper 和 Reducer 都需要占用大量带宽。如果是 15M/s 的网络传输速度,GPU0 Mapper 完 1G 的数据和模型就要一分多钟。因此,我们需要一种机制分散 GPU0 的传输压力。

Ring Allreduce

如何分散网络传输的压力,一直是高性能计算研究的主要课题之一。超算的集群经验可以为我提供借鉴。Ring Allreduce 是高性能计算中比较常用的技术,它可以帮助我们在不使用中心节点的前提下完成 Map 和 Reduce。

Ring Allreduce 包含 5 个平等的工作 GPU(GPU1、GPU2、GPU3、GPU4、GPU5),第 i 个 GPU 可以从第 i-1 个 GPU 接收数据,可以向第 i+1 个 GPU 发送数据,构成一个环(所以叫 ring)。


训练前将数据切片放入 GPU1、GPU2、GPU3、GPU4、GPU5 中,将初始化参数复制到 GPU1、GPU2、GPU3、GPU4、GPU5 中。

GPU 计算模型的梯度 Gi 时,根据模型的层数 jGi 分成 j 份。我们求也就是在求每一个 。对于一个 5 层的网络,我们可以写成这样:

i=1 开始,GPUi  向 GPUi+1  发送第 i 层的梯度 Gii  , GPUi+1  在收到梯度后求和(所以叫 all reduce,每一个 GPU 都是一个 reducer)。

i=2 开始,GPUi  向 GPUi+1   发送第 i 层的梯度 GiiGPUi+1 在收到梯度后求和。

从  i=3 开始,GPUiGPUi+1   发送第 i 层的梯度Gii   ,GPUi+1  在收到梯度后求和。

从  i=4  开始,GPUiGPUi+1 发送第 i 层的梯度GiiGPUi+1 在收到梯度后求和。

上述 reduce 过程完成了求和,我们还需要将求和的结果同步到所有 GPU 中。从 i=1 开始,GPUGPUi+1 发送第 i 层的梯度 Gii ,  在收到梯度后使用收到的梯度覆盖当前梯度(更换了一种 reduce 操作)。

 i=2 开始,GPUiGPUi+1  发送第 i 层的梯度 GiiGPUi+1 在收到梯度后使用收到的梯度覆盖当前梯度。

 i=3  开始,GPUi  向 GPUi+1  发送第 i 层的梯度 Gii  , GPUi+1 在收到梯度后使用收到的梯度覆盖当前梯度。

i=4 开始,GPUi  向 GPUi+1  发送第 i 层的梯度 GiiGPUi+1  在收到梯度后使用收到的梯度覆盖当前梯度。

最后只需要再除以 N(N 为 GPU 个数)得到的结果就是  了。利用 Ring Allreduce ,我们仍然可以像 Parameter Server 一样在 2 x(N-1)次传输后完成参数的更新过程。

但是,每次传输不再是针对中心节点,而是分散在各节点的两两之间,大大减小了对带宽的压力,实现了带宽并行。

在 Pytorch 中,我们只需要这样调用 Horovod API 就可以实现 Ring Allreduce:

import torchimport horovod.torch as hvd
# 初始化hvd.init()
# 设置可用 GPUtorch.cuda.set_device(hvd.local_rank())
# Define dataset...train_dataset = ...
# 数据切片分给几个 GPUtrain_sampler = torch.utils.data.distributed.DistributedSampler(    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
model = ...model.cuda()
# 初始化参数复制给几个 GPUhvd.broadcast_parameters(model.state_dict(), root_rank=0)optimizer = optim.SGD(model.parameters())
# 每个 GPU 完成前馈和反馈、all reduce 完成计算平均梯度、更新参数optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
for epoch in range(100):for batch_idx, (data, target) in enumerate(train_loader):       optimizer.zero_grad()       output = model(data)       loss = F.nll_loss(output, target)       loss.backward()       optimizer.step()       if batch_idx % args.log_interval == 0:           print('Train Epoch: {} [{}/{}]\tLoss: {}'.format(               epoch, batch_idx * len(data), len(train_sampler), loss.item()))

但在实际的应用过程中,我们可能还需要对 Embedding 或是 BatchNormalize 的并行进行定制化处理,否则会对模型的梯度更新有所影响,进而导致 train 和 val 的准确之间有较大的 gap。



入门神经网络分布式计算
5
暂无评论
暂无评论~