Skip to content

DPO/GRPO 代码填空 (Level 2-3)

本练习覆盖偏好对齐训练的两大核心算法:DPO(Direct Preference Optimization)和 GRPO(Group Relative Policy Optimization)。从概念理解到完整实现,掌握 RLHF 的关键技术。

前置知识

  • 强化学习基础(策略、奖励、KL 散度)
  • Bradley-Terry 偏好模型
  • PyTorch 基础(log_softmax、gather、sigmoid)

核心公式

DPO Loss:

LDPO=E[logσ(βlogπθ(yw|x)πref(yw|x)βlogπθ(yl|x)πref(yl|x))]

GRPO Loss:

LGRPO=E[min(πθπoldA,clip(πθπold,1ϵ,1+ϵ)A)βDKL]

练习 1:Bradley-Terry 模型概念题(Level 1)

题目 1:在 Bradley-Terry 偏好模型中,给定两个回答 yw(优选)和 yl(劣选),人类偏好 yw 的概率建模为:

P(ywyl)=σ(r(yw)r(yl))

其中 σ 是 sigmoid 函数。如果 r(yw)=2.0r(yl)=1.0,那么 P(ywyl) 最接近:

  • A) 0.50
  • B) 0.73
  • C) 0.95
  • D) 0.99
点击查看答案

答案:C

σ(2.0(1.0))=σ(3.0)=11+e30.953

Bradley-Terry 模型的核心思想是:两个选项被偏好的概率由它们 reward 之差的 sigmoid 决定。差距越大,偏好越确定。


题目 2:DPO 的核心创新在于:

  • A) 使用一个单独训练的 Reward Model 来打分
  • B) 将 reward 隐式表达为策略与参考策略的 log ratio,跳过了 Reward Model 的显式训练
  • C) 使用 PPO 算法来优化策略
  • D) 对 reward 进行 group 内归一化
点击查看答案

答案:B

DPO 的关键洞察是从 RLHF 的目标函数出发,推导出最优策略满足:

r(x,y)=βlogπ(y|x)πref(y|x)+βlogZ(x)

将此关系代入 Bradley-Terry 模型,可以直接用策略网络的 log probability 替代 reward,从而跳过 Reward Model 的训练,将 RLHF 简化为一个分类问题。


题目 3:GRPO 与 PPO 的最大区别是:

  • A) GRPO 使用了 clipping 机制
  • B) GRPO 不需要 Critic(Value Function),而是用 group 内的 reward 归一化来估计 advantage
  • C) GRPO 使用了 KL penalty
  • D) GRPO 的学习率更大
点击查看答案

答案:B

GRPO 的核心创新是去掉了 PPO 中的 Critic 网络(价值函数)。传统 PPO 需要一个额外的 Value Function 来估计 advantage,而 GRPO 对同一个 prompt 采样一组回答(group),用 group 内 reward 的均值和标准差来归一化,得到 advantage 估计。这大幅减少了训练开销。

A 和 C 都是 GRPO 和 PPO 共有的特性,不是区别。


练习 2:DPO Loss 实现(Level 2)

基于 DPO 论文的核心公式,实现 DPO loss 计算。输入是策略模型和参考模型对 chosen/rejected 回答的 log probability。

python
import torch
import torch.nn.functional as F

def get_per_token_logps(logits, labels):
    """从 logits 中提取每个 token 对应 label 的 log probability"""
    log_probs = F.log_softmax(logits, dim=-1)
    per_token_logps = torch.gather(
        log_probs, dim=2, index=labels.unsqueeze(2)
    ).squeeze(2)
    return per_token_logps


