跳转至

第6章:分布式训练 — 复习资料

1. 并行计算基础

1.1 阿姆达尔定律

核心观点:仅提高并行处理效率是不够的,除非串行处理效率也得到提升。串行部分决定理论加速上限。

设串行部分占比 \(1-p\),可并行部分占比 \(p\),并行处理器数为 \(s\)

\[S_{\text{latency}}(s) = \frac{1}{(1-p) + \frac{p}{s}}\]
  • \(s \to \infty\) 时,\(S \to \frac{1}{1-p}\)加速比存在理论上限
  • 例如 \(p=0.9\),即使无限处理器,加速比上限也仅 10 倍
  • 适用范围:问题规模固定

1.2 古斯塔夫森定律

核心观点:并行度提升 → 问题规模可以扩大 → 可接近线性加速。

\[S_{\text{latency}}(s) = 1 - p + sp\]

对比

阿姆达尔 古斯塔夫森
问题规模 固定 随并行度扩大
结论 加速比有上限 揭示了线性加速比的可能性

2. 深度学习串行训练回顾

2.1 TensorFlow 串行训练流程

标准深度学习串行训练的代码模式(以 TensorFlow 为例):

x = tf.placeholder(tf.float32)
W = tf.Variable(tf.float32)
b = tf.Variable(tf.float32)
m = W * x
s = m + b
y = tf.reduce_sum(s)
grad_W, grad_b = tf.gradients(y, [W, b])
update = optimizer.apply_gradients({[W, grad_W], [b, grad_b]})

对应数据流图:x → W → b → m(乘法) → s(加法) → y(求和) → 梯度反向传播 → 参数更新

2.2 深度学习训练的并行潜力

从数据流图中可以观察到天然并行性:

  • 多个样本可并行:不同样本的前向/反向计算彼此独立
  • 多个算子可并行:无数据依赖的操作可以同时执行
  • 张量计算天然并行:卷积/矩阵乘内部有大量并行维度

这三类并行潜力对应了三种并行化方案。


3. 并行化基本方案

3.1 三类并行方案总览

方案 思想 粒度
算子内并行 并行单个张量计算子内的计算(利用 GPU 多处理单元并行) 张量内部
算子间并行 — 数据并行 多个样本并行执行 样本级
算子间并行 — 模型并行 多个算子并行执行 算子级
组合并行 多种并行方案组合叠加 混合

3.2 算子内并行

思想:利用卷积/矩阵乘内部的数据并行性,在 Batch 维度、空间维度(H/W)、时间维度上切分,通过 SIMD 架构实现并行。

以 im2col 卷积为例:输入特征图展开为矩阵列,卷积核展开为矩阵行,转化为矩阵乘法后利用 GPU 多处理单元并行计算。展开过程涉及 \(N\)(batch)、\(C\)(通道)、\(H \times W\)(空间)、\(K\)(卷积核数)、\(R \times S\)(卷积核尺寸)等多维度并行。


3.3 模型并行

思想:将计算图划分到不同设备,每个设备负责网络的一部分层,跨设备传输中间激活值,各设备独立更新本地参数。

做法:以 ResNet50 为例,将前半部分层放到 GPU0,后半部分层放到 GPU1(Demo: twoGPU_res.py)。

class ModelParallelResNet50(nn.Module):
    def __init__(self):
        self.seq1 = nn.Sequential(...)  # 放到 cuda:0
        self.seq2 = nn.Sequential(...)  # 放到 cuda:1

3.4 流水线并行

思想:将 mini-batch 进一步切分为多个 micro-batch(split),当前一个 split 进入 GPU1 时,下一个 split 立即进入 GPU0,形成流水线以消除 Bubble。

前馈运算示意图核心逻辑

  • GPU0 处理完 split0 的 FC0 后,立即将中间结果发给 GPU1 做 FC1
  • 同时 GPU0 开始处理 split1 的 FC0,二者重叠执行
  • 只有首尾几个 split 存在少量 Bubble(首部预热、尾部排空)

GPipe 算法

  • 拆分 micro-batch,流水线并行减少空闲
  • 一个 batch 内所有 micro-batch 全部前向完成后,才统一执行反向传播
  • Bubble 仍然较多:前向末尾和反向开头之间存在大片空闲

