【Picotron-Tutorial】流水线并行

Afab并行

理论分析

最简单的pipeline并行就是将模型划分为好几层,然后分别放置在不同的GPU上依次进行前向传播和后先传播,如下图所示。但是这带来的最大的问题是效率过低,存在很多空闲时刻。

一个16层模型的流水线并行示例,该模型分布在4块GPU上。数字表示层编号。

假设$t_f$ 和$t_b$ 分别是单个微批次在流水线的一个阶段上进行前向传播和反向传播所需的时间(通常假设 $t_b\approx2\times t_f$,这在上图中可以观察到)。如果我们能够完美并行化,理想总时间应为 $t =t_b+t_f$。但由于流水线气泡的存在,额外的时间为$t_p =(p-1)*(t_b+t_f)$其中$p$ 是流水线并行度,即上图中的GPU数量),即每个GPU在其他GPU计算时的等待时间。

因此我们可以计算额外气泡时间与理想时间的比值:

$$r_{bubble}=\frac{(p-1)*(t_f+t_b)}{t_f+t_b}=p-1$$

当我们增加流水线阶段数时,气泡时间随之增加,GPU利用率下降。可以看出,在一个简单的实现中,流水线气泡可能会非常大!

为此需要提出一些优化方法来减少流水线中的气泡。一个经典的方法就是全前向-全反向(AFAB, All-Forward-All-Backward)调度。其整体思路是与微批次(microbatches)相绑定的。在微批次中,我们需要先对微批次中的所有的样本进行前向传播和反向传播,得到梯度后对各样本的梯度进行平均,然后通过优化器更新参数。

其优势在于前向和反向传播仍然是严格顺序的,因此可以保持模型训练代码的整体组织,使这种流水线并行实现方式成为最容易实现的一种。

在之前的图表中,数字代表的是模型的层数,而从这一张图开始,所有流水线并行相关的图表中的数字都表示微批次。你可以将每个方块理解为包含多个层,就像前一张图所示的那样。

在如此设计下,理想情况下处理m个批次所需要的时间为 $t_{id}=m \times (t_f + t_b)$,实际过程中产生的气泡依旧为$(p-1)*(t_b+t_f)$,所以气泡比例为:

$$r_{bubble}=\frac{(p-1)*(t_f+t_b)}{m\times ({t_f+t_b})}=p-1$$

可以看到我们可以通过增加微批次的方法来减少气泡。

代码实现

1. pipeline_communicate

实现了流水线并行中各个阶段之间点对点的发送和接收张量(激活值或梯度)的通用接口。

  • 其需要输入一个operation参数来表示操作的类型,包括有:

    • recv_forward:接受前向传播的数据,如果是流水线第一个阶段,就不需要接受,直接返回none。

    • send_forward:发送前向传播的数据,如果是流水线最后一个阶段,就不需要发送,直接返回none。

    • recv_backward:

    • send_backward

  • 通过operation可以得到目标操作对象,然后就创建异步的点对点通信

  • 等待通信操作完成

  • 如果是recv操作,就需要返回接收到的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# ... existing code ...
STEP, VERBOSE = 0, os.environ.get("VERBOSE", "0") == "1"
def pipeline_communicate(operation, device, dtype, tensor=None, shapes=None):
"""
处理流水线阶段之间用于前向和反向传播的点对点通信。

Args:
operation (str): 通信操作的类型 ('recv_forward', 'send_forward',
'recv_backward', 'send_backward')
device: 张量操作的目标设备 (例如, CPU, GPU)
dtype: 张量的数据类型
tensor: 用于发送操作的输入张量 (默认: None)
shapes: 用于接收张量的形状规格 (默认: None)

Returns:
torch.Tensor or None: 接收操作返回接收到的张量,发送操作返回 None
"""
global STEP # 用于调试,追踪通信步骤
global VERBOSE # 控制是否打印详细日志

# --- 根据操作类型确定源/目标和行为 ---
if operation == 'recv_forward':
# 如果是第一个流水线阶段,则在前向传播中无需接收
if pgm.process_group_manager.pp_is_first_stage: return None
# 创建一个空的张量用于接收数据,requires_grad=True 因为它将是后续计算图的一部分
tensor = torch.empty(shapes, requires_grad=True, device=device, dtype=dtype)
src = pgm.process_group_manager.pp_prev_rank # 从前一个流水线阶段接收