def dpo_loss(pi_chosen_logps, pi_rejected_logps,
             ref_chosen_logps, ref_rejected_logps,
             label_mask, beta=0.1):
    """
    计算 DPO loss。
    
    参数:
        pi_chosen_logps:    策略模型对 chosen 的 token-level log prob, [bs, seq_len]
        pi_rejected_logps:  策略模型对 rejected 的 token-level log prob, [bs, seq_len]
        ref_chosen_logps:   参考模型对 chosen 的 token-level log prob, [bs, seq_len]
        ref_rejected_logps: 参考模型对 rejected 的 token-level log prob, [bs, seq_len]
        label_mask:         标记哪些 token 是回答部分(非 prompt), [bs, seq_len]
        beta:               温度系数
    
    返回:
        loss: 标量 loss
    """
    # TODO 1: 计算策略模型的 log ratio: log(pi(chosen)) - log(pi(rejected))
    pi_logratios = _____

    # TODO 2: 计算参考模型的 log ratio: log(ref(chosen)) - log(ref(rejected))
    ref_logratios = _____

    # TODO 3: 计算 DPO 的 logits(策略 log ratio - 参考 log ratio)
    logits = _____

    # TODO 4: 计算 DPO loss(负的 log sigmoid,只在回答部分计算)
    losses = _____

    # 对回答部分取平均
    loss = losses.sum(-1) / label_mask.sum(-1)
    return loss.mean()


# ====== 测试 ======
torch.manual_seed(42)
bs, seq_len = 1, 10
prompt_len = 6

pi_chosen_logps = torch.randn(bs, seq_len) * 0.5
pi_rejected_logps = torch.randn(bs, seq_len) * 0.5
ref_chosen_logps = torch.randn(bs, seq_len) * 0.5
ref_rejected_logps = torch.randn(bs, seq_len) * 0.5
label_mask = torch.tensor([[0, 0, 0, 0, 0, 0, 1, 1, 1, 1]], dtype=torch.float)

loss = dpo_loss(pi_chosen_logps, pi_rejected_logps,
                ref_chosen_logps, ref_rejected_logps,
                label_mask, beta=0.1)
print(f"DPO Loss: {loss.item():.4f}")
提示
  • pi_logratios 就是逐 token 的 pi_chosen - pi_rejected
  • ref_logratios 同理
  • DPO logits = pi_logratios - ref_logratios
  • loss = -F.logsigmoid(beta * logits) * label_mask
  • 使用 F.logsigmoid 而非 torch.log(torch.sigmoid(...)) 以保持数值稳定
点击查看答案
python
# TODO 1
pi_logratios = pi_chosen_logps - pi_rejected_logps

# TODO 2
ref_logratios = ref_chosen_logps - ref_rejected_logps

# TODO 3
logits = pi_logratios - ref_logratios

# TODO 4
losses = -F.logsigmoid(beta * logits) * label_mask

解析:

DPO loss 的计算可以分解为四步:

  1. 策略 log ratiologπθ(yw|x)logπθ(yl|x),衡量当前策略对 chosen 与 rejected 的偏好程度。
  2. 参考 log ratiologπref(yw|x)logπref(yl|x),作为基准。
  3. DPO logits:两个 ratio 的差值,反映的是"策略相对于参考模型,对 chosen 的偏好增强了多少"。
  4. Sigmoid losslogσ(βlogits),当策略更偏好 chosen 时,logits 为正,loss 趋近 0。

label_mask 的作用是只在回答部分(非 prompt)计算 loss,因为 prompt 是共享的。


练习 3:GRPO Advantage 计算(Level 2)

GRPO 的核心思想是对同一 prompt 采样一组(group)回答,通过 group 内的 reward 归一化来估计 advantage,替代 PPO 中的 Critic 网络。

python
import torch

def grpo_advantage(rewards):
    """
    计算 GRPO 的 group-relative advantage。
    
    参数:
        rewards: list 或 1D tensor,一个 group 内每个采样的 reward
                 例如: [1, 0, 0, 1, 0, 0](6 个采样,2 个正确)
    
    返回:
        advantage: tensor,归一化后的 advantage
    
    公式:
        A_i = (r_i - mean(r)) / (std(r) + epsilon)
    """
    epsilon = 1e-4
    rewards = torch.tensor(rewards, dtype=torch.float)
    
    # TODO: 计算 group-relative advantage(减均值,除标准差)
    A = _____
    return A


