第6章:分布式训练 — 复习资料
1. 并行计算基础
1.1 阿姆达尔定律
核心观点:仅提高并行处理效率是不够的,除非串行处理效率也得到提升。串行部分决定理论加速上限。
设串行部分占比 \(1-p\),可并行部分占比 \(p\),并行处理器数为 \(s\):
- 当 \(s \to \infty\) 时,\(S \to \frac{1}{1-p}\),加速比存在理论上限
- 例如 \(p=0.9\),即使无限处理器,加速比上限也仅 10 倍
- 适用范围:问题规模固定
1.2 古斯塔夫森定律
核心观点:并行度提升 → 问题规模可以扩大 → 可接近线性加速。
对比:
| 阿姆达尔 | 古斯塔夫森 | |
|---|---|---|
| 问题规模 | 固定 | 随并行度扩大 |
| 结论 | 加速比有上限 | 揭示了线性加速比的可能性 |
2. 深度学习串行训练回顾
2.1 TensorFlow 串行训练流程
标准深度学习串行训练的代码模式(以 TensorFlow 为例):
x = tf.placeholder(tf.float32)
W = tf.Variable(tf.float32)
b = tf.Variable(tf.float32)
m = W * x
s = m + b
y = tf.reduce_sum(s)
grad_W, grad_b = tf.gradients(y, [W, b])
update = optimizer.apply_gradients({[W, grad_W], [b, grad_b]})
对应数据流图:x → W → b → m(乘法) → s(加法) → y(求和) → 梯度反向传播 → 参数更新
2.2 深度学习训练的并行潜力
从数据流图中可以观察到天然并行性:
- 多个样本可并行:不同样本的前向/反向计算彼此独立
- 多个算子可并行:无数据依赖的操作可以同时执行
- 张量计算天然并行:卷积/矩阵乘内部有大量并行维度
这三类并行潜力对应了三种并行化方案。
3. 并行化基本方案
3.1 三类并行方案总览
| 方案 | 思想 | 粒度 |
|---|---|---|
| 算子内并行 | 并行单个张量计算子内的计算(利用 GPU 多处理单元并行) | 张量内部 |
| 算子间并行 — 数据并行 | 多个样本并行执行 | 样本级 |
| 算子间并行 — 模型并行 | 多个算子并行执行 | 算子级 |
| 组合并行 | 多种并行方案组合叠加 | 混合 |
3.2 算子内并行
思想:利用卷积/矩阵乘内部的数据并行性,在 Batch 维度、空间维度(H/W)、时间维度上切分,通过 SIMD 架构实现并行。
以 im2col 卷积为例:输入特征图展开为矩阵列,卷积核展开为矩阵行,转化为矩阵乘法后利用 GPU 多处理单元并行计算。展开过程涉及 \(N\)(batch)、\(C\)(通道)、\(H \times W\)(空间)、\(K\)(卷积核数)、\(R \times S\)(卷积核尺寸)等多维度并行。
3.3 模型并行
思想:将计算图划分到不同设备,每个设备负责网络的一部分层,跨设备传输中间激活值,各设备独立更新本地参数。
做法:以 ResNet50 为例,将前半部分层放到 GPU0,后半部分层放到 GPU1(Demo: twoGPU_res.py)。
class ModelParallelResNet50(nn.Module):
def __init__(self):
self.seq1 = nn.Sequential(...) # 放到 cuda:0
self.seq2 = nn.Sequential(...) # 放到 cuda:1
3.4 流水线并行
思想:将 mini-batch 进一步切分为多个 micro-batch(split),当前一个 split 进入 GPU1 时,下一个 split 立即进入 GPU0,形成流水线以消除 Bubble。
前馈运算示意图核心逻辑:
- GPU0 处理完 split0 的 FC0 后,立即将中间结果发给 GPU1 做 FC1
- 同时 GPU0 开始处理 split1 的 FC0,二者重叠执行
- 只有首尾几个 split 存在少量 Bubble(首部预热、尾部排空)
GPipe 算法:
- 拆分 micro-batch,流水线并行减少空闲
- 一个 batch 内所有 micro-batch 全部前向完成后,才统一执行反向传播
- Bubble 仍然较多:前向末尾和反向开头之间存在大片空闲
PipeDream 算法:
- 第一个 split 前向结束立即开始反向,不等整个 batch 完成
- 人为制造可控 bubble 让前向和反向在时间轴上重叠
- 比 GPipe 空闲更少,吞吐更高
GPipe vs PipeDream 对比:
| GPipe | PipeDream | |
|---|---|---|
| 反向时机 | 整个 batch 前向完成后统一反向 | 每个 split 前向完成立即反向 |
| Bubble 大小 | 较大 | 较小 |
| 实现复杂度 | 较低 | 较高 |
代码示例:Demo twoGPU_pp.py——仅首尾 iter 有空洞,GPU 可同时工作,split 不宜过多。
3.5 数据并行
思想:每个设备存放完整模型副本 → 数据均匀分片 → 各自独立前向/反向计算梯度 → AllReduce 聚合所有梯度(求平均) → 统一 SGD 更新参数。
核心步骤:
- 每个 worker 持有完整模型副本
- 数据按 worker 数量均匀分片(DistributedSampler)
- 各自计算局部梯度
- AllReduce 对所有 worker 的梯度求和/平均
- 各 worker 用聚合后的梯度统一更新参数
关键通信操作是 AllReduce——所有节点的梯度需要规约到每个节点。数据并行示意图中,GPU0 和 GPU1 各自的梯度 \(\nabla W\)、\(\nabla b\) 等全部通过 AllReduce 聚合。
4. 集合通信原语
4.1 六种通信原语详解
| 类型 | 原语 | 功能 | 典型用途 |
|---|---|---|---|
| 一对多 | Scatter | 将主节点的数据分片并分发至其他指定节点 | 数据分发 |
| 一对多 | Broadcast | 某个节点把自身数据发送到集群中所有其他节点 | 网络参数初始化 |
| 多对一 | Reduce | 规约运算:SUM/MIN/MAX/PROD/LOR 等,汇总到一节点 | 梯度汇总 |
| 多对一 | Gather | 将多个节点上的数据收集到单个节点(反向 Scatter) | 并行排序/搜索 |
| 多对多 | All-Gather | 收集所有数据到所有节点,不加工 = Gather + Broadcast | 全节点数据对齐 |
| 多对多 | All-Reduce | 在所有节点上都应用同样的 Reduce 操作 | 梯度同步(数据并行核心) |
All-Reduce 的两种实现方式:
- Reduce + Broadcast
- Reduce-Scatter + All-Gather(即 Ring AllReduce 方案)
4.2 PyTorch 通信原语接口
dist.broadcast(tensor, src) # 一对多广播
dist.reduce(tensor, dst, op) # 多对一规约
dist.all_reduce(tensor, op) # 多对多规约
dist.scatter(tensor, scatter_list, src) # 一对多分片
dist.gather(tensor, gather_list, dst) # 多对一收集
dist.all_gather(tensor_list, tensor) # 多对多全收集
dist.barrier() # 同步障
4.3 点对点通信
点对点通信是所有集合通信原语实现的基础:
| 方式 | API | 特点 | 通信与计算 |
|---|---|---|---|
| 同步/阻塞 | send / recv |
调用不返回,必须等待传输完成 | 不可重叠 |
| 异步/非阻塞 | isend / irecv + wait |
调用立即返回,用 wait 等待完成 |
可重叠 |
Ring AllReduce 中综合使用阻塞和非阻塞命令:优先接收数据,发送在总线空闲时进行,避免冲突。
5. AllReduce 实现算法
5.1 参数服务器架构
结构:CPU/GPU 做中心参数服务器存储全局参数,各 Worker 从服务器拉取参数 → 本地计算梯度 → 上传梯度到服务器 → 服务器更新参数。
致命缺点:中心节点通信带宽成为瓶颈——主节点通信量为 \(O(N \cdot K)\),随 worker 数量线性增长。
5.2 Reduce + Broadcast
流程:先由中心节点 Reduce 汇总所有节点的梯度 → 再 Broadcast 分发给所有节点。
耗时公式:\(2(\alpha + S/B) + NSC\)
缺点:主节点带宽瓶颈——所有数据都要经过中心节点,中心节点收发总量与其他节点不对称。
5.3 树形递归算法
流程:分层规约——第一层两两 Reduce → 第二层继续两两 Reduce → 逐级汇聚到根节点 → 再从根节点逐级向下广播。
优点:规避单节点瓶颈,耗时 \(O(\log N)\)。
缺点:部分节点(非叶子节点)带宽闲置;根节点仍承受最大负载。
5.4 Ring AllReduce
动机:针对 master 带宽瓶颈问题,采用环形拓扑通信链路。
环形拓扑:N 个 GPU 围成环形,每个节点只与左右两个邻居通信,无中心节点。
阶段一 — Scatter-Reduce(N-1 轮迭代):
核心机制:
- 数据分为 N 个等大 chunk
- 第 k 个 worker 把第 k 份数据发给下一个 worker,同时从前一个 worker 收到第 k-1 份数据
- 第 k 个 worker 把收到的第 k-1 份数据和自己的第 k-1 份数据累加整合,再将整合数据发给下一个 worker
- N-1 次迭代后完成 Scatter-Reduce:每个节点持有某一个 chunk 的完整全局和(chunk j 的全局和在 GPU j)
阶段二 — All-Gather(N-1 轮迭代):
- 每个节点将自己持有的完整 chunk(已规约好的)发给邻居
- 邻居收到后替换本地对应 chunk 并继续转发
- 再经过 N-1 次迭代,完成 All-Gather 操作
- 最终所有节点获得全部 chunk 的全局和(即完整 AllReduce 结果)
实现细节:
- 每个 worker 的通信总线上同时连接两个邻接节点
- 综合使用阻塞式及非阻塞点到点通信命令
- 优先接受数据,发送则在总线空闲时进行,避免冲突
- 代码:
ring-allreduce.py
性能分析:
整个过程中每个 worker 上 send 或 receive 的总通信数据量均为:
两条关键结论:
- 每个 worker 通信数据量近似独立于网络中 worker 的数量,为 \(O(K)\);而主从架构中主节点的通信数据量为 \(O(N \cdot K)\)
- 每个 worker 的网络收发负载是均衡的,网络双向带宽得到充分利用
6. 分布式 SGD
6.1 同步 SGD
流程:各节点算梯度 → AllReduce 平均 → 统一更新 → Barrier 等待所有节点完成。
时间轴特征(3 台机器为例):
Machine 1: [Compute][Communicate] -----Waste----- [Compute]...
Machine 2: [Compute][Communicate] -----Waste----- [Compute]...
Machine 3: [Compute][Communicate] Barrier Waste [Compute]...
每个 iter 之间都存在 Barrier 造成的 Waste(空闲等待),慢节点拖累全局。
特点:数学上等价于单机大 batch SGD(梯度一致),收敛有保证;但 Barrier 造成浪费。
6.2 异步 SGD
流程:各节点独立算梯度、独立更新参数,无 Barrier 等待。
特点:速度快、无 Barrier 浪费;但梯度可能"过时"(stale gradient——节点在用旧参数算梯度时,其他节点已更新了参数),收敛性理论保证较弱。
6.3 数据并行 vs 模型并行 vs 非并行
| 维度 | 非并行 | 数据并行 | 模型并行 |
|---|---|---|---|
| 样本数据量 | 1 | 1/N | 1 |
| 传输数据量 | 0 | 模型大小(梯度) | 激活大小 |
| 存储占用 | 1 | N(每节点存完整模型) | 1(每节点存部分) |
| 负载平衡度 | — | 强 | 弱 |
| 并行限制 | — | 单步样本量(batch size) | 算子数量(层间依赖) |
7. Horovod
7.1 简介
Uber 开源的分布式训练框架,支持 TensorFlow / PyTorch / Keras / MXNet。
核心特点:
- 不依赖
torch.distributed,底层通信透明 - 同步数据并行,用
hvd.DistributedOptimizer包装优化器即可自动梯度 AllReduce - 支持梯度压缩(FP16)减少通信量
- 多后端兼容(MPI、NCCL、Gloo)
- 小巧(约 3MB),易用
7.2 核心用法
import horovod.torch as hvd
hvd.init()
optimizer = hvd.DistributedOptimizer(
optimizer, named_parameters=model.named_parameters()
)
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
每个 worker 存完整模型副本,数据自动分片,AllReduce 自动执行。
7.3 启动命令
# 单机多 CPU
horovodrun -np 2 python train.py
# 使用 Gloo 后端(4 worker)
horovodrun --gloo -np 4 python test.py