DPO/GRPO 代码填空 (Level 2-3)
本练习覆盖偏好对齐训练的两大核心算法:DPO(Direct Preference Optimization)和 GRPO(Group Relative Policy Optimization)。从概念理解到完整实现,掌握 RLHF 的关键技术。
前置知识
- 强化学习基础(策略、奖励、KL 散度)
- Bradley-Terry 偏好模型
- PyTorch 基础(log_softmax、gather、sigmoid)
核心公式
DPO Loss:
GRPO Loss:
练习 1:Bradley-Terry 模型概念题(Level 1)
题目 1:在 Bradley-Terry 偏好模型中,给定两个回答
其中
- A) 0.50
- B) 0.73
- C) 0.95
- D) 0.99
点击查看答案
答案:C
Bradley-Terry 模型的核心思想是:两个选项被偏好的概率由它们 reward 之差的 sigmoid 决定。差距越大,偏好越确定。
题目 2:DPO 的核心创新在于:
- A) 使用一个单独训练的 Reward Model 来打分
- B) 将 reward 隐式表达为策略与参考策略的 log ratio,跳过了 Reward Model 的显式训练
- C) 使用 PPO 算法来优化策略
- D) 对 reward 进行 group 内归一化
点击查看答案
答案:B
DPO 的关键洞察是从 RLHF 的目标函数出发,推导出最优策略满足:
将此关系代入 Bradley-Terry 模型,可以直接用策略网络的 log probability 替代 reward,从而跳过 Reward Model 的训练,将 RLHF 简化为一个分类问题。
题目 3:GRPO 与 PPO 的最大区别是:
- A) GRPO 使用了 clipping 机制
- B) GRPO 不需要 Critic(Value Function),而是用 group 内的 reward 归一化来估计 advantage
- C) GRPO 使用了 KL penalty
- D) GRPO 的学习率更大
点击查看答案
答案:B
GRPO 的核心创新是去掉了 PPO 中的 Critic 网络(价值函数)。传统 PPO 需要一个额外的 Value Function 来估计 advantage,而 GRPO 对同一个 prompt 采样一组回答(group),用 group 内 reward 的均值和标准差来归一化,得到 advantage 估计。这大幅减少了训练开销。
A 和 C 都是 GRPO 和 PPO 共有的特性,不是区别。
练习 2:DPO Loss 实现(Level 2)
基于 DPO 论文的核心公式,实现 DPO loss 计算。输入是策略模型和参考模型对 chosen/rejected 回答的 log probability。
import torch
import torch.nn.functional as F
def get_per_token_logps(logits, labels):
"""从 logits 中提取每个 token 对应 label 的 log probability"""
log_probs = F.log_softmax(logits, dim=-1)
per_token_logps = torch.gather(
log_probs, dim=2, index=labels.unsqueeze(2)
).squeeze(2)
return per_token_logps
def dpo_loss(pi_chosen_logps, pi_rejected_logps,
ref_chosen_logps, ref_rejected_logps,
label_mask, beta=0.1):
"""
计算 DPO loss。
参数:
pi_chosen_logps: 策略模型对 chosen 的 token-level log prob, [bs, seq_len]
pi_rejected_logps: 策略模型对 rejected 的 token-level log prob, [bs, seq_len]
ref_chosen_logps: 参考模型对 chosen 的 token-level log prob, [bs, seq_len]
ref_rejected_logps: 参考模型对 rejected 的 token-level log prob, [bs, seq_len]
label_mask: 标记哪些 token 是回答部分(非 prompt), [bs, seq_len]
beta: 温度系数
返回:
loss: 标量 loss
"""
# TODO 1: 计算策略模型的 log ratio: log(pi(chosen)) - log(pi(rejected))
pi_logratios = _____
# TODO 2: 计算参考模型的 log ratio: log(ref(chosen)) - log(ref(rejected))
ref_logratios = _____
# TODO 3: 计算 DPO 的 logits(策略 log ratio - 参考 log ratio)
logits = _____
# TODO 4: 计算 DPO loss(负的 log sigmoid,只在回答部分计算)
losses = _____
# 对回答部分取平均
loss = losses.sum(-1) / label_mask.sum(-1)
return loss.mean()
# ====== 测试 ======
torch.manual_seed(42)
bs, seq_len = 1, 10
prompt_len = 6
pi_chosen_logps = torch.randn(bs, seq_len) * 0.5
pi_rejected_logps = torch.randn(bs, seq_len) * 0.5
ref_chosen_logps = torch.randn(bs, seq_len) * 0.5
ref_rejected_logps = torch.randn(bs, seq_len) * 0.5
label_mask = torch.tensor([[0, 0, 0, 0, 0, 0, 1, 1, 1, 1]], dtype=torch.float)
loss = dpo_loss(pi_chosen_logps, pi_rejected_logps,
ref_chosen_logps, ref_rejected_logps,
label_mask, beta=0.1)
print(f"DPO Loss: {loss.item():.4f}")提示
pi_logratios就是逐 token 的pi_chosen - pi_rejectedref_logratios同理- DPO logits = pi_logratios - ref_logratios
- loss =
-F.logsigmoid(beta * logits) * label_mask - 使用
F.logsigmoid而非torch.log(torch.sigmoid(...))以保持数值稳定
点击查看答案
# TODO 1
pi_logratios = pi_chosen_logps - pi_rejected_logps
# TODO 2
ref_logratios = ref_chosen_logps - ref_rejected_logps
# TODO 3
logits = pi_logratios - ref_logratios
# TODO 4
losses = -F.logsigmoid(beta * logits) * label_mask解析:
DPO loss 的计算可以分解为四步:
- 策略 log ratio:
,衡量当前策略对 chosen 与 rejected 的偏好程度。 - 参考 log ratio:
,作为基准。 - DPO logits:两个 ratio 的差值,反映的是"策略相对于参考模型,对 chosen 的偏好增强了多少"。
- Sigmoid loss:
,当策略更偏好 chosen 时,logits 为正,loss 趋近 0。
label_mask 的作用是只在回答部分(非 prompt)计算 loss,因为 prompt 是共享的。
练习 3:GRPO Advantage 计算(Level 2)
GRPO 的核心思想是对同一 prompt 采样一组(group)回答,通过 group 内的 reward 归一化来估计 advantage,替代 PPO 中的 Critic 网络。
import torch
def grpo_advantage(rewards):
"""
计算 GRPO 的 group-relative advantage。
参数:
rewards: list 或 1D tensor,一个 group 内每个采样的 reward
例如: [1, 0, 0, 1, 0, 0](6 个采样,2 个正确)
返回:
advantage: tensor,归一化后的 advantage
公式:
A_i = (r_i - mean(r)) / (std(r) + epsilon)
"""
epsilon = 1e-4
rewards = torch.tensor(rewards, dtype=torch.float)
# TODO: 计算 group-relative advantage(减均值,除标准差)
A = _____
return A
# ====== 测试用例 ======
# Case 1: 全部正确 -> advantage 全为 0(无法区分好坏)
A = grpo_advantage([1, 1, 1, 1, 1, 1])
print(f"全对: {A}") # 应接近全 0
# Case 2: 全部错误 -> advantage 全为 0
A = grpo_advantage([0, 0, 0, 0, 0, 0])
print(f"全错: {A}") # 应接近全 0
# Case 3: 只有 1 个正确 -> 正例 advantage 很高
A = grpo_advantage([1, 0, 0, 0, 0, 0])
print(f"1/6 正确: {A}") # 第一个值应远大于其他
# Case 4: 一半正确
A = grpo_advantage([1, 0, 0, 1, 1, 0])
print(f"3/6 正确: {A}") # 正例为正,负例为负
# Case 5: 大组采样(64 个中只有 1 个正确)
rewards_64 = [0] * 64
rewards_64[0] = 1
A = grpo_advantage(rewards_64)
print(f"1/64 正确, advantage[0]: {A[0].item():.2f}") # 应该是很大的正值提示
- 标准的 z-score 归一化:
(x - mean) / (std + eps) rewards.mean()和rewards.std()即可- epsilon 防止标准差为 0 时除零(例如全对或全错的情况)
点击查看答案
A = (rewards - rewards.mean()) / (rewards.std() + epsilon)解析:
GRPO advantage 的计算本质上是一个 z-score 归一化:
- 减均值:使 advantage 有正有负。正例的 advantage 为正(鼓励生成),负例为负(抑制生成)。
- 除标准差:标准化 advantage 的尺度,避免不同 group 的梯度量级差异过大。
- epsilon:当全对或全错时,std=0,加 epsilon 避免除零。此时所有 advantage 接近 0,相当于"跳过"这个 group 的优化。
关键观察:
- 全对/全错时 advantage 全为 0 -- 这说明该 group 无法提供有用的训练信号。
- 只有少数正例时,正例的 advantage 很大 -- 这是合理的,稀有的正确回答应获得更大的奖励。
- 采样数量越多,advantage 估计越准确。论文中典型设置为 group_size=8~64。
练习 4:GRPO Loss 完整实现(Level 3)
实现完整的 GRPO loss,包括 clipped policy ratio、advantage 加权和 KL penalty。
import torch
import torch.nn.functional as F
def approx_kl(log_p, log_q):
"""
计算近似 KL 散度(非对称)。
公式: KL = exp(q/p) - (q - p) - 1
即: q/p - log(q/p) - 1
"""
return log_q.exp() / log_p.exp() \
- (log_q - log_p) - 1
def grpo_loss(pi_logprob, pi_old_logprob, pi_ref_logprob,
advantage, input_len):
"""
计算 GRPO loss。
参数:
pi_logprob: 当前策略的 token log prob, [group_size, seq_len]
pi_old_logprob: 采样时策略的 token log prob, [group_size, seq_len]
pi_ref_logprob: 参考策略的 token log prob, [group_size, seq_len]
advantage: group-relative advantage, [group_size]
input_len: prompt 长度(只在回答部分计算 loss)
返回:
loss: 标量
"""
epsilon = 0.2 # clipping 范围
beta = 0.01 # KL penalty 系数
bs, seq_len = pi_logprob.shape
# TODO 1: 将 advantage 扩展维度以便广播 [group_size] -> [group_size, 1]
advantage = _____
# TODO 2: 计算 importance sampling ratio: exp(log_pi - log_pi_old)
ratio = _____
# TODO 3: clip ratio 到 [1-epsilon, 1+epsilon]
ratio_clip = _____
# TODO 4: 计算 clipped policy gradient(取 min,保守更新)
policy_gradient = _____
# TODO 5: 计算 KL penalty
kl = _____
# 只在回答部分(跳过 prompt)计算 loss
response_loss = (policy_gradient - beta * kl)[:, input_len:]
loss = -response_loss.mean()
return loss
# ====== 测试代码 ======
torch.manual_seed(42)
group_size = 4
seq_len = 8
vocab_size = 50
# 模拟 logits
pi_logits = torch.randn(group_size, seq_len, vocab_size)
pi_old_logits = torch.randn(group_size, seq_len, vocab_size)
pi_ref_logits = torch.randn(group_size, seq_len, vocab_size)
# 模拟 token ids
token_ids = torch.randint(0, vocab_size, (group_size, seq_len))
# 计算 log prob
pi_logprob = torch.gather(
F.log_softmax(pi_logits, dim=-1), dim=-1,
index=token_ids.unsqueeze(-1)
).squeeze(-1)
pi_old_logprob = torch.gather(
F.log_softmax(pi_old_logits, dim=-1), dim=-1,
index=token_ids.unsqueeze(-1)
).squeeze(-1)
pi_ref_logprob = torch.gather(
F.log_softmax(pi_ref_logits, dim=-1), dim=-1,
index=token_ids.unsqueeze(-1)
).squeeze(-1)
# 模拟 advantage
advantage = grpo_advantage([1, 0, 0, 1])
input_len = 3
loss = grpo_loss(pi_logprob, pi_old_logprob, pi_ref_logprob,
advantage, input_len)
print(f"GRPO Loss: {loss.item():.4f}")提示
advantage.unsqueeze(dim=1)将形状从[G]变为[G, 1],便于与[G, seq_len]广播- ratio =
torch.exp(pi_logprob - pi_old_logprob) - clipping:
torch.clamp(ratio, 1 - epsilon, 1 + epsilon) - policy gradient:
torch.minimum(ratio * advantage, ratio_clip * advantage) - KL: 直接调用
approx_kl(pi_logprob, pi_ref_logprob)
点击查看答案
# TODO 1: 扩展 advantage 维度
advantage = advantage.unsqueeze(dim=1) # [G] -> [G, 1]
# TODO 2: importance sampling ratio
ratio = torch.exp(pi_logprob - pi_old_logprob)
# TODO 3: clip ratio
ratio_clip = torch.clamp(ratio, 1 - epsilon, 1 + epsilon)
# TODO 4: clipped policy gradient (PPO-style)
policy_gradient = torch.minimum(ratio * advantage, ratio_clip * advantage)
# TODO 5: KL penalty
kl = approx_kl(pi_logprob, pi_ref_logprob)解析:
GRPO loss 由三个核心组件构成:
Importance Sampling Ratio:
。在 log 空间中计算差值再 exp,避免数值问题。ratio=1 表示策略没有变化。 Clipping:将 ratio 限制在
范围内,防止策略更新过大。取 minimum确保保守更新 -- 当 advantage > 0 时,限制 ratio 不超过;当 advantage < 0 时,限制 ratio 不低于 。 KL Penalty:
。这是一个非对称 KL 散度,惩罚当前策略偏离参考策略过远,防止 reward hacking。
最终 loss =
练习 5:Reward Model 的 forward(Level 3)
实现一个简单的 Reward Model,它在预训练语言模型的基础上加一个 value head,输出标量 reward。
import torch
import torch.nn as nn
import torch.nn.functional as F
class RewardModel(nn.Module):
"""
Reward Model: 在语言模型基础上添加 value head,
对每个序列输出一个标量 reward。
结构:
input -> backbone(LM) -> hidden_states -> value_head -> scalar reward
训练目标:
对于 (chosen, rejected) 对,最大化 r(chosen) - r(rejected) 的概率
"""
def __init__(self, backbone, hidden_dim):
super().__init__()
# TODO 1: 保存 backbone 并冻结其参数
self.backbone = _____
for param in self.backbone.parameters():
_____
# TODO 2: 定义 value head (Linear: hidden_dim -> 1)
self.value_head = _____
def forward(self, input_ids):
"""
参数:
input_ids: [batch_size, seq_len]
返回:
rewards: [batch_size],每个序列的标量 reward
"""
# TODO 3: 获取 backbone 的 hidden states(取最后一层)
with torch.no_grad():
hidden_states = _____ # [bs, seq_len, hidden_dim]
# TODO 4: 取最后一个 token 的 hidden state 作为序列表示
last_hidden = _____ # [bs, hidden_dim]
# TODO 5: 通过 value head 得到标量 reward
reward = _____ # [bs, 1] -> [bs]
return reward
def compute_preference_loss(self, reward_chosen, reward_rejected):
"""
Bradley-Terry 偏好 loss:
loss = -log(sigmoid(r_chosen - r_rejected))
参数:
reward_chosen: [batch_size]
reward_rejected: [batch_size]
返回:
loss: 标量
"""
# TODO 6: 计算 Bradley-Terry loss
loss = _____
return loss
# ====== 测试代码 ======
class SimpleBackbone(nn.Module):
"""模拟一个简单的语言模型 backbone"""
def __init__(self, vocab_size, hidden_dim, seq_len):
super().__init__()
self.embed = nn.Embedding(vocab_size, hidden_dim)
self.linear = nn.Linear(hidden_dim, hidden_dim)
def forward(self, input_ids):
h = self.embed(input_ids)
h = F.gelu(self.linear(h))
return h
torch.manual_seed(42)
vocab_size, hidden_dim, seq_len = 100, 64, 10
backbone = SimpleBackbone(vocab_size, hidden_dim, seq_len)
reward_model = RewardModel(backbone, hidden_dim)
chosen_ids = torch.randint(0, vocab_size, (4, seq_len))
rejected_ids = torch.randint(0, vocab_size, (4, seq_len))
r_chosen = reward_model(chosen_ids)
r_rejected = reward_model(rejected_ids)
print(f"Chosen rewards: {r_chosen}")
print(f"Rejected rewards: {r_rejected}")
loss = reward_model.compute_preference_loss(r_chosen, r_rejected)
print(f"Preference loss: {loss.item():.4f}")
# 验证 value_head 可训练,backbone 冻结
trainable = sum(p.numel() for p in reward_model.parameters() if p.requires_grad)
total = sum(p.numel() for p in reward_model.parameters())
print(f"可训练参数: {trainable}, 总参数: {total}")提示
- backbone 赋值后用
param.requires_grad_(False)冻结 - value_head:
nn.Linear(hidden_dim, 1) - hidden_states:
self.backbone(input_ids) - 最后一个 token:
hidden_states[:, -1, :] - reward:
self.value_head(last_hidden).squeeze(-1)去掉最后一维 - preference loss:
-F.logsigmoid(reward_chosen - reward_rejected).mean()
点击查看答案
class RewardModel(nn.Module):
def __init__(self, backbone, hidden_dim):
super().__init__()
# TODO 1
self.backbone = backbone
for param in self.backbone.parameters():
param.requires_grad_(False)
# TODO 2
self.value_head = nn.Linear(hidden_dim, 1)
def forward(self, input_ids):
# TODO 3
with torch.no_grad():
hidden_states = self.backbone(input_ids)
# TODO 4
last_hidden = hidden_states[:, -1, :]
# TODO 5
reward = self.value_head(last_hidden).squeeze(-1)
return reward
def compute_preference_loss(self, reward_chosen, reward_rejected):
# TODO 6
loss = -F.logsigmoid(reward_chosen - reward_rejected).mean()
return loss解析:
Reward Model 的设计要点:
- Backbone 冻结:Reward Model 通常基于预训练 LM 初始化,训练时只更新 value head,保留语言理解能力。
- 序列表示:取最后一个 token 的 hidden state 作为整个序列的表示。在 causal LM 中,最后一个 token 经过了所有上文的注意力聚合,信息最丰富。
- Value Head:一个简单的 Linear 层,将 hidden_dim 维映射到标量 reward。
- Bradley-Terry Loss:
,直接优化"chosen 的 reward 高于 rejected"的概率。这与 DPO 中的隐式 reward 对应: 。
在实际训练中,Reward Model 的质量直接决定了 RLHF 的上限。常见问题包括 reward hacking(策略找到 RM 的漏洞而非真正改善质量)和 overoptimization。DPO 绕过了显式 RM 的训练,但 GRPO 仍然需要 reward function(可以是规则或 RM)。
MLM 代码训练模式
完成上面的固定填空后,试试随机挖空模式 -- 每次点击「刷新」会随机遮盖不同的代码片段,帮你彻底记住每一行。
DPO Loss 计算
def dpo_loss(pi_chosen_logps, pi_rejected_logps,
ref_chosen_logps, ref_rejected_logps,
label_mask, beta=0.1):
# çç¥æ¨¡å log ratio
pi_logratios = pi_chosen_logps - pi_rejected_logps
# åèæ¨¡å log ratio
ref_logratios = ref_chosen_logps - ref_rejected_logps
# DPO logits = çç¥ log ratio - åè log ratio
logits = pi_logratios - ref_logratios
# DPO loss: è´ç log sigmoidï¼åªå¨åçé¨å计ç®
losses = -F.logsigmoid(beta * logits) * label_mask
loss = losses.sum(-1) / label_mask.sum(-1)
return loss.mean()GRPO Advantage 计算
def grpo_advantage(rewards):
epsilon = 1e-4
rewards = torch.tensor(rewards, dtype=torch.float)
# group-relative advantage: z-score å½ä¸å
A = (rewards - rewards.mean()) / (rewards.std() + epsilon)
return AGRPO Policy Loss(含 KL Penalty)
def grpo_loss(pi_logprob, pi_old_logprob, pi_ref_logprob,
advantage, input_len):
epsilon = 0.2
beta = 0.01
# æ©å± advantage 维度以便广æ
advantage = advantage.unsqueeze(dim=1)
# importance sampling ratio
ratio = torch.exp(pi_logprob - pi_old_logprob)
# clip ratio å° [1-epsilon, 1+epsilon]
ratio_clip = torch.clamp(ratio, 1 - epsilon, 1 + epsilon)
# clipped policy gradientï¼å minï¼ä¿å®æ´æ°ï¼
policy_gradient = torch.minimum(ratio * advantage, ratio_clip * advantage)
# KL penalty
kl = approx_kl(pi_logprob, pi_ref_logprob)
# åªå¨åçé¨åè®¡ç® loss
response_loss = (policy_gradient - beta * kl)[:, input_len:]
loss = -response_loss.mean()
return loss