# ====== 测试用例 ======
# Case 1: 全部正确 -> advantage 全为 0(无法区分好坏)
A = grpo_advantage([1, 1, 1, 1, 1, 1])
print(f"全对: {A}")  # 应接近全 0

# Case 2: 全部错误 -> advantage 全为 0
A = grpo_advantage([0, 0, 0, 0, 0, 0])
print(f"全错: {A}")  # 应接近全 0

# Case 3: 只有 1 个正确 -> 正例 advantage 很高
A = grpo_advantage([1, 0, 0, 0, 0, 0])
print(f"1/6 正确: {A}")  # 第一个值应远大于其他

# Case 4: 一半正确
A = grpo_advantage([1, 0, 0, 1, 1, 0])
print(f"3/6 正确: {A}")  # 正例为正,负例为负

# Case 5: 大组采样(64 个中只有 1 个正确)
rewards_64 = [0] * 64
rewards_64[0] = 1
A = grpo_advantage(rewards_64)
print(f"1/64 正确, advantage[0]: {A[0].item():.2f}")  # 应该是很大的正值
提示
  • 标准的 z-score 归一化: (x - mean) / (std + eps)
  • rewards.mean()rewards.std() 即可
  • epsilon 防止标准差为 0 时除零(例如全对或全错的情况)
点击查看答案
python
A = (rewards - rewards.mean()) / (rewards.std() + epsilon)

解析:

GRPO advantage 的计算本质上是一个 z-score 归一化:

  1. 减均值:使 advantage 有正有负。正例的 advantage 为正(鼓励生成),负例为负(抑制生成)。
  2. 除标准差:标准化 advantage 的尺度,避免不同 group 的梯度量级差异过大。
  3. epsilon:当全对或全错时,std=0,加 epsilon 避免除零。此时所有 advantage 接近 0,相当于"跳过"这个 group 的优化。

关键观察:

  • 全对/全错时 advantage 全为 0 -- 这说明该 group 无法提供有用的训练信号。
  • 只有少数正例时,正例的 advantage 很大 -- 这是合理的,稀有的正确回答应获得更大的奖励。
  • 采样数量越多,advantage 估计越准确。论文中典型设置为 group_size=8~64。

练习 4:GRPO Loss 完整实现(Level 3)

实现完整的 GRPO loss,包括 clipped policy ratio、advantage 加权和 KL penalty。

python
import torch
import torch.nn.functional as F

def approx_kl(log_p, log_q):
    """
    计算近似 KL 散度(非对称)。
    
    公式: KL = exp(q/p) - (q - p) - 1
    即: q/p - log(q/p) - 1
    """
    return log_q.exp() / log_p.exp() \
           - (log_q - log_p) - 1


def grpo_loss(pi_logprob, pi_old_logprob, pi_ref_logprob,
              advantage, input_len):
    """
    计算 GRPO loss。
    
    参数:
        pi_logprob:     当前策略的 token log prob, [group_size, seq_len]
        pi_old_logprob: 采样时策略的 token log prob, [group_size, seq_len]
        pi_ref_logprob: 参考策略的 token log prob, [group_size, seq_len]
        advantage:      group-relative advantage, [group_size]
        input_len:      prompt 长度(只在回答部分计算 loss)
    
    返回:
        loss: 标量
    """
    epsilon = 0.2  # clipping 范围
    beta = 0.01    # KL penalty 系数

    bs, seq_len = pi_logprob.shape

    # TODO 1: 将 advantage 扩展维度以便广播 [group_size] -> [group_size, 1]
    advantage = _____

    # TODO 2: 计算 importance sampling ratio: exp(log_pi - log_pi_old)
    ratio = _____

    # TODO 3: clip ratio 到 [1-epsilon, 1+epsilon]
    ratio_clip = _____

    # TODO 4: 计算 clipped policy gradient(取 min,保守更新)
    policy_gradient = _____

    # TODO 5: 计算 KL penalty
    kl = _____

    # 只在回答部分(跳过 prompt)计算 loss
    response_loss = (policy_gradient - beta * kl)[:, input_len:]
    loss = -response_loss.mean()
    return loss