elif operation == 'send_forward':
# 如果是最后一个流水线阶段,则在前向传播中无需发送
if pgm.process_group_manager.pp_is_last_stage: return
dest = pgm.process_group_manager.pp_next_rank # 发送到下一个流水线阶段

elif operation == 'recv_backward':
# 如果是最后一个流水线阶段,则在反向传播中无需接收梯度
if pgm.process_group_manager.pp_is_last_stage: return None
tensor = torch.empty(shapes, requires_grad=True, device=device, dtype=dtype) # 接收的梯度也需要梯度
src = pgm.process_group_manager.pp_next_rank # 从下一个流水线阶段接收梯度

elif operation == 'send_backward':
# 如果是第一个流水线阶段,则在反向传播中无需发送梯度
if pgm.process_group_manager.pp_is_first_stage: return
dest = pgm.process_group_manager.pp_prev_rank # 将梯度发送到前一个流水线阶段

# --- 执行通信 ---
is_send = operation.startswith('send')
peer_rank = dest if is_send else src # 确定通信对方的 rank

# 创建点对点操作 (P2POp) 对象,使用异步发送 (isend) 或接收 (irecv)
op = dist.P2POp(dist.isend if is_send else dist.irecv, tensor, peer_rank)

if VERBOSE:
print(f"{operation} | {'sending' if is_send else 'receiving'} {operation.split('_')[1]} "
f"{pgm.process_group_manager.pp_rank} {'→' if is_send else '←'} {peer_rank} | "
f"STEP:{STEP} | RANK:{pgm.process_group_manager.pp_rank}", flush=True)

# 执行批处理的异步发送/接收操作,并等待其完成
# 注意: dist.batch_isend_irecv 通常用于多个 P2POp,这里只用了一个
[req.wait() for req in dist.batch_isend_irecv([op])]
torch.cuda.synchronize() # 确保 CUDA 操作完成

if VERBOSE: STEP += 1

return tensor if not is_send else None # 如果是接收操作,返回接收到的张量
# ... existing code ...

2. PipelineParallel

  1. 在初始化时,会将decoder_layers进行划分,根据pp分离的数量得到每个gpu负责多少layers,然后得到每个gpu负责的layer的起始与结束的decoder layer。还会看当前进程是否是pipeline的第一个进程,如果是就包含embedding层,不然就使用nn.Identity()。nn.Identity()的作用是将输入原封不动地返回。还会看如果当前进程时pipeline的最后一个进程,那么就会加上final_norm和final_proj,不然也是用nn.Identity()代替。

  2. 在forward的时候,其输入有input、position_ids和hidden_states。如果hidden_states不为空,那么输入就是input_ids,不然输入就是hidden_states,然后对输入使用embedding,因为如果不是第一个pipeline,embedding都是nn.Identity(),所以是可以的。然后执行自己所属的decoder_layers,然后在执行final_norma和final_proj。

  3. 在backward的时候,输入是input_tensor(当前阶段输入的tensor)、output_tensor(当前阶段输出的tensor)和output_tensor_grad(关于output_tensor的梯度)

    1. 首先需要设置input_tensor需要保存梯度,需要额外设置的原因在于原本非叶子结点是不保存的梯度的,但是现在模型进行了拆分,所以也需要保存。

    2. 然后需要查看output_tensor_grad是不是None,如果是None那就说明现在其实是在最后一个阶段,那就需要将其初始化为1

    3. 然后执行反向传播计算当前阶段的梯度。

    4. 然后如果不是第一个阶段,就继续往前传递梯度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# ... existing code ...
class PipelineParallel(nn.Module):
def __init__(self, model, config):
super().__init__()
# 1. 分配层到当前 GPU
layer_distribution = self.distribute_layers(config.num_hidden_layers)

# 2. 根据当前 GPU 是否为第一个/最后一个阶段,选择性地包含 embedding, final_norm, final_proj
# 如果不是对应阶段,则使用 nn.Identity() 作为占位符(无操作)
self.embedding = model.embedding if pgm.process_group_manager.pp_is_first_stage else nn.Identity()
# 只保留分配给当前 GPU 的 decoder_layers
self.decoder_layers = nn.ModuleDict({str(i): model.decoder_layers[i] for i in layer_distribution})
self.final_norm = model.final_norm if pgm.process_group_manager.pp_is_last_stage else nn.Identity()
self.final_proj = model.final_proj if pgm.process_group_manager.pp_is_last_stage else nn.Identity()