PipeDream 算法

  • 第一个 split 前向结束立即开始反向,不等整个 batch 完成
  • 人为制造可控 bubble 让前向和反向在时间轴上重叠
  • 比 GPipe 空闲更少,吞吐更高

GPipe vs PipeDream 对比

GPipe PipeDream
反向时机 整个 batch 前向完成后统一反向 每个 split 前向完成立即反向
Bubble 大小 较大 较小
实现复杂度 较低 较高

代码示例:Demo twoGPU_pp.py——仅首尾 iter 有空洞,GPU 可同时工作,split 不宜过多。


3.5 数据并行

思想:每个设备存放完整模型副本 → 数据均匀分片 → 各自独立前向/反向计算梯度 → AllReduce 聚合所有梯度(求平均) → 统一 SGD 更新参数。

核心步骤

  1. 每个 worker 持有完整模型副本
  2. 数据按 worker 数量均匀分片(DistributedSampler)
  3. 各自计算局部梯度
  4. AllReduce 对所有 worker 的梯度求和/平均
  5. 各 worker 用聚合后的梯度统一更新参数

关键通信操作是 AllReduce——所有节点的梯度需要规约到每个节点。数据并行示意图中,GPU0 和 GPU1 各自的梯度 \(\nabla W\)\(\nabla b\) 等全部通过 AllReduce 聚合。


4. 集合通信原语

4.1 六种通信原语详解

类型 原语 功能 典型用途
一对多 Scatter 将主节点的数据分片并分发至其他指定节点 数据分发
一对多 Broadcast 某个节点把自身数据发送到集群中所有其他节点 网络参数初始化
多对一 Reduce 规约运算:SUM/MIN/MAX/PROD/LOR 等,汇总到一节点 梯度汇总
多对一 Gather 将多个节点上的数据收集到单个节点(反向 Scatter) 并行排序/搜索
多对多 All-Gather 收集所有数据到所有节点,不加工 = Gather + Broadcast 全节点数据对齐
多对多 All-Reduce 在所有节点上都应用同样的 Reduce 操作 梯度同步(数据并行核心)

All-Reduce 的两种实现方式

  1. Reduce + Broadcast
  2. Reduce-Scatter + All-Gather(即 Ring AllReduce 方案)

4.2 PyTorch 通信原语接口

dist.broadcast(tensor, src)           # 一对多广播
dist.reduce(tensor, dst, op)          # 多对一规约
dist.all_reduce(tensor, op)           # 多对多规约
dist.scatter(tensor, scatter_list, src)   # 一对多分片
dist.gather(tensor, gather_list, dst)     # 多对一收集
dist.all_gather(tensor_list, tensor)      # 多对多全收集
dist.barrier()                            # 同步障

4.3 点对点通信

点对点通信是所有集合通信原语实现的基础:

方式 API 特点 通信与计算
同步/阻塞 send / recv 调用不返回,必须等待传输完成 不可重叠
异步/非阻塞 isend / irecv + wait 调用立即返回,用 wait 等待完成 可重叠

Ring AllReduce 中综合使用阻塞和非阻塞命令:优先接收数据,发送在总线空闲时进行,避免冲突。


5. AllReduce 实现算法

5.1 参数服务器架构

结构:CPU/GPU 做中心参数服务器存储全局参数,各 Worker 从服务器拉取参数 → 本地计算梯度 → 上传梯度到服务器 → 服务器更新参数。

致命缺点:中心节点通信带宽成为瓶颈——主节点通信量为 \(O(N \cdot K)\),随 worker 数量线性增长。


5.2 Reduce + Broadcast

流程:先由中心节点 Reduce 汇总所有节点的梯度 → 再 Broadcast 分发给所有节点。

耗时公式\(2(\alpha + S/B) + NSC\)

缺点:主节点带宽瓶颈——所有数据都要经过中心节点,中心节点收发总量与其他节点不对称。


5.3 树形递归算法

流程:分层规约——第一层两两 Reduce → 第二层继续两两 Reduce → 逐级汇聚到根节点 → 再从根节点逐级向下广播。

优点:规避单节点瓶颈,耗时 \(O(\log N)\)