# ====== 测试代码 ======
torch.manual_seed(42)
group_size = 4
seq_len = 8
vocab_size = 50

# 模拟 logits
pi_logits = torch.randn(group_size, seq_len, vocab_size)
pi_old_logits = torch.randn(group_size, seq_len, vocab_size)
pi_ref_logits = torch.randn(group_size, seq_len, vocab_size)

# 模拟 token ids
token_ids = torch.randint(0, vocab_size, (group_size, seq_len))

# 计算 log prob
pi_logprob = torch.gather(
    F.log_softmax(pi_logits, dim=-1), dim=-1,
    index=token_ids.unsqueeze(-1)
).squeeze(-1)

pi_old_logprob = torch.gather(
    F.log_softmax(pi_old_logits, dim=-1), dim=-1,
    index=token_ids.unsqueeze(-1)
).squeeze(-1)

pi_ref_logprob = torch.gather(
    F.log_softmax(pi_ref_logits, dim=-1), dim=-1,
    index=token_ids.unsqueeze(-1)
).squeeze(-1)

# 模拟 advantage
advantage = grpo_advantage([1, 0, 0, 1])
input_len = 3

loss = grpo_loss(pi_logprob, pi_old_logprob, pi_ref_logprob,
                 advantage, input_len)
print(f"GRPO Loss: {loss.item():.4f}")
提示
  • advantage.unsqueeze(dim=1) 将形状从 [G] 变为 [G, 1],便于与 [G, seq_len] 广播
  • ratio = torch.exp(pi_logprob - pi_old_logprob)
  • clipping: torch.clamp(ratio, 1 - epsilon, 1 + epsilon)
  • policy gradient: torch.minimum(ratio * advantage, ratio_clip * advantage)
  • KL: 直接调用 approx_kl(pi_logprob, pi_ref_logprob)
点击查看答案
python
# TODO 1: 扩展 advantage 维度
advantage = advantage.unsqueeze(dim=1)  # [G] -> [G, 1]

# TODO 2: importance sampling ratio
ratio = torch.exp(pi_logprob - pi_old_logprob)

# TODO 3: clip ratio
ratio_clip = torch.clamp(ratio, 1 - epsilon, 1 + epsilon)

# TODO 4: clipped policy gradient (PPO-style)
policy_gradient = torch.minimum(ratio * advantage, ratio_clip * advantage)

# TODO 5: KL penalty
kl = approx_kl(pi_logprob, pi_ref_logprob)

解析:

GRPO loss 由三个核心组件构成:

  1. Importance Sampling Ratioπθ(y|x)πold(y|x)。在 log 空间中计算差值再 exp,避免数值问题。ratio=1 表示策略没有变化。

  2. Clipping:将 ratio 限制在 [1ϵ,1+ϵ] 范围内,防止策略更新过大。取 minimum 确保保守更新 -- 当 advantage > 0 时,限制 ratio 不超过 1+ϵ;当 advantage < 0 时,限制 ratio 不低于 1ϵ

  3. KL PenaltyDKL(πref||πθ)=πrefπθlogπrefπθ1。这是一个非对称 KL 散度,惩罚当前策略偏离参考策略过远,防止 reward hacking。

最终 loss = (policy_gradientβKL),取负号是因为我们要最大化 policy gradient,最小化 KL。


练习 5:Reward Model 的 forward(Level 3)

实现一个简单的 Reward Model,它在预训练语言模型的基础上加一个 value head,输出标量 reward。

python
import torch
import torch.nn as nn
import torch.nn.functional as F