def distribute_layers(self, num_layers):
"""计算当前流水线阶段应该负责哪些层。"""
# 计算每个 GPU 应分配的层数,处理余数
layers_per_gpu = [num_layers // pgm.process_group_manager.pp_world_size + (1 if i < num_layers % pgm.process_group_manager.pp_world_size else 0) for i in range(pgm.process_group_manager.pp_world_size)]
# 计算当前 GPU 的起始层索引
start_layer = sum(layers_per_gpu[:pgm.process_group_manager.pp_rank])
# 返回当前 GPU 负责的层索引列表
return list(range(start_layer, start_layer + layers_per_gpu[pgm.process_group_manager.pp_rank]))

def forward(self, input_ids, position_ids, hidden_states):
"""当前流水线阶段的前向传播。"""
# 如果 hidden_states 不为 None (来自前一阶段的激活),则使用它;否则使用 input_ids (第一阶段)
x = hidden_states if hidden_states is not None else input_ids
x = self.embedding(x) # 应用 embedding (如果存在)
for layer in self.decoder_layers.values(): # 通过分配给此阶段的 decoder 层
x = layer(x, position_ids=position_ids)
x = self.final_norm(x) # 应用 final_norm (如果存在)
return self.final_proj(x) # 应用 final_proj (如果存在) 并返回输出

def backward(self, input_tensor, output_tensor, output_tensor_grad):
"""当前流水线阶段的反向传播。"""
# input_tensor: 当前阶段前向传播的输入激活
# output_tensor: 当前阶段前向传播的输出激活
# output_tensor_grad: 相对于 output_tensor 的梯度 (来自下一阶段或损失函数)

if input_tensor is not None:
input_tensor.retain_grad() # 确保 input_tensor 的梯度会被计算并存储

# 如果是最后一个阶段,且没有显式提供 output_tensor_grad,则初始化为全1张量
if output_tensor_grad is None:
output_tensor_grad = torch.ones_like(output_tensor, memory_format=torch.preserve_format)

# 执行反向传播,计算当前阶段参数的梯度,以及相对于 input_tensor 的梯度
torch.autograd.backward(output_tensor, grad_tensors=output_tensor_grad, retain_graph=False, create_graph=False)

# 返回相对于 input_tensor 的梯度,这个梯度将传递给前一个阶段
return input_tensor.grad if input_tensor is not None else None
# ... existing code ...

3. train_step_pipeline_afab

这个函数实现了使用 AFAB(Activation Forward - Activation Backward)流水线调度策略的一个完整训练步骤。AFAB 策略意味着所有微批次首先完成完整的前向传播阶段,然后所有微批次再完成完整的反向传播阶段。

在前向传播阶段:

  1. 从前一个阶段接收前向传播的激活值。这里对于第一个阶段会直接返回None,对于其他阶段会阻塞,直到接收到了数据为止。

  2. 从data_loader获取当前微批次的数据。如果不是第一阶段获取到了前面的前向传播的激活值就放入batch["hidden_states"]

  3. 执行当前阶段的前向传播

  4. 将当前阶段的结果发送给下一个阶段,注意如果是最后一个阶段的话会直接返回None

  5. 如果是最后一个阶段,那么就根据输出和获取到的batch["target_ids"]计算损失,注意需要将得到的loss除以data_loader.grad_acc_steps,以做到正确的累积。

  6. 存储当前微批次的输入和输出到数组中

在反向传播阶段:

  1. 如果启用了数据并行,那么需要在最后一个mini_batch中设置model.require_backward_grad_sync=true,从而使得模型在反向传播的时候会与其他的数据并行的模型的梯度进行平均

  2. 等待后一个阶段的模型的反向传播的输出梯度,这里对于最后一个阶段的进程会直接返回1,但是对于其他的进程一开始会有阻塞。

  3. 弹出当前微批次在前向传播时保存的输入和输出激活

  4. 调用PipelineParallel的backward计算反向传播的梯度

  5. 将梯度传输给下一个阶段

  6. 最后返回平均的损失(打印logger用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# ... existing code ...
def train_step_pipeline_afab(model, data_loader, tensor_shapes, device, dtype):
"""
使用 AFAB 流水线并行执行一个训练步骤。
实现分离的前向和反向传播阶段以优化内存使用。
"""
logging_loss: torch.float32 = 0.0 # 用于记录损失
input_tensors, output_tensors = [], [] # 存储每个微批次在前向传播中的输入和输出激活,供反向传播使用
requires_grad_sync = pgm.process_group_manager.dp_world_size > 1 # 是否需要数据并行梯度同步

# === 全局前向传播阶段 ===
# 对梯度累积的每一步(即每个微批次)
for _ in range(data_loader.grad_acc_steps):
# 1. 从前一个阶段接收前向传播的激活值 (如果不是第一阶段)
input_tensor = pipeline_communicate(operation='recv_forward', shapes=tensor_shapes, device=device, dtype=dtype)

# 2. 获取当前微批次的数据
batch = next(data_loader)
# 将接收到的 input_tensor (如果存在) 设置为当前批次的 hidden_states
batch["hidden_states"] = input_tensor.to(device) if input_tensor is not None else input_tensor

# 3. 执行当前阶段的前向传播
output_tensor = model.forward(input_ids=batch["input_ids"].to(device), position_ids=batch["position_ids"].to(device), hidden_states=batch["hidden_states"])

# 4. 将当前阶段的输出激活发送到下一个阶段 (如果不是最后阶段)
pipeline_communicate(operation='send_forward', tensor=output_tensor, device=device, dtype=dtype)

# 5. 如果是最后一个阶段,计算损失
if pgm.process_group_manager.pp_is_last_stage:
# 注意: 这里的 output_tensor 是模型的 logits 输出
# F.cross_entropy 期望的输入形状是 (N, C, d1, ..., dk) 或 (N, C),目标是 (N, d1, ..., dk) 或 (N)
# 通常模型的输出是 (batch_size, seq_len, vocab_size),所以需要 transpose(1, 2) 变为 (batch_size, vocab_size, seq_len)
loss_val = F.cross_entropy(output_tensor.transpose(1, 2), batch["target_ids"].to(device), reduction='mean')
logging_loss += loss_val.item() / data_loader.grad_acc_steps # 累积并平均损失

# 6. 存储当前微批次的输入和输出激活,供反向传播使用
input_tensors.append(input_tensor)
output_tensors.append(output_tensor) # 注意:对于最后一个阶段,这里存的是损失值 (标量张量)

# === 全局反向传播阶段 ===
# 按照前向传播的顺序,对每个微批次进行反向传播
for ith_microbatch in range(data_loader.grad_acc_steps):
# 如果启用了数据并行,则只在梯度累积的最后一步同步梯度
if requires_grad_sync:
is_last_iteration = (ith_microbatch == data_loader.grad_acc_steps - 1)
model.require_backward_grad_sync = is_last_iteration # 假设 model 有这个属性来控制DP同步

# 1. 从下一个阶段接收反向传播的梯度 (如果不是最后阶段)
output_tensor_grad = pipeline_communicate(operation='recv_backward', shapes=tensor_shapes, device=device, dtype=dtype)

# 2. 弹出当前微批次在前向传播时保存的输入和输出激活
input_tensor, output_tensor = input_tensors.pop(0), output_tensors.pop(0)

# 3. 执行当前阶段的反向传播
# 对于最后一个阶段,output_tensor 是损失值,output_tensor_grad 应该是 None (或由 pipeline_communicate 返回 None)
# model.backward 会处理 output_tensor_grad 为 None 的情况 (通常初始化为 torch.ones_like)
input_tensor_grad = model.backward(input_tensor, output_tensor, output_tensor_grad)

# 4. 将计算得到的输入梯度发送到前一个阶段 (如果不是第一阶段)
pipeline_communicate(operation='send_backward', tensor=input_tensor_grad, device=device, dtype=dtype)

return logging_loss # 返回平均损失
# ... existing code ...

1f1b并行

理论分析

该调度方案称为 一前一后(1F1B),因为在中间/稳定状态下,交替执行一个前向传播和一个反向传播。其基本思想是尽早开始反向传播。该调度如下所示:

如果仔细计算的话会发现气泡的大小仍然相同,因此训练效率并未显著提升。然而,我们仅需存储 $p$ 个微批次的激活(其中 $p$ 为流水线并行度),而不是 $m$(其中 $m$ 是微批次数量)。这主要是因为观察最后一个阶段的GPU4,如果要做到1f1b的调度,那么在微批次1的梯度返回给第一个GPU的时候,第 $p=4$ 个微批次的激活就应该已经到达了GPU4,所以在此之前对于GPU1必然已经计算完了前$p=4$ 个微批次的激活。这减少了在 AFAB 调度中遇到的激活内存爆炸问题。因此,我们可以增加更多微批次,从而实际减少气泡的影响。

这种设置的主要复杂性(如上图所示)在于前向和反向传播不再是完全顺序执行的,各个GPU执行是并行交错执行的,每个GPU有自己调度的节奏,而不再能用同一套代码来统一控制了。这也是流水线并行通常需要对训练代码和建模代码进行大幅修改的原因之一。

代码分析

1. bidirectional_pipeline_communicate

该函数的作用是 处理一个流水线阶段同时向其邻居阶段发送数据和从该邻居阶段接收数据的双向通信 。

  1. 支持两种操作:

    • send_fwd_recv_bwd:发送前向传播接收反向传播的数据

    • send_bwd_recv_fwd:发送反向传播并接收前向传播的数据

  2. 处理边界条件,如果是第一层就不需要执行send_bwd_recv_fwd,如果是最后一层就不需要执行send_fwd_recv_bwd

  3. 确定通信的对象,如果是send_fwd_recv_bwd,那么对象就是下一个流水线的GPU,不然就是上一个

  4. 创建一个空的张量用于接收数据

  5. 设置并启动同时进行的异步发送和异步接收操作

  6. 两个 P2POp 对象:一个用于发送,一个用于接收。异步执行

  7. 等待两个异步执行操作结束

  8. 返回接收到的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# ... existing code ...
def bidirectional_pipeline_communicate(operation, send_tensor, recv_shapes, device, dtype):
"""
处理流水线阶段之间的双向通信,允许同时进行发送和接收操作。

Args:
operation (str): 双向操作的类型 ('send_fwd_recv_bwd' 或 'send_bwd_recv_fwd')
send_tensor: 要发送的张量
recv_shapes: 要接收的张量的形状规格
device: 张量操作的目标设备
dtype: 张量的数据类型

Returns:
torch.Tensor or None: 接收到的张量,如果在流水线的终端阶段则为 None
"""
global STEP # 用于调试,追踪通信步骤
global VERBOSE # 控制是否打印详细日志

# 1. 判断操作的主要方向 (是发送前向激活还是发送反向梯度)
is_fwd = (operation == 'send_fwd_recv_bwd')

# 2. 处理流水线的边界情况 (第一个或最后一个阶段)
# 如果是 'send_fwd_recv_bwd' (发送前向,接收反向) 且当前是最后一个阶段,
# 则没有下一个阶段可以发送前向激活,也没有下一个阶段可以接收反向梯度。
if (is_fwd and pgm.process_group_manager.pp_is_last_stage) or \
(not is_fwd and pgm.process_group_manager.pp_is_first_stage):
# 如果是 'send_bwd_recv_fwd' (发送反向,接收前向) 且当前是第一个阶段,
# 则没有前一个阶段可以发送反向梯度,也没有前一个阶段可以接收前向激活。
return None # 在这些边界情况下,不进行通信,直接返回 None

# 3. 确定通信对方的 rank (peer_rank)
# 如果是发送前向 (is_fwd is True),则对方是下一个流水线阶段 (pp_next_rank)
# 如果是发送反向 (is_fwd is False),则对方是上一个流水线阶段 (pp_prev_rank)
peer_rank = pgm.process_group_manager.pp_next_rank if is_fwd else pgm.process_group_manager.pp_prev_rank

# 4. 创建一个空的张量用于接收数据
# requires_grad=True 是因为接收到的张量 (无论是前向激活还是反向梯度) 都将参与后续的梯度计算
recv_tensor = torch.empty(recv_shapes, requires_grad=True, device=device, dtype=dtype)

# 5. 设置并启动同时进行的异步发送和异步接收操作
# dist.batch_isend_irecv 接收一个 P2POp (点对点操作) 列表
# 这里我们创建了两个 P2POp 对象:一个用于发送,一个用于接收,都与同一个 peer_rank 通信
reqs = dist.batch_isend_irecv([
dist.P2POp(dist.isend, send_tensor, peer_rank), # 异步发送 send_tensor 给 peer_rank
dist.P2POp(dist.irecv, recv_tensor, peer_rank) # 从 peer_rank 异步接收数据到 recv_tensor
])

if VERBOSE:
# 打印详细的通信日志
print(f"{operation} | sending {'next' if is_fwd else 'prev'} "
f"{pgm.process_group_manager.pp_rank} -> {peer_rank} | "
f"receiving {'next' if is_fwd else 'prev'} {peer_rank} -> "
f"{pgm.process_group_manager.pp_rank} | STEP {STEP=} | "
f"RANK:{pgm.process_group_manager.pp_rank}", flush=True)

# 6. 等待发送和接收操作都完成
# [req.wait() for req in reqs] 会阻塞当前进程,直到 reqs 列表中的所有请求 (即发送和接收) 都完成
[req.wait() for req in reqs]
torch.cuda.synchronize() # 确保所有在默认 CUDA 流上的操作完成,特别是在 GPU 通信后

if VERBOSE: STEP += 1 # 增加调试步骤计数器

# 7. 返回接收到的张量
return recv_tensor
# ... existing code ...

2. train_step_pipeline_1f1b

该函数实现了1f1b的调度策略,其旨在通过让每个流水线阶段(GPU)在稳定期同时处理一个微批次(micro-batch)的前向传播和另一个微批次的反向传播,并重叠计算与通信,来提高 GPU 的利用率并减少流水线“气泡”(即 GPU 空闲时间)。

整个函数可以分为三个主要阶段:

  1. 预热(Warmup)阶段 :用前向传播任务填充流水线。

  2. 稳定(Steady State)阶段 :核心的 1F1B 操作,每个阶段同时执行一个前向和一个反向传播。

  3. 冷却(Cooldown)阶段 :清空流水线中剩余的反向传播任务。

初始化阶段:

我们假设如上图所示,GPU数量为4,pp_world_size为4,grad_acc_steps=8。

  1. 计算预热阶段的微批次的数量:** **num_warmup_microbatches = min(pgm.process_group_manager.pp_world_size - pgm.process_group_manager.pp_rank - 1, data_loader.grad_acc_steps)。如果pp_rank是0,那么其对应的num_warmup_microbatches为3,如果pp_rank是3,那么对应的num_warmup_microbatches为0。

  2. 计算在预热阶段之后,需要在稳定状态下处理的微批次数:num_microbatches_remaining = data_loader.grad_acc_steps - num_warmup_microbatches

  3. 判断是否需要数据并行, 如果需要就标记requires_grad_sync=true

预热阶段:

预热阶段整体与Afab中的实现前向传播类似。

会执行num_warmup_microbatches次下面的操作来进行预热:

  1. 通过recv_forward操作获取前一个阶段的input,注意对于第一阶段的pipeline,这会直接返回none,对于其他阶段会进行阻塞等待。

  2. 执行自定义的_forward_step函数,得倒output

    1. 通过data_loader获取下一个batch

    2. 将input作为batch[“hidden_states”]

    3. 执行PipelineParallel的forward

    4. 如果是pipeline的最后一个阶段还需要计算loss并将loss作为output_tensor

  3. 通过send_forward操作来发送输出

  4. 将input和output添加到数组中

如果num_microbatches_remaining>0,即稳定状态下需要微批次那么就再执行recv_forward操作来收集input,这是作为稳定状态循环的开始,一般都需要

如果启用了数据并行,那么还需要先设置require_backward_grad_sync为false,防止还没都最后一个梯度计算就开始执行了梯度平均。

稳定阶段:

在稳定阶段,每个GPU同时执行一个前向传播和反向传播

  1. 执行刚刚最后收集到的input的前向传播_forward_step

  2. 执行send_fwd_recv_bwd操作发送前向传播的结果收集反向传播的梯度

  3. 将当前的input和output都放入到数组中

  4. 此时数组中的第一个input和第一个output是一对,直接取出

  5. 如果是在最后一个流水线阶段并且是稳定状态的最后一次迭代,那么就说明所有的微批次的反向传播在最后一个阶段都计算完了,设置require_backward_grad_sync为true

  6. 执行model.backward来计算反向传播

  7. 如果是流水线的最后一个阶段就执行send_backward发送回梯度,不然就执行send_bwd_recv_fwd,发送梯度,等待下一轮的激活值

冷却阶段:

在冷却阶段需要把还剩余的梯度处理掉,因为之前预热阶段对num_warmup_microbatches批次只处理了前向传播,没有处理反向传播,所以这里需要把之前欠缺的反向传播补足。它会遍历需要的反向传播的次数:

  1. 如果是最后一次迭代,就设置require_backward_grad_sync=true,使得该批次的梯度可以得到平均

  2. 从数组中弹出input和output

  3. 调用recv_backward来接收上一个阶段的梯度

  4. 反向传播计算梯度

  5. 将计算得到梯度通过调用send_backward来将梯度传给前一个阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def train_step_pipeline_1f1b(model, data_loader, tensor_shapes, device, dtype):    
# 1. 初始化和计算微批次数量
# num_warmup_microbatches: 当前 rank 需要执行的纯前向传播的微批次数,以启动流水线。
# - 对于 rank 0,它是 pp_world_size - 1 (如果梯度累积步数足够多)。
# - 对于最后一个 rank (pp_world_size - 1),它是 0。
# - 这是为了确保在第一个反向传播开始前,流水线被适当地“填充”。
num_warmup_microbatches = min(pgm.process_group_manager.pp_world_size - pgm.process_group_manager.pp_rank - 1, data_loader.grad_acc_steps)

# num_microbatches_remaining: 在预热阶段之后,需要在稳定状态下处理的微批次数。
num_microbatches_remaining = data_loader.grad_acc_steps - num_warmup_microbatches

logging_loss, input_tensors, output_tensors = 0.0, [], [] # 用于记录损失和存储中间的输入/输出张量
requires_grad_sync = pgm.process_group_manager.dp_world_size > 1 # 判断是否需要数据并行梯度同步

# 2. 定义内部辅助函数 _forward_step
def _forward_step(input_tensor):
batch = next(data_loader) # 获取下一个微批次数据
# 如果 input_tensor 不是 None (即不是第一个流水线阶段的第一次迭代),则将其作为隐藏状态
batch["hidden_states"] = input_tensor.to(device) if input_tensor is not None else input_tensor
# 执行模型的前向传播
output_tensor = model.forward(input_ids=batch["input_ids"].to(device), position_ids=batch["position_ids"].to(device), hidden_states=batch["hidden_states"])

# 如果是最后一个流水线阶段,计算损失
if pgm.process_group_manager.pp_is_last_stage:
# output_tensor 此时是 logits,需要与 target_ids 计算交叉熵损失
output_tensor = F.cross_entropy(output_tensor.transpose(1, 2), batch["target_ids"].to(device), reduction='mean')
nonlocal logging_loss # 允许修改外部作用域的 logging_loss
logging_loss += output_tensor.item() / data_loader.grad_acc_steps # 累积损失
return output_tensor # 返回当前阶段的输出张量 (对于最后一个阶段,这是损失值)

# === 预热(Warmup)前向传播阶段 === (L214-L219)
# 这个循环的目的是填充流水线。每个阶段执行其分配到的 num_warmup_microbatches 次前向传播。
for _ in range(num_warmup_microbatches):
# 从前一个阶段接收前向传播的激活值 (或 None,如果是第一个阶段)
input_tensor = pipeline_communicate(operation='recv_forward', shapes=tensor_shapes, device=device, dtype=dtype)
# 执行当前微批次的前向计算
output_tensor = _forward_step(input_tensor)
# 将计算得到的激活值发送给下一个阶段 (或 None,如果是最后一个阶段)
pipeline_communicate(operation='send_forward', tensor=output_tensor, device=device, dtype=dtype)
# 存储当前微批次的输入和输出,供后续的反向传播使用
input_tensors.append(input_tensor)
output_tensors.append(output_tensor)

# 在进入稳定状态之前,如果还有剩余的微批次需要处理,
# 当前阶段需要先接收一个前向传播的输入。
# 这是因为稳定状态的循环开始时,期望 input_tensor 已经被填充。
if num_microbatches_remaining > 0: # (L221-L222)
input_tensor = pipeline_communicate(operation='recv_forward', shapes=tensor_shapes, device=device, dtype=dtype)

# 如果启用了数据并行 (dp_world_size > 1),则需要处理梯度的同步。
# 在进入稳定状态前,通常先禁用梯度同步,因为梯度是在所有微批次的反向传播完成后才同步的。
if requires_grad_sync: # (L224-L225)
model.require_backward_grad_sync = False

# === 1F1B 稳定状态(Steady State)阶段 === (L228-L247)
# 在这个阶段,每个 GPU 同时执行一个前向传播和一个反向传播(针对不同的微批次)。
for ith_microbatch in range(num_microbatches_remaining):
is_last_iteration = (ith_microbatch == num_microbatches_remaining - 1) # 判断是否是稳定状态的最后一次迭代

# 1. 执行当前微批次的前向传播 (F)
output_tensor = _forward_step(input_tensor) # 使用上一次迭代接收或预热阶段最后接收的 input_tensor

# 2. 双向通信:发送当前前向结果 (F),接收上一个微批次的反向梯度 (B)
# - send_tensor=output_tensor: 将当前前向计算的结果发送给下一个阶段。
# - recv_shapes=tensor_shapes: 准备接收从下一个阶段传来的梯度。
# 这个梯度对应的是上一个已完成前向传播并发送出去的微批次的输出。
output_tensor_grad = bidirectional_pipeline_communicate(operation='send_fwd_recv_bwd', send_tensor=output_tensor, recv_shapes=tensor_shapes, device=device, dtype=dtype)

# 存储当前完成前向传播的 input_tensor 和 output_tensor
input_tensors.append(input_tensor)
output_tensors.append(output_tensor)

# 取出之前存储的、现在需要进行反向传播的微批次的 input_tensor 和 output_tensor
# 这些是与刚接收到的 output_tensor_grad 相对应的。
input_tensor, output_tensor = input_tensors.pop(0), output_tensors.pop(0)

# 梯度同步控制:
# - 只在最后一个流水线阶段 (num_warmup_microbatches == 0)
# - 并且是稳定状态的最后一次迭代时
# - 才将 model.require_backward_grad_sync 设置为 True。
# 这是为了确保在整个梯度累积周期的最后一次反向传播时进行梯度同步。
if num_warmup_microbatches == 0 and is_last_iteration: # (L238-L239)
model.require_backward_grad_sync = True

# 3. 执行上一个微批次的反向传播 (B)
input_tensor_grad = model.backward(input_tensor, output_tensor, output_tensor_grad)

# 4. 处理反向传播结果的通信
if is_last_iteration: # 如果是稳定状态的最后一次迭代
# 对于最后一个1F1B周期的反向传播,不再需要接收新的前向输入,
# 只需将计算得到的梯度发送给前一个阶段。
input_tensor = None # 后续不再有前向传播,所以设为 None
pipeline_communicate(operation='send_backward', tensor=input_tensor_grad, device=device, dtype=dtype)
else: # 如果不是稳定状态的最后一次迭代
# 双向通信:发送当前反向计算得到的梯度 (B),接收下一个微批次的前向输入 (F)
# - send_tensor=input_tensor_grad: 将当前反向计算得到的输入梯度发送给前一个阶段。
# - recv_shapes=tensor_shapes: 准备从前一个阶段接收下一个微批次的前向激活值。
# 这个接收到的 input_tensor 将用于下一次 1F1B 循环的 _forward_step。
input_tensor = bidirectional_pipeline_communicate(operation='send_bwd_recv_fwd', send_tensor=input_tensor_grad, recv_shapes=tensor_shapes, device=device, dtype=dtype)

# === 冷却(Cooldown)反向传播阶段 === (L250-L258)
# 这个循环处理在预热阶段存入流水线、但在稳定状态未来得及进行反向传播的微批次。
# 循环次数等于预热阶段的微批次数。
for ith_warmup_microbatches in range(num_warmup_microbatches):
if requires_grad_sync:
# 梯度同步控制:只在整个梯度累积周期的最后一次反向传播时才同步。
# 这通常发生在冷却阶段的最后一次迭代,对于那些 num_warmup_microbatches > 0 的 rank。
model.require_backward_grad_sync = (ith_warmup_microbatches == num_warmup_microbatches - 1)

# 取出预热阶段存储的输入和输出张量
input_tensor, output_tensor = input_tensors.pop(0), output_tensors.pop(0)
# 从下一个阶段接收反向梯度
output_tensor_grad = pipeline_communicate(operation='recv_backward', shapes=tensor_shapes, device=device, dtype=dtype)
# 执行反向传播
input_tensor_grad = model.backward(input_tensor, output_tensor, output_tensor_grad)
# 将计算得到的输入梯度发送给前一个阶段
pipeline_communicate(operation='send_backward', tensor=input_tensor_grad, device=device, dtype=dtype)

return logging_loss # 返回累积的损失值

【Picotron-Tutorial】流水线并行
http://example.com/2025/06/24/Picotron-Tutorial pipeline parallel/
作者
滑滑蛋
发布于
2025年6月24日
许可协议