缺点:部分节点(非叶子节点)带宽闲置;根节点仍承受最大负载。


5.4 Ring AllReduce

动机:针对 master 带宽瓶颈问题,采用环形拓扑通信链路。

环形拓扑:N 个 GPU 围成环形,每个节点只与左右两个邻居通信,无中心节点。

阶段一 — Scatter-Reduce(N-1 轮迭代)

核心机制:

  • 数据分为 N 个等大 chunk
  • 第 k 个 worker 把第 k 份数据发给下一个 worker,同时从前一个 worker 收到第 k-1 份数据
  • 第 k 个 worker 把收到的第 k-1 份数据和自己的第 k-1 份数据累加整合,再将整合数据发给下一个 worker
  • N-1 次迭代后完成 Scatter-Reduce:每个节点持有某一个 chunk 的完整全局和(chunk j 的全局和在 GPU j)

阶段二 — All-Gather(N-1 轮迭代)

  • 每个节点将自己持有的完整 chunk(已规约好的)发给邻居
  • 邻居收到后替换本地对应 chunk 并继续转发
  • 再经过 N-1 次迭代,完成 All-Gather 操作
  • 最终所有节点获得全部 chunk 的全局和(即完整 AllReduce 结果)

实现细节

  • 每个 worker 的通信总线上同时连接两个邻接节点
  • 综合使用阻塞式及非阻塞点到点通信命令
  • 优先接受数据,发送则在总线空闲时进行,避免冲突
  • 代码:ring-allreduce.py

性能分析

整个过程中每个 worker 上 send 或 receive 的总通信数据量均为:

\[\text{Data Transferred} = 2(N-1) \frac{K}{N}\]

两条关键结论:

  1. 每个 worker 通信数据量近似独立于网络中 worker 的数量,为 \(O(K)\);而主从架构中主节点的通信数据量为 \(O(N \cdot K)\)
  2. 每个 worker 的网络收发负载是均衡的,网络双向带宽得到充分利用

6. 分布式 SGD

6.1 同步 SGD

流程:各节点算梯度 → AllReduce 平均 → 统一更新 → Barrier 等待所有节点完成

时间轴特征(3 台机器为例):

Machine 1: [Compute][Communicate] -----Waste----- [Compute]...
Machine 2: [Compute][Communicate] -----Waste----- [Compute]...
Machine 3: [Compute][Communicate]  Barrier  Waste  [Compute]...

每个 iter 之间都存在 Barrier 造成的 Waste(空闲等待),慢节点拖累全局。

特点:数学上等价于单机大 batch SGD(梯度一致),收敛有保证;但 Barrier 造成浪费。


6.2 异步 SGD

流程:各节点独立算梯度、独立更新参数,无 Barrier 等待

特点:速度快、无 Barrier 浪费;但梯度可能"过时"(stale gradient——节点在用旧参数算梯度时,其他节点已更新了参数),收敛性理论保证较弱。


6.3 数据并行 vs 模型并行 vs 非并行

维度 非并行 数据并行 模型并行
样本数据量 1 1/N 1
传输数据量 0 模型大小(梯度) 激活大小
存储占用 1 N(每节点存完整模型) 1(每节点存部分)
负载平衡度
并行限制 单步样本量(batch size) 算子数量(层间依赖)

7. Horovod

7.1 简介

Uber 开源的分布式训练框架,支持 TensorFlow / PyTorch / Keras / MXNet。

核心特点

  • 不依赖 torch.distributed,底层通信透明
  • 同步数据并行,用 hvd.DistributedOptimizer 包装优化器即可自动梯度 AllReduce
  • 支持梯度压缩(FP16)减少通信量
  • 多后端兼容(MPI、NCCL、Gloo)
  • 小巧(约 3MB),易用

7.2 核心用法

import horovod.torch as hvd
hvd.init()
optimizer = hvd.DistributedOptimizer(
    optimizer, named_parameters=model.named_parameters()
)
hvd.broadcast_parameters(model.state_dict(), root_rank=0)

每个 worker 存完整模型副本,数据自动分片,AllReduce 自动执行。

7.3 启动命令

# 单机多 CPU
horovodrun -np 2 python train.py

# 使用 Gloo 后端(4 worker)
horovodrun --gloo -np 4 python test.py