class RewardModel(nn.Module):
    """
    Reward Model: 在语言模型基础上添加 value head,
    对每个序列输出一个标量 reward。
    
    结构:
        input -> backbone(LM) -> hidden_states -> value_head -> scalar reward
    
    训练目标:
        对于 (chosen, rejected) 对,最大化 r(chosen) - r(rejected) 的概率
    """
    def __init__(self, backbone, hidden_dim):
        super().__init__()
        # TODO 1: 保存 backbone 并冻结其参数
        self.backbone = _____
        for param in self.backbone.parameters():
            _____

        # TODO 2: 定义 value head (Linear: hidden_dim -> 1)
        self.value_head = _____

    def forward(self, input_ids):
        """
        参数:
            input_ids: [batch_size, seq_len]
        返回:
            rewards: [batch_size],每个序列的标量 reward
        """
        # TODO 3: 获取 backbone 的 hidden states(取最后一层)
        with torch.no_grad():
            hidden_states = _____  # [bs, seq_len, hidden_dim]

        # TODO 4: 取最后一个 token 的 hidden state 作为序列表示
        last_hidden = _____  # [bs, hidden_dim]

        # TODO 5: 通过 value head 得到标量 reward
        reward = _____  # [bs, 1] -> [bs]
        return reward

    def compute_preference_loss(self, reward_chosen, reward_rejected):
        """
        Bradley-Terry 偏好 loss:
            loss = -log(sigmoid(r_chosen - r_rejected))
        
        参数:
            reward_chosen:   [batch_size]
            reward_rejected: [batch_size]
        返回:
            loss: 标量
        """
        # TODO 6: 计算 Bradley-Terry loss
        loss = _____
        return loss


# ====== 测试代码 ======
class SimpleBackbone(nn.Module):
    """模拟一个简单的语言模型 backbone"""
    def __init__(self, vocab_size, hidden_dim, seq_len):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, hidden_dim)
        self.linear = nn.Linear(hidden_dim, hidden_dim)

    def forward(self, input_ids):
        h = self.embed(input_ids)
        h = F.gelu(self.linear(h))
        return h

torch.manual_seed(42)
vocab_size, hidden_dim, seq_len = 100, 64, 10

backbone = SimpleBackbone(vocab_size, hidden_dim, seq_len)
reward_model = RewardModel(backbone, hidden_dim)

chosen_ids = torch.randint(0, vocab_size, (4, seq_len))
rejected_ids = torch.randint(0, vocab_size, (4, seq_len))

r_chosen = reward_model(chosen_ids)
r_rejected = reward_model(rejected_ids)

print(f"Chosen rewards: {r_chosen}")
print(f"Rejected rewards: {r_rejected}")

loss = reward_model.compute_preference_loss(r_chosen, r_rejected)
print(f"Preference loss: {loss.item():.4f}")

# 验证 value_head 可训练,backbone 冻结
trainable = sum(p.numel() for p in reward_model.parameters() if p.requires_grad)
total = sum(p.numel() for p in reward_model.parameters())
print(f"可训练参数: {trainable}, 总参数: {total}")
提示
  • backbone 赋值后用 param.requires_grad_(False) 冻结
  • value_head: nn.Linear(hidden_dim, 1)
  • hidden_states: self.backbone(input_ids)
  • 最后一个 token: hidden_states[:, -1, :]
  • reward: self.value_head(last_hidden).squeeze(-1) 去掉最后一维
  • preference loss: -F.logsigmoid(reward_chosen - reward_rejected).mean()
点击查看答案
python
class RewardModel(nn.Module):
    def __init__(self, backbone, hidden_dim):
        super().__init__()
        # TODO 1
        self.backbone = backbone
        for param in self.backbone.parameters():
            param.requires_grad_(False)

        # TODO 2
        self.value_head = nn.Linear(hidden_dim, 1)

    def forward(self, input_ids):
        # TODO 3
        with torch.no_grad():
            hidden_states = self.backbone(input_ids)

        # TODO 4
        last_hidden = hidden_states[:, -1, :]

        # TODO 5
        reward = self.value_head(last_hidden).squeeze(-1)
        return reward

    def compute_preference_loss(self, reward_chosen, reward_rejected):
        # TODO 6
        loss = -F.logsigmoid(reward_chosen - reward_rejected).mean()
        return loss

解析:

Reward Model 的设计要点:

  1. Backbone 冻结:Reward Model 通常基于预训练 LM 初始化,训练时只更新 value head,保留语言理解能力。
  2. 序列表示:取最后一个 token 的 hidden state 作为整个序列的表示。在 causal LM 中,最后一个 token 经过了所有上文的注意力聚合,信息最丰富。
  3. Value Head:一个简单的 Linear 层,将 hidden_dim 维映射到标量 reward。
  4. Bradley-Terry Losslogσ(rwrl),直接优化"chosen 的 reward 高于 rejected"的概率。这与 DPO 中的隐式 reward 对应:r(y|x)=βlogπ(y|x)πref(y|x)

在实际训练中,Reward Model 的质量直接决定了 RLHF 的上限。常见问题包括 reward hacking(策略找到 RM 的漏洞而非真正改善质量)和 overoptimization。DPO 绕过了显式 RM 的训练,但 GRPO 仍然需要 reward function(可以是规则或 RM)。


MLM 代码训练模式

完成上面的固定填空后,试试随机挖空模式 -- 每次点击「刷新」会随机遮盖不同的代码片段,帮你彻底记住每一行。

DPO Loss 计算

DPO Loss:log ratio 与 sigmoid loss
共 45 个可挖空位 | 已挖 0 个
def dpo_loss(pi_chosen_logps, pi_rejected_logps,
             ref_chosen_logps, ref_rejected_logps,
             label_mask, beta=0.1):
    # 策略模型 log ratio
    pi_logratios = pi_chosen_logps - pi_rejected_logps

    # 参考模型 log ratio
    ref_logratios = ref_chosen_logps - ref_rejected_logps

    # DPO logits = 策略 log ratio - 参考 log ratio
    logits = pi_logratios - ref_logratios

    # DPO loss: 负的 log sigmoid,只在回答部分计算
    losses = -F.logsigmoid(beta * logits) * label_mask

    loss = losses.sum(-1) / label_mask.sum(-1)
    return loss.mean()

GRPO Advantage 计算

GRPO:Group Relative Advantage(z-score 归一化)
共 22 个可挖空位 | 已挖 0 个
def grpo_advantage(rewards):
    epsilon = 1e-4
    rewards = torch.tensor(rewards, dtype=torch.float)
    # group-relative advantage: z-score 归一化
    A = (rewards - rewards.mean()) / (rewards.std() + epsilon)
    return A

GRPO Policy Loss(含 KL Penalty)

GRPO Loss:clipped ratio + advantage + KL penalty
共 62 个可挖空位 | 已挖 0 个
def grpo_loss(pi_logprob, pi_old_logprob, pi_ref_logprob,
              advantage, input_len):
    epsilon = 0.2
    beta = 0.01

    # 扩展 advantage 维度以便广播
    advantage = advantage.unsqueeze(dim=1)

    # importance sampling ratio
    ratio = torch.exp(pi_logprob - pi_old_logprob)

    # clip ratio 到 [1-epsilon, 1+epsilon]
    ratio_clip = torch.clamp(ratio, 1 - epsilon, 1 + epsilon)

    # clipped policy gradient(取 min,保守更新)
    policy_gradient = torch.minimum(ratio * advantage, ratio_clip * advantage)

    # KL penalty
    kl = approx_kl(pi_logprob, pi_ref_logprob)

    # 只在回答部分计算 loss
    response_loss = (policy_gradient - beta * kl)[:, input_len:]
    loss = -response_loss.mean()
    return loss