Skip to content

推理优化

一句话总结

推理优化让大模型从实验室走向生产环境——通过 KV Cache、PagedAttention、Continuous Batching、Chunked Prefill、投机采样、PD 分离等技术,将推理延迟从秒级降到毫秒级,将吞吐量提升数十倍;端侧推理引擎更将 LLM 能力延伸到手机和边缘设备。

在大模型体系中的位置

推理优化处于大模型工程化的最后一公里:

预训练 → SFT/RLHF → 量化/蒸馏 → 【推理优化】 → 上线服务

                               你在这里

无论模型训练得多好,如果推理效率低下,用户就会面临高延迟、高成本的困境。推理优化的核心目标是:

  • 降低延迟(Latency):用户等待第一个 token 的时间(TTFT)和每个 token 的生成时间(TPOT)
  • 提升吞吐(Throughput):单位时间内处理的 token 数量
  • 降低成本(Cost):最大化 GPU 利用率,减少所需 GPU 数量

推理优化涉及的技术层次:

层次技术目标
算法层KV Cache、投机采样减少冗余计算
内存层PagedAttention、KV Cache 管理提升显存利用率
调度层Continuous Batching、Chunked Prefill提升 GPU 利用率
架构层PD 分离、vLLM系统级优化
内核层FlashAttention、融合算子底层计算加速

LLM 推理的两个阶段

LLM 的自回归推理天然分为两个性质截然不同的阶段:

Prefill 阶段(计算密集)

Prefill 阶段处理用户输入的完整 prompt,一次性计算所有输入 token 的 KV Cache。

特征

  • 输入:完整 prompt(可能有数千个 token)
  • 计算模式:大矩阵乘法,可以高度并行
  • 瓶颈:计算密集(Compute-bound),GPU 算力是限制因素
  • 类比:一次性读完一篇文章

Decode 阶段(访存密集)

Decode 阶段逐个生成输出 token,每一步都需要读取全部模型参数和之前所有 token 的 KV Cache。

特征

  • 输入:单个 token(上一步的输出)
  • 计算模式:向量-矩阵乘法,计算量小但内存读取量大
  • 瓶颈:访存密集(Memory-bound),显存带宽是限制因素
  • 类比:一个字一个字地写回复,每写一个字都要回头看全部内容

用 Roofline Model 分析为什么 Decode 是 Memory-Bound

Roofline Model 的核心指标是算术强度(Arithmetic Intensity)

算术强度=FLOPs(计算量)Bytes(访存量)

Llama 70B 为例,分析 Decode 阶段生成单个 token 的情况:

计算量估算

  • 模型参数量:70B(700 亿参数)
  • 每个参数参与 2 次浮点运算(一次乘法 + 一次加法)
  • FLOPs = 2 x 70B = 140 GFLOPs

访存量估算

  • FP16 精度下,每个参数占 2 Bytes
  • 需要读取全部模型参数:70B x 2 = 140 GB
  • 还需要读取 KV Cache(随序列长度增长)

算术强度

算术强度=140 GFLOPs140 GB=1 FLOP/Byte

而 A100 GPU 的计算/带宽比值:

312 TFLOPS2 TB/s=156 FLOPs/Byte

结论:Decode 阶段的算术强度(1)远低于 GPU 的计算带宽比(156),这意味着 GPU 的计算单元大部分时间在等待数据从显存搬运过来,算力严重浪费。这就是为什么 Decode 阶段是 Memory-bound 的。


KV Cache

为什么需要 KV Cache

在自回归生成中,每个新 token 需要和之前所有 token 做 Attention 计算。如果每次都重新计算所有 token 的 K、V,就会产生大量冗余计算:

python
# 不使用 KV Cache(每次重新计算)—— O(n^2) 总计算量
for i in range(max_tokens):
    logits = model(all_tokens[:i+1])  # 每次传入全部 token,重复计算前面的 K、V
    next_token = sample(logits)

# 使用 KV Cache(增量计算)—— O(n) 总计算量
kv_cache = None
for i in range(max_tokens):
    logits, kv_cache = model(tokens[i:i+1], past_kv=kv_cache)  # 只传入新 token
    next_token = sample(logits)

没有 KV Cache 时,生成第 n 个 token 需要对前面所有 token 重新计算 K、V,生成 N 个 token 的总计算量与 N2 成正比。有了 KV Cache,每步只需计算当前 token 的 Q、K、V,然后拼接到缓存中,总计算量与 N 成线性关系。

KV Cache 的显存计算

KV Cache 的显存公式:

KV Cache 显存=2×L×H×dh×S×B×dtype_size

其中:

符号含义Llama 70B 取值
2K 和 V 两个矩阵2
LTransformer 层数80
H注意力头数(KV头数,GQA)8(GQA)
dh每个头的维度128
S序列长度4096
BBatch Size1
dtype_size数据类型字节数(FP16=2)2

Llama 70B 在 4K context、batch_size=1 下的 KV Cache

2×80×8×128×4096×1×2=1,073,741,824 Bytes1 GB

如果使用完整的 64 个注意力头(非 GQA):

2×80×64×128×4096×1×28 GB

当 batch_size 增大到 32 时,KV Cache 直接膨胀到 ~32 GB,和模型参数本身(140 GB FP16)相比已经非常可观。这就是为什么 KV Cache 的管理如此重要。


PagedAttention

传统 KV Cache 的内存碎片问题

传统方式为每个请求预分配一块连续的最大长度的 KV Cache 空间。这带来两个问题:

  1. 内部碎片(Internal Fragmentation):请求实际生成长度通常远小于预分配的最大长度,大量显存被浪费
  2. 外部碎片(External Fragmentation):不同请求占用的连续空间大小不一,释放后留下的空隙无法被其他请求利用

实测表明,传统方案的 KV Cache 显存利用率仅为 20-40%

虚拟内存的启发

vLLM 团队从操作系统的虚拟内存管理中获得灵感:

操作系统概念PagedAttention 对应
虚拟页逻辑块(Logical Block)
物理页帧物理块(Physical Block)
页表Block Table
按需分页动态分配 KV Cache Block
Copy-on-WriteBeam Search 共享前缀

物理块与逻辑块

核心思想:将 KV Cache 切分为固定大小的 Block(如每个 Block 存储 16 个 token 的 KV),通过 Block Table 维护逻辑到物理的映射。

逻辑视图(请求看到的连续空间):
Request 1: [Block 0] [Block 1] [Block 2]
Request 2: [Block 0] [Block 1]

物理存储(实际的 GPU 显存):
[Page 5] [Page 2] [Page 7] [Page 1] [Page 3] ... [Page N]

Block Table(映射关系):
Request 1: 逻辑 0→物理 5, 逻辑 1→物理 7, 逻辑 2→物理 2
Request 2: 逻辑 0→物理 1, 逻辑 1→物理 3

页分配器的实现:

python
from collections import deque

class PageAllocator:
    """物理页帧的分配与回收,维护空闲池和链式索引"""

    def __init__(self, page_size: int, total_pages: int):
        self.page_size = page_size
        self.total_pages = total_pages
        self.free_pool = deque(range(total_pages))  # 用 deque 实现 O(1) 双端操作
        self.in_use = set()
        self.slot_offset = [0] * total_pages   # 每页已填充的 token 数
        self.successor = [-1] * total_pages    # 页链表:当前页 → 下一页

    def acquire(self, count: int, prev_page: int = -1) -> list[int]:
        """从空闲池取出 count 个页帧,可选地链接到 prev_page 之后"""
        if len(self.free_pool) < count:
            return []
        grabbed = [self.free_pool.popleft() for _ in range(count)]
        self.in_use.update(grabbed)
        for pid in grabbed:
            self.slot_offset[pid] = 0
            self.successor[pid] = -1
        if prev_page >= 0:
            self.successor[prev_page] = grabbed[0]
        return grabbed

    def release(self, page_ids: list[int]):
        """将页帧归还空闲池,重置元数据"""
        for pid in page_ids:
            if pid in self.in_use:
                self.in_use.discard(pid)
                self.free_pool.append(pid)
                self.slot_offset[pid] = 0
                self.successor[pid] = -1

PagedAttention 内核实现

PagedAttention 的关键在于:KV Cache 在物理上是非连续的,但 Attention 计算需要正确地跨 Block 聚合结果

传统 Attention 的输入形状是 [batch_size, seq_len, num_heads, head_dim],而 PagedAttention 的 KV Cache 输入是 [num_pages, page_size, num_heads, head_dim]——按页组织。

Prefill 阶段的 PageAttention

多个请求的 Block 混合排列,通过 request_num_pages 记录每个请求占用的页数:

python
class PageAttentionDecoderBlock(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.num_heads = config.num_heads
        self.dim = config.dim
        self.head_dim = config.head_dim
        self.WQ = nn.Linear(config.dim, config.dim, bias=False)
        self.WK = nn.Linear(config.dim, config.dim, bias=False)
        self.WV = nn.Linear(config.dim, config.dim, bias=False)
        self.WO = nn.Linear(config.dim, config.dim, bias=False)

    def forward_prefill(self, X, request_num_pages, attention_backend):
        """
        Request-Wise PageAttention Prefill
        X: [num_pages, page_size, dim]  -- 注意 dim=0 不是 batch_size
        request_num_pages: list[int], 如 [3, 2] 表示 2 个请求分别占 3 和 2 页
        """
        B, T, _ = X.shape
        H, D = self.num_heads, self.head_dim

        Q, K, V = self.WQ(X), self.WK(X), self.WV(X)
        Q = Q.reshape(B, T, H, D).transpose(1, 2)
        K = K.reshape(B, T, H, D).transpose(1, 2)
        V = V.reshape(B, T, H, D).transpose(1, 2)
        O = torch.zeros_like(Q)

        # 按请求分别计算 Attention
        offset = 0
        for t, N in enumerate(request_num_pages):
            Q_ = Q[offset: offset + N]
            K_ = K[offset: offset + N]
            V_ = V[offset: offset + N]
            O[offset: offset + N] = attention_backend(Q_, K_, V_)
            offset += N

        O = O.transpose(1, 2).reshape(B, T, H * D)
        O = self.WO(O)
        return O, [K.transpose(1, 2), V.transpose(1, 2)]

Decode 阶段的 PageAttention(核心难点)

Decode 时每个请求只有 1 个新 query token,但需要和分散在多个 Page 上的 KV Cache 做 Attention。核心思路是:

  1. 将 query 复制到对应的每个 KV Page 上(dispatch)
  2. 在每个 Page 上独立计算局部 Attention
  3. 通过 Online Softmax 聚合所有 Page 的结果(reduce/combine)
python
# Decode 阶段:单个 query 对 N 页 KV Cache 的聚合
q = torch.randn(1, 1, d)          # 1 个 query token
K = torch.randn(num_pages, page_size, d)  # KV Cache 分散在多个页上
V = torch.randn(num_pages, page_size, d)

# 每页独立计算局部 Attention
S = q @ K.transpose(1, 2)                          # [num_pages, 1, page_size]
M, _ = torch.max(S, dim=-1, keepdim=True)           # 局部最大值
L = torch.sum(torch.exp(S - M), dim=-1, keepdim=True)  # 局部归一化因子
P = torch.exp(S - M) / L
O = P @ V                                           # 局部输出

# 通过 Online Softmax 聚合多页结果
def combine_result(O, L, M):
    """将多页的局部 Attention 结果聚合为全局正确结果"""
    M_new, _ = torch.max(M, dim=0, keepdim=True)     # 全局最大值
    L_new = torch.exp(M - M_new) * L
    L_new = torch.sum(L_new, dim=0, keepdim=True)     # 全局归一化因子
    O_new = torch.exp(M - M_new) * (L / L_new) * O   # 重新缩放
    return O_new.sum(dim=0, keepdim=True)

O_final = combine_result(O, L, M)

这个 Online Softmax 聚合的数学保证是:全局 Softmax 可以分解为局部 Softmax 的加权组合,通过维护全局最大值 M 和归一化因子 L 实现精确(非近似)计算。

Copy-on-Write 优化(Beam Search 场景)

在 Beam Search 中,多个候选序列共享相同的前缀。传统方式需要为每个候选复制一份完整的 KV Cache,而 PagedAttention 的 Copy-on-Write 机制允许:

  • 多个候选序列共享前缀 Block,引用计数 > 1
  • 只有当某个候选需要修改某个 Block 时,才复制该 Block
  • 显存节省高达 55%(vLLM 论文数据)

Continuous Batching

Static Batching 的低效

传统的 Static Batching 将一批请求打包,等所有请求都生成完毕后才处理下一批:

时间 →
请求 A: [====生成 10 tokens====][等待...........][等待...........]
请求 B: [====生成 10 tokens====][====生成 30 tokens====][等待...........]
请求 C: [====生成 10 tokens====][====生成 30 tokens====][====生成 50 tokens====]

                                               A 和 B 早已完成,GPU 在空转

问题:短请求完成后 GPU 在"陪跑"长请求,资源严重浪费。

Continuous Batching 的调度策略

Continuous Batching(也叫 Iteration-level Scheduling)在每次生成一个 token 后重新调度:

时间 →
Step 1: [A][B][C]      ← 3 个请求并行生成
Step 2: [A][B][C]
...
Step 10: [A完成][B][C]  ← A 完成,立即释放位置
Step 11: [D加入][B][C]  ← 新请求 D 立即插入
...
Step 30: [D][B完成][C]  ← B 完成
Step 31: [D][E加入][C]  ← 新请求 E 插入

核心实现涉及三个队列的管理:

python
class RequestManager:
    def __init__(self, max_batch_size: int):
        self.requests = {}            # request_id -> Request
        self.waiting_queue = deque()  # 等待处理的请求
        self.running_set = set()      # 正在生成的请求
        self.max_batch_size = max_batch_size

    def add_request(self, prompt, max_seq_len):
        """新请求进入 waiting 队列"""
        request_id = len(self.requests)
        request = Request(request_id, prompt, max_seq_len)
        self.requests[request_id] = request
        self.waiting_queue.append(request_id)
        return request_id

    def get_pending_requests(self, max_count):
        """从 waiting 队列取出可调度的请求"""
        available_slots = self.max_batch_size - len(self.running_set)
        count = min(available_slots, max_count, len(self.waiting_queue))
        pending = []
        for _ in range(count):
            req_id = self.waiting_queue.popleft()
            self.running_set.add(req_id)
            pending.append((req_id, self.requests[req_id].prompt))
        return pending

    def update_request(self, request_id, next_token):
        """更新请求状态,完成的请求立即释放"""
        request = self.requests[request_id]
        request.add_token(next_token)
        if request.is_finished():
            self.running_set.discard(request_id)
            # 槽位释放,下一步可以插入新请求

KV Cache 管理(槽位式):

python
class KVCacheManager:
    def __init__(self, config):
        self.kv_cache = torch.zeros(
            2, config.num_layers, config.max_batch_size,
            config.max_seq_len, config.num_heads, config.head_dim
        )
        self.slot_to_request = {}
        self.request_to_slot = {}
        self.available_slots = set(range(config.max_batch_size))

    def allocate_slots(self, request_ids):
        """为新请求分配 KV Cache 槽位"""
        allocated = []
        for req_id in request_ids:
            slot = self.available_slots.pop()
            self.slot_to_request[slot] = req_id
            self.request_to_slot[req_id] = slot
            allocated.append(slot)
        return allocated

    def free_slot(self, request_id):
        """释放已完成请求的槽位"""
        slot = self.request_to_slot.pop(request_id)
        del self.slot_to_request[slot]
        self.kv_cache[:, :, slot, :, :, :] = 0  # 清空 KV Cache
        self.available_slots.add(slot)

Continuous Batching 将吞吐量提升 2-8 倍,是现代推理引擎的标配。


Chunked Prefill

长 Prefill 阻塞 Decode 的问题

当系统同时服务多个用户时,新到达的长 prompt(如 4K token)的 Prefill 会长时间独占 GPU,导致已经在 Decode 阶段的请求被阻塞,用户感受到明显的卡顿(TPOT 突增)。

不使用 Chunked Prefill:
[Decode R1][Decode R2][========= Prefill R3 (4K tokens) =========][Decode R1][Decode R2]
                       ↑ R1、R2 的 Decode 被阻塞了很久

使用 Chunked Prefill:
[Decode R1][Decode R2][Prefill R3 chunk1][Decode R1][Decode R2][Prefill R3 chunk2][Decode R1]...
                                          ↑ R1、R2 的 Decode 不被打断

将 Prefill 拆分为 Chunks

核心思想:将长 prompt 按固定大小(如 512 token)切分为多个 chunk,每个 chunk 和当前的 Decode 请求合并成一个 batch 执行。

Chunked Prefill 的注意力计算需要处理跨 chunk 的 KV Cache 拼接:

python
class ChunkedPrefillAttention(nn.Module):
    def forward(self, X, KVCache):
        Q = self.WQ(X)
        K = self.WK(X)
        V = self.WV(X)

        if KVCache is not None:
            K_cache, V_cache = KVCache
            K_ = torch.cat([K_cache, K], dim=1)  # 拼接历史 KV 和当前 chunk
            V_ = torch.cat([V_cache, V], dim=1)
        else:
            K_, V_ = K, V

        # 标准 Attention 计算
        S = Q @ K_.transpose(-2, -1) / math.sqrt(self.head_dim)
        P = F.softmax(S, dim=-1)
        O = P @ V_
        return O, (K_, V_)  # 返回更新后的 KV Cache

Chunked Prefill 的调度逻辑

python
def chunk_prefill_method(model, x, page_size, dim):
    """将长 prompt 分 chunk 处理"""
    bsz, seq_len = x.shape
    num_chunks = (seq_len + page_size - 1) // page_size
    KVCache = None

    for i in range(num_chunks):
        start = i * page_size
        end = min((i + 1) * page_size, seq_len)
        chunk = x[:, start:end]

        logits, KVCache = model(chunk, KVCache=KVCache)

        if i == num_chunks - 1:  # 最后一个 chunk
            last_token_logits = logits[:, -1, :]
        else:
            last_token_logits = None  # 非最后一个 chunk 的 logits 无效

    return last_token_logits, KVCache

混合 PD 请求的 Chunked Prefill 调度器(vLLM V1 核心):

python
def plan_next_batch(self, max_batch_tokens, max_decoding_batch, max_prefill_batch):
    """Decode 优先,Prefill 填充剩余 token budget"""
    plan = BatchPlan()
    budget_used = 0

    # 1. Decoding 请求优先并入 batch
    for req_id in self.running_requests:
        req = self.requests[req_id]
        if req.is_decoding():
            plan.ids.append(req_id)
            plan.token_segments.append([req.generated_tokens[-1]])
            plan.chunk_len.append(1)
            budget_used += 1
        if budget_used == max_decoding_batch:
            break

    # 2. Prefill 请求填充剩余 token budget
    for req_id in self.running_requests:
        req = self.requests[req_id]
        if req.is_decoding():
            continue
        remaining_budget = max_batch_tokens - budget_used
        pending = req.prompt[req.kv_len:]

        if len(pending) <= remaining_budget:
            # 可以一次性 prefill 完
            plan.token_segments.append(pending)
            budget_used += len(pending)
        else:
            # 需要 chunked prefill
            plan.token_segments.append(pending[:remaining_budget])
            budget_used += remaining_budget

    return plan

关键设计原则

  1. Decode 优先:正在生成的请求不能被打断
  2. Token Budget:每步总 token 数有上限(如 max_batch_tokens=8192
  3. Chunk 大小可变:不是固定的,而是用剩余 budget 填满

投机采样 (Speculative Decoding)

核心思想:小模型草稿 + 大模型验证

标准的自回归解码每步只能生成 1 个 token,而每步都需要完整的前向传播。投机采样的核心观察是:

Decode 阶段是 Memory-bound,增加少量计算(验证多个 token)几乎不增加时间。

因此可以用一个小模型(Draft Model,如 7B)快速猜测 K 个 token,然后用大模型(Target Model,如 70B)一次前向传播同时验证这 K 个 token。

python
class SPDecoding:
    def __init__(self, model_target, model_draft, spec_n):
        self.model_target = model_target
        self.model_draft = model_draft
        self.spec_n = spec_n  # 每次猜测的 token 数,如 5

    def generate_draft(self, spec_n, x):
        """小模型自回归生成 spec_n 个候选 token"""
        logits_y = []
        for i in range(spec_n):
            with torch.no_grad():
                logits = self.model_draft(x)[:, [-1], :]
                logits_y.append(logits)
                next_token = torch.argmax(logits, dim=-1)
                x = torch.cat([x, next_token], dim=1)
        return x, torch.cat(logits_y, dim=1)

    def generate(self, x, max_new_tokens=30):
        count = 0
        y_new = []
        for i in range(max_new_tokens):
            # Step 1: 小模型猜测
            x_spec, logits_draft = self.generate_draft(self.spec_n, x)
            y_spec = x_spec[:, -self.spec_n:]

            # Step 2: 大模型一次性验证
            logits_target = self.model_target(x_spec)[:, -self.spec_n-1:]
            y_target = torch.argmax(logits_target, dim=-1)

            # Step 3: 从左到右逐个验证
            y_target_verify = y_target[:, :-1]
            verify = y_spec == y_target_verify
            idx1, idx2 = torch.where(verify == False)

            if len(idx2) == 0:
                accept_len = self.spec_n  # 全部接受
            else:
                accept_len = idx2[0]      # 部分接受

            # Step 4: 更新输入
            x = torch.cat((x, y_target[:, :accept_len+1]), dim=1)
            y_new.append(y_target[:, :accept_len+1])
            count += (accept_len + 1)

            if count >= max_new_tokens - 1:
                break
        return torch.cat(y_new, dim=1)

验证算法的数学保证(为什么输出分布不变)

上面的 Greedy 版本容易理解但实际中多用采样模式。Speculative Sampling 的关键在于验证步骤的概率保证:

对于草稿模型的概率分布 q(x) 和目标模型的概率分布 p(x)

  1. 以概率 min(1,p(x)q(x)) 接受草稿 token x
  2. 如果拒绝,从修正分布 p(x)=norm(max(0,p(x)q(x))) 重新采样
python
class SPSamplingDecoding:
    def generate(self, x, max_new_tokens=30):
        count = 0
        y_new = []
        for i in range(max_new_tokens):
            x_spec, logits_draft = self.generate_draft(self.spec_n, x)
            y_spec = x_spec[:, -self.spec_n:]
            logits_target = self.model_target(x_spec)[:, -self.spec_n-1:]

            accept_len = 0
            next_tokens = []
            for j in range(self.spec_n):
                r = torch.rand(1).item()  # 均匀分布随机数
                token_id = y_spec[0, j]
                q = F.softmax(logits_target[0, j], dim=-1)
                p = F.softmax(logits_draft[0, j], dim=-1)

                # 核心验证:以 min(1, q/p) 的概率接受
                if r < min(1, q[token_id] / p[token_id]):
                    accept_len += 1
                    next_tokens.append(y_spec[0, j])
                else:
                    # 拒绝:从修正分布重新采样
                    q_ = q.clone()
                    idx = torch.where(q < p)
                    q_[idx] = p[idx]  # max(0, p-q) 的近似
                    next_token = torch.multinomial(q_, num_samples=1)
                    next_tokens.append(next_token)
                    break

            if accept_len == self.spec_n:
                # 全部接受,bonus:大模型额外输出 1 个 token
                next_tokens.append(y_target[0, accept_len])

            x = torch.cat((x, torch.tensor([next_tokens])), dim=-1)
            y_new.append(torch.tensor([next_tokens]))
            count += len(next_tokens)
            if count >= max_new_tokens - 1:
                break
        return torch.cat(y_new, dim=1)

接受率分析

  • 最好情况:Draft Model 和 Target Model 分布完全一致 → 全部 K 个 token 被接受,加速 K+1
  • 最差情况:分布完全不同 → 每次只接受 0 个,退化为标准解码(还多了 Draft 的开销)
  • 实际情况:使用同系列的小模型(如 Llama-7B 作为 Llama-70B 的 Draft),接受率通常在 70-85%,加速 2-3 倍

KL 散度可以量化两个模型输出分布的差异:

python
def KL(logits_p, logits_q):
    log_p = F.log_softmax(logits_p, dim=-1)
    q = F.softmax(logits_q, dim=-1)
    kl = F.kl_div(log_p, q, reduction='sum', log_target=False)
    return kl

# KL 越小,接受率越高
# target vs draft (相近模型): KL ≈ 14.7
# target vs random:          KL ≈ 110.4

EAGLE-3:基于特征层的树状投机采样

上面的基础投机采样有两个关键局限:

  1. Draft Model 是独立的小模型,与 Target Model 的分布差异大,接受率有上限
  2. 线性草稿生成,每步只猜一条路径

EAGLE-3(腾讯 AngelSlim 开源)的核心创新是:Draft Model 不再独立,而是直接利用 Target Model 的隐藏状态,并以树状结构并行生成多条候选路径。

Draft Model 架构

EAGLE-3 的 Draft Model 非常轻量(Target Model 参数的 1/8 ~ 1/4),其核心是接收 Target Model 中间层的隐藏状态:

Target Model (70B)               Draft Model (Light)
┌─────────────┐                  ┌───────────────┐
│  Layer 0-31 │──hidden_states──→│ Combine Layer  │
│  Layer 32   │                  │ (Linear Proj)  │
│  ...        │                  ├───────────────┤
│  Layer N    │                  │ 2-4 Attn Layers│
└─────────────┘                  │ (Shared Embed) │
                                 ├───────────────┤
                                 │  LM Head       │
                                 │ (Vocab Pruned) │
                                 └───────────────┘
python
class Eagle3DraftModel(nn.Module):
    """EAGLE-3 Draft Model 简化实现"""
    def __init__(self, target_hidden_size, draft_hidden_size, 
                 num_layers=2, num_heads=8, draft_vocab_size=8000):
        super().__init__()
        # 从 Target Model 隐藏状态投影到 Draft 空间
        self.hidden_combine = nn.Linear(target_hidden_size, draft_hidden_size)
        
        # 轻量 Transformer 层
        self.layers = nn.ModuleList([
            nn.TransformerDecoderLayer(
                d_model=draft_hidden_size,
                nhead=num_heads,
                dim_feedforward=draft_hidden_size * 4,
                batch_first=True
            ) for _ in range(num_layers)
        ])
        
        # 裁剪后的词表头(只保留高频 token)
        self.lm_head = nn.Linear(draft_hidden_size, draft_vocab_size, bias=False)
        
        # 词表映射:draft_vocab → target_vocab
        self.d2t = None  # 在训练时从数据频率统计构建
        self.t2d = None  # target_vocab → draft_vocab (布尔映射)
    
    def forward(self, target_hidden_states, input_ids, attention_mask=None):
        # Step 1: 融合 Target Model 的隐藏状态
        h = self.hidden_combine(target_hidden_states)
        
        # Step 2: 加入 token embedding
        embed = self.embed_tokens(input_ids)
        h = h + embed
        
        # Step 3: 通过轻量 Transformer 层
        for layer in self.layers:
            h = layer(h, memory=None, tgt_mask=attention_mask)
        
        # Step 4: 输出裁剪词表上的 logits
        logits = self.lm_head(h)
        return logits

词表裁剪 (Token Pruning)

EAGLE-3 的一个关键优化是词表裁剪:Draft Model 不需要预测全部 128K+ 的词表,只需覆盖最常见的 token:

python
def build_vocab_mapping(dataset, draft_vocab_size=8000, target_vocab_size=128256):
    """从训练数据的 token 频率构建词表映射"""
    from collections import Counter
    
    token_freq = Counter()
    for sample in dataset:
        token_freq.update(sample['input_ids'])
    
    # 取 Top-N 高频 token
    top_tokens = [tok for tok, _ in token_freq.most_common(draft_vocab_size)]
    top_tokens.sort()  # 保持有序
    
    # 覆盖率统计
    total = sum(token_freq.values())
    covered = sum(token_freq[t] for t in top_tokens)
    print(f"词表裁剪: {target_vocab_size}{draft_vocab_size}")
    print(f"Token 覆盖率: {covered/total:.1%}")  # 通常 > 95%
    
    # 构建双向映射
    d2t = {i: tok for i, tok in enumerate(top_tokens)}    # draft → target
    t2d = {tok: i for i, tok in enumerate(top_tokens)}    # target → draft
    
    return d2t, t2d

树状候选生成

与线性猜测不同,EAGLE-3 用树结构并行生成多条候选路径:

                    [token_0]
                   /    |    \
            [top1]   [top2]   [top3]     ← depth 1: 3 个分支
            / |        |        \
       [t1] [t2]    [t1]      [t1]       ← depth 2: 扩展
        |     |       |
      [t1]  [t1]   [t1]                  ← depth 3: 继续
python
def tree_speculative_decode(target_model, draft_model, input_ids, 
                            tree_depth=5, top_k=10, total_tokens=60):
    """树状投机采样推理循环"""
    
    # Step 1: Prefill - Target Model 计算初始隐藏状态
    with torch.no_grad():
        target_out = target_model(input_ids, output_hidden_states=True)
        target_logits = target_out.logits[:, -1:]
        hidden_states = target_out.hidden_states  # 多层隐藏状态
    
    # Step 2: Draft Model 生成树状候选
    candidates = []  # 所有候选路径
    tree_tokens = []
    tree_positions = []
    
    # 根节点:取 Target 的 top-k 作为第一层
    probs = F.softmax(target_logits[:, -1], dim=-1)
    top_k_tokens = torch.topk(probs, k=top_k).indices[0]
    
    # 广度优先展开树
    queue = [(tok.item(), [tok.item()], 0) for tok in top_k_tokens]
    
    while queue and len(tree_tokens) < total_tokens:
        token, path, depth = queue.pop(0)
        if depth >= tree_depth:
            continue
        
        tree_tokens.append(token)
        tree_positions.append(len(input_ids[0]) + depth)
        
        # Draft Model 预测下一层
        draft_input = torch.tensor([[token]], device=input_ids.device)
        draft_logits = draft_model(hidden_states[-1][:, -1:], draft_input)
        draft_probs = F.softmax(draft_logits[:, -1], dim=-1)
        
        # 选 top-k 子节点
        next_tokens = torch.topk(draft_probs, k=min(3, top_k)).indices[0]
        for next_tok in next_tokens:
            new_path = path + [next_tok.item()]
            queue.append((next_tok.item(), new_path, depth + 1))
            candidates.append(new_path)
    
    # Step 3: Target Model 一次验证所有候选
    all_draft_tokens = torch.tensor([tree_tokens], device=input_ids.device)
    tree_mask = build_tree_attention_mask(tree_positions)  # 因果+树结构
    
    with torch.no_grad():
        verify_logits = target_model(
            all_draft_tokens, 
            attention_mask=tree_mask,
            position_ids=torch.tensor([tree_positions])
        ).logits
    
    # Step 4: 逐路径验证,找到最长接受路径
    best_path, accept_len = evaluate_tree_candidates(
        candidates, verify_logits, target_logits
    )
    
    return best_path, accept_len

Draft Model 训练流水线

EAGLE-3 的训练分两步:

Step 1: 生成训练数据 — 用 Target Model 对语料做推理,保存每层隐藏状态:

bash
# 1. 启动 Target Model 生成对话数据
python generate_data_for_target_model.py \
    --model_path Qwen/Qwen3-32B \
    --data_path ShareGPT_data.jsonl \
    --output_path generated_data/

# 2. 提取隐藏状态(这是关键步骤)
python generate_hidden_for_draft_model.py \
    --model_path Qwen/Qwen3-32B \
    --data_path generated_data/ \
    --output_path hidden_states/ \
    --save_all_layers True

Step 2: 训练 Draft Model — 用 KL 散度对齐 Draft 和 Target 的输出分布:

python
def eagle3_training_step(draft_model, batch, training_steps=7):
    """EAGLE-3 训练步骤(简化版)"""
    input_ids = batch['input_ids']
    target_logits = batch['target_logits']     # 预计算的 Target 输出
    hidden_states = batch['hidden_states']      # 预计算的 Target 隐藏状态
    
    total_loss = 0
    losses = []
    
    for step in range(training_steps):
        # Draft Model 前向
        draft_logits = draft_model(hidden_states, input_ids)
        
        # Target 概率(在裁剪词表上)
        target_probs_pruned = F.softmax(
            target_logits[..., draft_model.t2d], dim=-1  # 词表裁剪
        )
        
        # KL 散度损失
        draft_log_probs = F.log_softmax(draft_logits, dim=-1)
        step_loss = -torch.sum(target_probs_pruned * draft_log_probs) / batch_size
        losses.append(step_loss)
        
        # 更新输入(自回归移位)
        input_ids = shift_tokens(input_ids, target_logits)
    
    # 指数衰减加权:越近的预测权重越大
    weights = [0.8 ** i for i in range(len(losses))]
    total_loss = sum(w * l for w, l in zip(weights, losses))
    
    return total_loss

EAGLE-3 性能数据(来自 AngelSlim 基准测试)

模型方法GSM8KAlpacaHumanEvalMT-bench平均加速
Qwen3-1.7B标准推理376 tok/s379 tok/s378 tok/s391 tok/s1x
Qwen3-1.7BEAGLE-3617 tok/s653 tok/s680 tok/s621 tok/s1.68x
Qwen3-8B标准推理150 tok/s150 tok/s154 tok/s154 tok/s1x
Qwen3-8BEAGLE-3257 tok/s267 tok/s245 tok/s258 tok/s1.70x
Qwen3-32B标准推理43.5 tok/s43.4 tok/s43.2 tok/s43.3 tok/s1x
Qwen3-32BEAGLE-380.4 tok/s72.5 tok/s71.6 tok/s74.1 tok/s1.71x

测试条件:vLLM v0.11.2, num_speculative_tokens=2, batch_size=1, output_len=1024

关键观察:平均接受长度约 2.5 token/step,模型越大加速比越稳定。

SpecExit:推理模型的思考早退

对于 DeepSeek-R1、QwQ 等推理模型,生成的 token 中大量是"思考过程"(Reasoning Token),实际有用的答案只占一小部分。SpecExit(腾讯,OSDI 2025 投稿)的核心思想是:

用 Draft Model 的隐藏状态预测何时可以停止思考,无需等待完整的推理链。

工作原理

Target Model 生成推理链:
"<think>首先分析题目... 然后计算... 验证结果...</think>答案是42"

SpecExit 在中间某处判断:
"<think>首先分析题目... 然后计算...</think>答案是42"

        此处已经可以得到正确答案,后面的思考是冗余的

信号预测与停止判据

SpecExit 从 Draft Model 的隐藏状态中提取三类信号:

python
class SpecExitPredictor:
    """SpecExit 早退信号预测器"""
    
    def __init__(self, method="ewma", alpha=0.3, window_size=10):
        self.method = method
        self.alpha = alpha
        self.window_size = window_size
        self.scores = []
    
    def update(self, draft_hidden_state):
        """从 Draft Model 隐藏状态提取停止信号"""
        # 信号类型1: 置信度 (confidence) — 答案 token 的概率
        # 信号类型2: 进度 (progress) — 已完成推理的比例
        # 信号类型3: 剩余量 (remain) — 预计还需要多少 token
        score = self.extract_signal(draft_hidden_state)
        self.scores.append(score)
        return self.predict_next()
    
    def predict_next(self):
        """预测下一步的信号值"""
        if self.method == "ewma":
            # 指数加权移动平均
            ewma = self.scores[0]
            for s in self.scores[1:]:
                ewma = self.alpha * s + (1 - self.alpha) * ewma
            return ewma
        
        elif self.method == "momentum":
            # 动量法:用历史趋势外推
            if len(self.scores) < 2:
                return self.scores[-1]
            deltas = [self.scores[i] - self.scores[i-1] 
                      for i in range(1, len(self.scores))]
            avg_delta = sum(deltas[-self.window_size:]) / len(deltas[-self.window_size:])
            return self.scores[-1] + avg_delta
        
        elif self.method == "mean":
            # 滑动窗口均值
            window = self.scores[-self.window_size:]
            return sum(window) / len(window)
    
    def should_exit(self, predicted_score):
        """判断是否可以提前退出"""
        if "confidence" in self.method:
            return predicted_score > 0.8   # 答案置信度足够高
        elif "progress" in self.method:
            return predicted_score > 0.3   # 推理进度超过 30%
        elif "remain" in self.method:
            return predicted_score < 200   # 预计剩余 token < 200
        return False

SpecExit 效果(在推理模型上):

  • 生成长度减少 66%(省去冗余思考)
  • 端到端延迟降低 2.5x
  • 答案准确率基本不变

投机采样方法对比

方法Draft Model接受率加速比特点
基础投机采样独立小模型70-85%2-3x简单,Draft 与 Target 解耦
EAGLE-3特征级轻量模型80-90%1.7-1.9x利用 Target 隐藏状态,树状搜索
Medusa多个 LM Head60-80%1.5-2x无需独立 Draft Model
SpecExitEAGLE-3 + 早退-2.5x+针对推理模型,减少冗余思考
Self-SpeculativeTarget 自身浅层50-70%1.3-1.5x无需额外模型

Prefill-Decode 分离 (PD Disaggregation)

为什么要分离?

Prefill 是 Compute-bound,Decode 是 Memory-bound,二者对硬件的需求截然不同:

维度PrefillDecode
计算特征大矩阵乘法,高算力需求向量-矩阵乘法,高带宽需求
理想硬件高 FLOPS 的 GPU (如 H100)高带宽的 GPU 或 Memory-centric 架构
Batch Size通常较小(几个请求)可以很大(数百个请求)
耗时可预测(与 prompt 长度成正比)不确定(取决于生成长度)

将 Prefill 和 Decode 放在不同的物理节点上运行,可以分别优化:

  1. Prefill 节点:配置高算力 GPU,专注处理 prompt
  2. Decode 节点:配置高带宽系统,专注逐 token 生成
  3. 各自优化 Batch Size:互不干扰

架构设计

PD 分离的基本架构:

                    ┌──────────────────────┐
                    │      Scheduler       │
                    │  (Ray Remote Actor)  │
                    └──────┬───────┬───────┘
                           │       │
              ┌────────────▼──┐ ┌──▼────────────┐
              │ Prefill Node  │ │ Decode Node   │
              │  (GPU Group)  │ │  (GPU Group)  │
              └──────┬────────┘ └────┬──────────┘
                     │               │
                     └───────┬───────┘

                    ┌────────▼────────┐
                    │  KV Cache Store │
                    │  (Distributed)  │
                    └─────────────────┘

分布式调度器(基于 Ray):

python
@ray.remote
class Scheduler:
    def __init__(self, max_seq_len=1024):
        self.requests = {}
        self.waiting_queue = deque()     # 等待 Prefill 的请求
        self.running_requests = set()    # 正在 Decode 的请求
        self.is_running_prefill = True
        self.is_running_decoding = True

    async def add_request(self, prompt, max_seq_len):
        """异步接收新请求"""
        request_id = len(self.requests)
        request = Request(request_id, prompt, max_seq_len)
        self.requests[request_id] = request
        self.waiting_queue.append(request_id)
        return request_id

    def get_waiting_requests(self):
        """Prefill 节点拉取等待的请求"""
        if len(self.waiting_queue) == 0:
            return None
        plan = BatchPlan()
        for _ in range(len(self.waiting_queue)):
            request_id = self.waiting_queue.popleft()
            plan.prompts.append(self.requests[request_id].prompt)
            plan.ids.append(request_id)
        return plan

    def get_running_requests(self):
        """Decode 节点拉取正在生成的请求"""
        if len(self.running_requests) == 0:
            return None
        plan = BatchPlan()
        for request_id in self.running_requests:
            plan.prompts.append([self.requests[request_id].generated_tokens[-1]])
            plan.ids.append(request_id)
        return plan

KV Cache 传输策略

PD 分离的核心挑战是 KV Cache 的传输:Prefill 节点计算完 KV Cache 后需要传输到 Decode 节点。

三种 KV Cache 管理模式:

  1. KV Cache 仅存于 Decode 节点:Prefill 计算后直接传输过去(实现简单)
  2. PD 各存各的:Prefill 节点也保留 Cache,用于 Chunked-Prefill
  3. 中心化 Cache 服务:独立的 KV Cache 服务,PD 节点都从中存取
python
@ray.remote
class DistributedKVCacheEngine:
    def __init__(self, config):
        self.kv_cache = torch.zeros(
            2, config.num_layers, config.kv_cache_batch,
            config.kv_cache_len, config.num_heads, config.head_dim
        )
        self.request_to_batch = {}  # request_id -> kv_batch_id

    async def update_from_prefill(self, reqs, kv):
        """Prefill 节点写入 KV Cache"""
        start_ids = len(self.request_to_batch)
        for i, req_id in enumerate(reqs):
            self.request_to_batch[req_id] = start_ids + i
        self.kv_cache[:, :, start_ids: start_ids + len(reqs)] = kv

    def update_from_decoding(self, reqs, kv, decoding_idx):
        """Decode 节点更新单 token 的 KV"""
        for i, req_id in enumerate(reqs):
            pos = decoding_idx[i]
            batch_id = self.request_to_batch[req_id]
            self.kv_cache[:, :, batch_id, pos] = kv[:, :, i, 0]

    def get_kv_cache(self, reqs):
        """Decode 节点读取 KV Cache"""
        kv_batch_ids = [self.request_to_batch[r] for r in reqs]
        return self.kv_cache[:, :, kv_batch_ids]

PD 分离引擎的主循环

python
@ray.remote
class DissagreationPDEngine:
    def __init__(self, config, prefill_actors, decoding_actors, scheduler, kvcache):
        self.prefill_actors = prefill_actors
        self.decoding_actors = decoding_actors
        self.scheduler = scheduler
        self.kvcache = kvcache

    def _step_prefill(self):
        """Prefill 节点的主循环"""
        while not self._is_finish_prefill():
            info = ray.get(self.scheduler.get_waiting_requests.remote())
            if info is None:
                time.sleep(0.1)
                continue
            # 执行 Prefill
            input_ids = self._get_prefill_batch(info)
            results = ray.get(self.prefill_actors.forward(x=input_ids))
            logits, kv = results[0]
            # 写入 KV Cache
            ray.get(self.kvcache.update_from_prefill.remote(info.ids, kv))
            # 更新请求状态 → 进入 Decode 阶段
            next_token = torch.argmax(logits[:, -1, :], dim=-1)
            for i, req_id in enumerate(info.ids):
                ray.get(self.scheduler.update_request.remote(req_id, next_token[i].item()))

    def _step_decoding(self):
        """Decode 节点的主循环"""
        while not self._is_finish_decoding():
            info = ray.get(self.scheduler.get_running_requests.remote())
            if info is None:
                time.sleep(0.1)
                continue
            # 读取 KV Cache
            kv_cache = ray.get(self.kvcache.get_kv_cache.remote(info.ids))
            # 执行 Decode
            input_ids = self._get_decoding_batch(info)
            results = ray.get(self.decoding_actors.forward(x=input_ids, kvcaches=kv_cache))
            logits, kv = results[0]
            # 更新 KV Cache 和请求状态
            ray.get(self.kvcache.update_from_decoding.remote(info.ids, kv, info.last_pos))
            next_token = torch.argmax(logits[:, -1, :], dim=-1)
            for i, req_id in enumerate(info.ids):
                ray.get(self.scheduler.update_request.remote(req_id, next_token[i].item()))

PD 分离 vs PD 融合

  • PD 分离适合:大规模部署、异构硬件、高吞吐需求
  • PD 融合(Chunked Prefill)适合:单节点、低延迟需求
  • 实际系统通常是分离和融合兼并的,例如 Prefill 节点内部也可以用 Chunked Prefill

vLLM 架构深度解析

vLLM 是目前最流行的开源 LLM 推理引擎,集成了上述几乎所有优化技术。

vLLM V0 架构

V0 架构采用 Scheduler + Worker 的同步执行模式:

┌─────────────────────────────────────────────┐
│              vLLM V0 Engine                 │
├─────────────────────────────────────────────┤
│  Scheduler                                  │
│  ├── Waiting Queue(等待 Prefill 的请求)     │
│  ├── Running Queue(正在 Decode 的请求)      │
│  └── Swapped Queue(被换出到 CPU 的请求)     │
├─────────────────────────────────────────────┤
│  Block Manager (PagedAttention 显存管理)     │
│  ├── PageAllocator(逻辑块→物理块映射)        │
│  ├── GPU Block Allocator                    │
│  └── CPU Block Allocator(Swap 用)          │
├─────────────────────────────────────────────┤
│  Worker(模型执行)                           │
│  ├── Model Runner(前向传播)                 │
│  └── Cache Engine(KV Cache 物理存储)        │
└─────────────────────────────────────────────┘

V0 的 KVCachePool 实现:

python
class KVCachePool:
    """基于页帧的 KV 缓存存储池"""

    def __init__(self, config):
        self.allocator = PageAllocator(config.page_size, config.num_pages)
        self.page_size = config.page_size

        # KV 的物理存储:按页组织
        self.kv_store = torch.zeros(
            2,                  # K 和 V
            config.num_layers,  # Transformer 层数
            config.num_pages,   # 总页数
            config.page_size,   # 每页 token 数
            config.num_heads,
            config.head_dim,
        )
        self.req_page_map = {}   # request_id -> [page_id, ...]
        self.cached_tokens = {}  # request_id -> 已缓存 token 数

    def write_tokens(self, request_id, KV):
        """将新计算的 KV 追加写入页帧,按需扩展"""
        _, L, T, H, D = KV.shape
        if request_id not in self.cached_tokens:
            # 首次写入:分配起始页
            self.cached_tokens[request_id] = 0
            initial = self.allocator.acquire(count=1)
            self.req_page_map[request_id] = initial

        remaining = T
        while remaining > 0:
            pages = self.req_page_map[request_id]
            capacity = self.page_size * len(pages) - self.cached_tokens[request_id]

            if capacity == 0:
                # 当前页已满,扩展一页
                extra = self.allocator.acquire(count=1, prev_page=pages[-1])[0]
                self.req_page_map[request_id].append(extra)
                capacity = self.page_size

            # 写入本轮数据
            n_write = min(capacity, remaining)
            dst_page = pages[-1]
            offset = self.page_size - capacity
            src_start = T - remaining
            self.kv_store[:, :, dst_page, offset:offset + n_write] = KV[:, :, src_start:src_start + n_write]
            remaining -= n_write
            self.cached_tokens[request_id] += n_write

    def free(self, request_id):
        """释放请求占用的全部页帧"""
        pages = self.req_page_map[request_id]
        self.allocator.release(pages)
        self.kv_store[:, :, pages, :, :, :] = 0
        del self.req_page_map[request_id]

vLLM V1 架构演进

V1 的核心改进是消除 Prefill/Decode 的概念区分,统一为 Chunked Prefill:

┌──────────────────────────────────────────────────────┐
│                 vLLM V1 EngineCore                   │
├──────────────────────────────────────────────────────┤
│  Scheduler(Chunked Prefill 统一调度)                │
│  ├── Decode 请求优先并入 batch                        │
│  ├── Prefill 请求按 token budget 切分                 │
│  └── 合并为一条序列送入模型                            │
├──────────────────────────────────────────────────────┤
│  KVCachePool(分页式 KV Cache)                       │
│  ├── 按需分配/释放物理页                               │
│  └── 请求级别的页映射管理                              │
├──────────────────────────────────────────────────────┤
│  ModelWrapper(模型执行封装)                          │
│  ├── PageAttention Prefill Kernel                    │
│  └── PageAttention Decoding Kernel                   │
└──────────────────────────────────────────────────────┘

V1 Engine 的 step 函数(核心主循环):

python
class vLLMEngine:
    def __init__(self, model, config):
        self.cacher = KVCachePool(config)
        self.model_wrapper = ModelWrapper(model, self.cacher)
        self.scheduler = Scheduler(config.max_seq_len)

    def step(self, config):
        """
        一步推理的完整流程:
        1. 获取融合 batch(Decode 优先 + Chunked Prefill)
        2. 执行前向计算
        3. 生成 next token
        4. 更新状态 / KV Cache
        """
        if self.scheduler.get_available_request() == 0:
            return

        # 1. 调度:获取混合 PD batch
        info = self.scheduler.plan_next_batch(
            max_batch_tokens=config.max_batch_tokens,
            max_prefill_batch=config.max_prefill_batch,
            max_decoding_batch=config.max_decoding_batch,
        )

        # 2. 获取 KV Cache
        kv_cache, info.kv_page_len = self.cacher.get_kv_cache(info.ids)

        # 3. 将多个请求的 token 拼接成一条序列
        input_ids = torch.tensor([info.flat_input], dtype=torch.long)

        # 4. 执行计算
        next_token, kv = self.execute(input_ids, kv_cache, info)

        # 5. 更新请求状态和 KV Cache
        self.update(next_token, kv, info)

    def execute(self, input_ids, kv_cache, info):
        logits, KV = self.model_wrapper.forward(input_ids, kv_cache, info)
        next_token = torch.argmax(logits, dim=-1)

        # 按请求拆分 next token
        tokens = []
        reqs_next_token = torch.split(next_token, info.chunk_len, dim=0)
        for bid, token_ids in enumerate(reqs_next_token):
            if info.decode_flags[bid]:
                tokens.append(token_ids[-1])
            elif info.last_pos[bid] != -1:
                tokens.append(token_ids[-1])  # prefill 完成
            else:
                tokens.append(-1)  # chunked prefill 未完成
        return tokens, KV

    def update(self, next_token, KV, info):
        # 更新请求
        for bid, token in enumerate(next_token):
            req_id = info.ids[bid]
            if token != -1:
                self.scheduler.update_request(req_id, token.item())
        # 更新 KV Cache
        reqs_KV = KV.split(info.chunk_len, dim=2)
        for req_id, tmp_KV in zip(info.ids, reqs_KV):
            if self.scheduler.requests[req_id].status == "REQUEST_COMPLETED":
                self.cacher.free(req_id)
            else:
                self.cacher.write_tokens(req_id, tmp_KV)
        # 更新 kv_len
        for i, req_id in enumerate(info.ids):
            self.scheduler.requests[req_id].kv_len += info.chunk_len[i]

V1 的模型层如何处理混合 PD batch:

python
class PageAttentionBlock(nn.Module):
    def forward(self, X, KV_Cache, info):
        N_p = info.prefill_batch    # Prefill 请求数
        N_d = info.decoding_batch   # Decode 请求数

        Q, K, V = self.WQ(X), self.WK(X), self.WV(X)

        # 按请求拆分(每个请求的 chunk_len 不同)
        Q = Q.split(info.chunk_len, dim=0)
        K = K.split(info.chunk_len, dim=0)
        V = V.split(info.chunk_len, dim=0)

        # Decode 请求:使用 PageAttention Decoding Kernel
        if N_d > 0:
            O_d = self.forward_decoding(Q[:N_d], K[:N_d], V[:N_d],
                                         KV_Cache=KV_Cache[:N_d])

        # Prefill 请求:使用 PageAttention Prefill Kernel
        if N_p > 0:
            O_p = self.forward_prefill(Q[N_d:], K[N_d:], V[N_d:],
                                        KV_Cache=KV_Cache[N_d:])

        # 拼接输出
        if N_p > 0 and N_d > 0:
            O = torch.cat((O_d, O_p), dim=0)
        elif N_d > 0:
            O = O_d
        else:
            O = O_p
        return O

V0 vs V1 对比

特性V0V1
Prefill/Decode分开处理统一为 Chunked Prefill
调度粒度请求级别Token 级别(token budget)
新请求等当前 batch 有空位切 chunk 立即开始
GPU 利用率Decode 阶段低Prefill 填充空闲算力
实现复杂度较低较高

端侧推理引擎

前面介绍的优化技术(KV Cache、PagedAttention、投机采样等)主要面向云端 GPU 集群。但随着端侧芯片算力提升和隐私需求增长,在手机、嵌入式设备上直接运行 LLM 正在成为新的工程前沿。

本节从通用视角剖析端侧 LLM 部署的完整流程,涵盖模型适配、量化、编译和上板的每一步核心概念。

端侧推理的挑战与动机

为什么要在设备上跑大模型?

云端推理虽然算力充足,但存在不可忽视的局限性:

动机

  • 隐私保护:数据不出设备,用户的聊天记录、照片等敏感信息无需上传云端
  • 延迟要求:无网络往返延迟,在弱网/无网环境下也能流畅使用
  • 成本节约:无需云端 GPU 算力费用,边际成本趋近于零
  • 离线可用:飞行模式、地铁等无网场景依然可用

约束

  • 内存有限:手机通常只有 8-16GB 统一内存,需要与系统和其他 App 共享
  • 功耗限制:移动设备需要严格控制能耗,不能像 GPU 集群那样「暴力计算」
  • 无 CUDA:端侧芯片(NPU、ANE)有自己的指令集和编程模型,无法直接复用云端的 CUDA 生态

端侧模型规模限制

受内存和算力约束,端侧通常只能运行 0.5B-7B 参数的模型,且几乎必须使用 INT4/INT8 量化。这与云端动辄 70B-400B 的模型规模形成鲜明对比。

端侧推理引擎生态

引擎厂商目标硬件特点
QNN SDKQualcommSnapdragon NPU手机端主流,支持 INT4/INT8
Core MLAppleANE (Apple Neural Engine)iOS/macOS 设备
TensorRTNVIDIAJetson GPU边缘 GPU 加速
ONNX Runtime MobileMicrosoftCPU/NPU跨平台
MLC LLM开源CPU/GPU/NPU通用 LLM 推理
llama.cpp开源CPU纯 CPU,极致兼容
MediaPipeGoogleAndroid/iOS移动端 ML

如何选择?

  • Android + Qualcomm 芯片:优先 QNN SDK,充分利用 NPU 算力
  • iOS/macOS:优先 Core ML,与系统深度集成
  • 跨平台/快速验证:llama.cpp 或 MLC LLM
  • 边缘 GPU(Jetson):TensorRT

端侧部署全景流程

端侧部署一个 LLM(包括多模态模型)的通用流水线如下:

HuggingFace 模型 (FP32/FP16)
    ↓ Step 1: 模型适配(Model Adaptation)
适配后模型(Linear→Conv2d, Conv3d→Conv2d, Eager Attention)
    ↓ Step 2: 图优化(Graph Optimization)
优化后模型(算子融合、常量折叠、静态 shape 绑定)
    ↓ Step 3: 量化(PTQ + SeqMSE / LPBQ)
量化模型(W8A16 / W4A16)
    ↓ Step 4: 中间格式导出 + 精度验证
ONNX / TorchScript + 量化编码 + Test Vectors
    ↓ Step 5: NPU 编译流水线
设备可执行二进制(Context Binary / Core ML Model / TRT Engine)
    ↓ Step 6: 部署到设备
端侧芯片 NPU / ANE 执行

下面逐步深入每一个环节的核心概念与通用实现。

Step 1: 模型适配(Model Adaptation)

NPU 的算子支持与 GPU 完全不同。HuggingFace 模型直接导出到 NPU 会遇到大量不支持的算子(如 nn.LinearConv3d、Flash Attention 等)。因此第一步是改写模型结构,使其对 NPU 友好。

1.1 Linear → Conv2d 替换

多数 NPU 对 Conv2d 有深度优化的硬件指令,但对 nn.Linear 的支持较弱。解决方案:将所有 Linear 层替换为 1×1 Conv2d

python
import torch
import torch.nn as nn


class LinearToConv1x1(nn.Conv2d):
    """用 1x1 Conv2d 替换 Linear 层,保持功能等价。

    数学原理:nn.Linear(in, out) 计算 y = xW^T + b,
    等价于 Conv2d(in, out, kernel_size=1) 对 (B, in, 1, 1) 输入的计算。
    """

    def __init__(self, linear: nn.Linear):
        out_features, in_features = linear.weight.shape
        super().__init__(
            in_features, out_features, kernel_size=1,
            bias=(linear.bias is not None),
            dtype=linear.weight.dtype,
        )
        # Linear 权重 [out, in] → Conv2d 权重 [out, in, 1, 1]
        self.weight.data.copy_(linear.weight.data[:, :, None, None])
        if linear.bias is not None:
            self.bias.data.copy_(linear.bias.data)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        orig_shape = x.shape
        if x.ndim == 2:
            # (seq, hidden) → (1, hidden, 1, seq)
            x = x.unsqueeze(0).unsqueeze(2).permute(0, 3, 2, 1)
        elif x.ndim == 3:
            # (B, seq, hidden) → (B, hidden, 1, seq)
            x = x.permute(0, 2, 1).unsqueeze(2)
        x = super().forward(x)
        # 还原为原始 layout
        if len(orig_shape) == 2:
            x = x.permute(0, 3, 2, 1).squeeze(2).squeeze(0)
        elif len(orig_shape) == 3:
            x = x.squeeze(2).permute(0, 2, 1)
        return x

为什么这样做?

  • nn.Linear(in, out) 计算 y=xWT+b,等价于 Conv2d(in, out, kernel_size=1) 在空间维为 1 时的计算
  • NPU 对 Conv2d 有专门的硬件加速单元,能以更高吞吐执行
  • 继承 nn.Conv2d 而非包装,确保下游工具(如量化框架、LoRA Adapter)能正常识别

全模型递归替换的通用工具函数:

python
def replace_all_linears(module: nn.Module) -> nn.Module:
    """递归地将模型中所有 nn.Linear 替换为 LinearToConv1x1"""
    for name, child in module.named_children():
        if isinstance(child, nn.Linear):
            setattr(module, name, LinearToConv1x1(child))
        else:
            replace_all_linears(child)
    return module

model = replace_all_linears(model)

1.2 Conv3d → 多个 Conv2d 分解

多模态模型的 Vision Encoder 有时使用 Conv3d 做 Patch Embedding(处理视频的时空维度),但 NPU 通常不支持 3D 卷积。解决方案:沿某个维度(通常是时间维度)拆分,用多个 Conv2d 相加替代

python
class Conv3dDecomposer(nn.Module):
    """将一个 Conv3d 沿指定维度分解为多个 Conv2d 之和。

    原理:Conv3d(X) = sum_t W[:,:,t,:,:] * X[:,:,t,:,:] 沿时间维 t 可拆分。
    每个 Conv2d 处理一个时间切片,最终结果相加,数学完全等价。
    """

    def __init__(self, conv3d: nn.Conv3d, split_dim: int = 2):
        super().__init__()
        # split_dim: 在 (B, C, D, H, W) 中要拆分的维度,默认 D(时间)
        self.split_dim = split_dim
        kernel_3d = conv3d.weight  # [C_out, C_in, kD, kH, kW]
        kD = conv3d.kernel_size[0]

        # 提取 2D kernel size(去掉 split 维度)
        kernel_2d = (conv3d.kernel_size[1], conv3d.kernel_size[2])
        stride_2d = (conv3d.stride[1], conv3d.stride[2])

        self.convs = nn.ModuleList()
        for t in range(kD):
            conv2d = nn.Conv2d(
                conv3d.in_channels, conv3d.out_channels,
                kernel_size=kernel_2d, stride=stride_2d, bias=False,
            )
            # 从 3D 权重中切出第 t 个 2D 切片
            conv2d.weight.data.copy_(kernel_3d[:, :, t, :, :])
            self.convs.append(conv2d)

        self.bias = conv3d.bias  # bias 只加一次

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x: (B, C, D, H, W) → 沿 D 拆分为 D 个 (B, C, H, W)
        slices = x.split(1, dim=self.split_dim)
        result = None
        for t, conv in enumerate(self.convs):
            out = conv(slices[t].squeeze(self.split_dim))
            result = out if result is None else result + out
        if self.bias is not None:
            result = result + self.bias.view(1, -1, 1, 1)
        return result
数学等价性证明

设 Conv3d 权重为 WRCout×Cin×T×H×W,输入为 XRB×Cin×T×H×W

原始 Conv3d 输出:Y=t=0T1W[:,:,t,:,:]X[:,:,t,:,:]

分解后:每个 Conv2d 计算 Yt=W[:,:,t,:,:]X[:,:,t,:,:],最终 Y=tYt

两者完全等价,无精度损失。

1.3 Attention 机制适配

HuggingFace 默认使用 Flash Attention(需要 CUDA),NPU 不支持。需要改写为 Eager Attention(朴素矩阵乘法实现):

python
import torch.nn.functional as F


class NPUAttention(nn.Module):
    """NPU 友好的 Eager Attention 实现。

    与 Flash Attention 数学等价,但使用标准 matmul + softmax,
    可被所有 NPU 后端执行。
    """

    def __init__(self, hidden_size: int, num_heads: int, head_dim: int):
        super().__init__()
        self.num_heads = num_heads
        self.head_dim = head_dim
        self.scaling = head_dim ** -0.5
        self.qkv_proj = nn.Linear(hidden_size, 3 * num_heads * head_dim)
        self.out_proj = nn.Linear(num_heads * head_dim, hidden_size)

    def forward(
        self, x: torch.Tensor,
        rope_cos: torch.Tensor = None,
        rope_sin: torch.Tensor = None,
    ) -> torch.Tensor:
        B, S, _ = x.shape

        # QKV 投影
        qkv = self.qkv_proj(x).reshape(B, S, 3, self.num_heads, self.head_dim)
        q, k, v = qkv.permute(2, 0, 3, 1, 4).unbind(0)
        # q/k/v: (B, num_heads, S, head_dim)

        # 应用 RoPE(预计算的 cos/sin 作为输入,避免 NPU 上做三角运算)
        if rope_cos is not None and rope_sin is not None:
            q = q * rope_cos + _rotate_half(q) * rope_sin
            k = k * rope_cos + _rotate_half(k) * rope_sin

        # 标准矩阵乘法 Attention(替代 Flash Attention)
        attn_weights = torch.matmul(q, k.transpose(-2, -1)) * self.scaling
        attn_weights = F.softmax(attn_weights, dim=-1, dtype=torch.float32).to(q.dtype)
        attn_output = torch.matmul(attn_weights, v)

        # (B, num_heads, S, head_dim) → (B, S, hidden)
        attn_output = attn_output.transpose(1, 2).reshape(B, S, -1)
        return self.out_proj(attn_output)


def _rotate_half(x: torch.Tensor) -> torch.Tensor:
    """RoPE 的旋转辅助函数"""
    x1, x2 = x.chunk(2, dim=-1)
    return torch.cat((-x2, x1), dim=-1)

关键改动

  • Flash Attention → 标准 matmul + softmax + matmul(NPU 可执行)
  • RoPE 从动态计算改为预计算 cos/sin 作为输入,避免在 NPU 上做复杂的三角函数运算
  • 所有 tensor 操作保持 NPU 友好(无 in-place、无动态 shape 分支)

1.4 端侧模型的通用适配要点

除了上述核心算子替换,端侧部署还需要关注以下适配:

python
# 端侧 LLM Decoder 的通用适配配置
adaptation_config = {
    # Attention 相关
    'attn_implementation': 'eager',         # 强制 eager attention
    'return_new_kv_only': True,             # KV Cache 只输出增量,由外部管理拼接
    'transposed_key_cache': True,           # Key 转置存储 (head, dim, seq),加速 matmul
    'precomputed_rope': True,               # RoPE cos/sin 作为外部输入

    # 输入相关
    'fixed_input_shape': True,              # 固定输入 shape(NPU 编译期需要确定)
    'precomputed_attention_mask': True,     # 注意力掩码预计算后传入

    # 模型拆分
    'separate_embedding': True,             # Embedding 层分离(可在 CPU 执行)
    'separate_lm_head': True,               # LM Head 分离(可单独量化)
    'separate_vision_encoder': True,        # Vision Encoder 分离(独立量化方案)
}

端侧固定尺寸策略

云端推理可以处理动态分辨率的图像和变长序列,但端侧模型通常要求固定输入尺寸,原因:

  1. NPU 编译时需要确定 tensor shape 以进行硬件指令调度优化
  2. 动态 shape 会导致频繁重编译,引入不可接受的延迟
  3. KV Cache 大小需要预分配,固定尺寸简化内存管理

Step 2: 图优化(Graph Optimization)

适配后的模型需要进行图级优化,为量化做准备。这一步在不同 SDK 中有不同工具,但核心工作是相同的:

python
import torch.onnx

# Step 2.1: 导出为 ONNX 中间表示
dummy_input = torch.randn(1, 128, hidden_size)

torch.onnx.export(
    adapted_model,
    dummy_input,
    "model.onnx",
    opset_version=17,
    input_names=['input_ids_embed'],
    output_names=['logits'],
    dynamic_axes=None,  # 端侧不使用动态 axes
)

# Step 2.2: 使用 ONNX 的图优化 pass
import onnx
from onnxruntime.transformers import optimizer

optimized_model = optimizer.optimize_model(
    "model.onnx",
    model_type='bert',  # 选择合适的优化模板
    opt_level=2,        # 算子融合 + 常量折叠
)
optimized_model.save_model_to_file("model_optimized.onnx")

图优化的核心工作:

  1. 算子融合:将 MatMul + Add → FusedLinear,LayerNorm 的多步运算 → 单算子
  2. 常量折叠:预计算所有编译期可确定的值
  3. 冗余消除:删除无用的 Reshape、Transpose 对
  4. Softmax 保留:确保 Softmax 不被折叠到其他算子中(量化需要保留独立节点)

图优化注意事项

某些优化 pass 可能破坏后续量化的精度。例如公共子表达式消除(CSE)可能合并本应独立量化的分支。实践中需要根据量化结果有选择地跳过部分优化。

Step 3: 量化(PTQ)

这是端侧部署中对精度影响最大的一步。端侧量化通常使用训练后量化(PTQ),核心流程如下:

3.1 量化仿真

量化仿真(Quantization Simulation)在 PyTorch 中模拟量化效果,无需真实硬件:

python
import torch
from torch.ao.quantization import (
    get_default_qconfig_mapping,
    prepare_fx,
    convert_fx,
)

# 创建量化配置:权重 INT8 对称,激活 INT16 非对称
from torch.ao.quantization import QConfig, MinMaxObserver, PerChannelMinMaxObserver

qconfig_w8a16 = QConfig(
    activation=MinMaxObserver.with_args(
        dtype=torch.qint16, qscheme=torch.per_tensor_affine
    ),
    weight=PerChannelMinMaxObserver.with_args(
        dtype=torch.qint8, qscheme=torch.per_channel_symmetric
    ),
)

# 插入观测器(Observer)
prepared_model = prepare_fx(
    optimized_model,
    qconfig_mapping={"": qconfig_w8a16},
    example_inputs=(dummy_input,),
)
Vision Encoder vs LLM Decoder 的量化差异
配置项Vision EncoderLLM Decoder
权重位宽8-bit8-bit 或 4-bit
激活位宽16-bit16-bit
SeqMSE可选推荐(对精度影响显著)
LPBQ不适用可选(对输出投影层有效)
Embedding Table-INT8 单独处理

Vision Encoder 对量化更敏感,所以使用 W8A16(权重 8-bit,激活 16-bit)方案。LLM Decoder 参数量大,可以更激进地使用 W4A16。

3.2 校准与计算量化参数

python
def calibrate(model: nn.Module, calibration_loader, num_batches: int = 100):
    """用真实数据跑 forward pass,统计每层的激活值分布"""
    model.eval()
    with torch.no_grad():
        for idx, batch in enumerate(calibration_loader):
            if idx >= num_batches:
                break
            model(batch['input_ids_embed'].cuda())

# 执行校准:Observer 自动统计 min/max
calibrate(prepared_model, train_loader, num_batches=100)

# 将观测到的统计量转换为量化参数 (scale, zero_point)
quantized_model = convert_fx(prepared_model)

3.3 SQNR 逐阶段精度验证

每一步适配/优化/量化后,都通过 SQNR(Signal-to-Quantization-Noise Ratio)验证精度:

python
def compute_sqnr(signal: torch.Tensor, quantized: torch.Tensor) -> float:
    """计算信号与量化噪声之比(dB)。值越高表示量化损失越小。"""
    noise = signal - quantized
    signal_power = (signal ** 2).mean()
    noise_power = (noise ** 2).mean()
    if noise_power == 0:
        return float('inf')
    return 10 * torch.log10(signal_power / noise_power).item()

# 阶段 1: 适配后 vs 原始 FP32
adapted_sqnr = compute_sqnr(fp_output, adapted_output)
print(f'Adapted vs FP32 SQNR: {adapted_sqnr:.1f} dB')  # 期望 > 60 dB

# 阶段 2: 图优化后 vs FP32
optimized_sqnr = compute_sqnr(fp_output, optimized_output)
print(f'Optimized vs FP32 SQNR: {optimized_sqnr:.1f} dB')  # 期望 > 55 dB

# 阶段 3: 量化后 vs 图优化后
quant_sqnr = compute_sqnr(optimized_output, quantized_output)
print(f'Quantized vs Optimized SQNR: {quant_sqnr:.1f} dB')  # 期望 > 30 dB

SQNR 红线

  • SQNR > 35 dB:量化损失可以接受
  • SQNR 25-35 dB:需要仔细评估任务精度
  • SQNR < 25 dB:量化方案需要调整(降低量化比特、使用 SeqMSE、保留敏感层为高精度)

3.4 高级量化技术:SeqMSE 与 LPBQ

标准 PTQ 使用 MinMax 校准,容易受离群值影响。以下两种公开研究的技术可以显著提升量化精度:

SeqMSE vs 普通 PTQ 的工作原理

普通 PTQ(MinMax)

  1. 跑校准数据,记录每个 tensor 的 min/max
  2. scale = (max - min) / (2^bits - 1)offset = min / scale
  3. 问题:离群值会拉大 range,导致大部分值的量化精度下降

SeqMSE(Sequential MSE Minimization)

  1. 逐层处理,冻结其他层
  2. 对当前层尝试多个候选 (scale, offset) 组合
  3. 选择使 MSE=yfpyquant2 最小的那组参数
  4. 计算量更大(每层需要多次 forward),但精度显著提升
  5. 特别适合 INT4 这种极低比特量化

LPBQ(Low-Precision Block Quantization)

  1. 将权重矩阵划分为小块(如 128 元素一组)
  2. 每个块独立计算量化参数
  3. 块粒度比 per-channel 更细,能更好地适应权重分布不均匀的情况
  4. 对 LM Head 等大型投影层效果尤其好
python
# SeqMSE 的伪代码实现思路
def sequential_mse_calibration(model, layer, calibration_data, num_candidates=20):
    """逐层搜索最优量化参数"""
    best_scale, best_zp, best_mse = None, None, float('inf')

    # 获取当前层的 FP32 参考输出
    fp_output = run_layer_fp32(layer, calibration_data)

    # 从观测到的 min/max 中生成多个候选 scale
    observed_min, observed_max = observe_range(layer, calibration_data)
    candidates = generate_scale_candidates(observed_min, observed_max, num_candidates)

    for scale, zero_point in candidates:
        # 用候选参数做量化 forward
        quant_output = run_layer_quantized(layer, calibration_data, scale, zero_point)
        mse = ((fp_output - quant_output) ** 2).mean()
        if mse < best_mse:
            best_scale, best_zp, best_mse = scale, zero_point, mse

    return best_scale, best_zp

Step 4: 中间格式导出与精度验证

量化完成后,需要导出为目标 SDK 可消费的中间格式,并生成 Test Vectors 用于设备端精度比对:

python
# 1. 导出量化后的 ONNX 模型
torch.onnx.export(
    quantized_model,
    dummy_input,
    "model_quantized.onnx",
    opset_version=17,
    input_names=['input_ids_embed'],
    output_names=['logits'],
)

# 2. 导出量化编码(每个 tensor 的 scale 和 zero_point)
def export_encodings(model, path: str):
    """将量化参数保存为 JSON,供 NPU 编译器使用"""
    encodings = {}
    for name, module in model.named_modules():
        if hasattr(module, 'scale') and hasattr(module, 'zero_point'):
            encodings[name] = {
                'scale': module.scale.tolist(),
                'zero_point': module.zero_point.tolist(),
                'bitwidth': module.bitwidth,
            }
    import json
    with open(path, 'w') as f:
        json.dump(encodings, f, indent=2)

export_encodings(quantized_model, "model_quantized.encodings")

# 3. 生成 Test Vectors(用于设备端精度对齐)
def generate_test_vectors(model, inputs, output_dir: str):
    """保存参考输入输出,部署后可比对精度"""
    import os, numpy as np
    os.makedirs(output_dir, exist_ok=True)
    with torch.no_grad():
        outputs = model(*inputs)
    for i, inp in enumerate(inputs):
        np.save(f"{output_dir}/input_{i}.npy", inp.cpu().numpy())
    np.save(f"{output_dir}/output.npy", outputs.cpu().numpy())

generate_test_vectors(quantized_model, (dummy_input,), "test_vectors/")

导出产物:

  • model_quantized.onnx:带量化信息的模型结构和权重
  • model_quantized.encodings:每个 tensor 的量化参数(scale、zero_point、bitwidth)
  • test_vectors/:输入输出的参考值,用于设备端比对

Step 5: NPU 编译流水线

这是将中间格式转换为设备可执行格式的关键步骤。虽然不同 SDK 的具体命令不同,但编译流水线的结构是通用的:

5.1 模型分割

端侧内存有限,需要将大模型拆分为多个子图:

python
import onnx

def split_onnx_model(model_path: str, split_points: list, output_dir: str):
    """将 ONNX 模型按指定节点拆分为多个子图。

    典型的拆分策略:
      - Part 1: Embedding 层(可在 CPU 执行)
      - Part 2: Decoder Layers(主体,在 NPU 执行)
      - Part 3: LM Head(单独量化方案)
    """
    model = onnx.load(model_path)
    # ... 根据 split_points 的节点名称切分图
    # 每个子图独立保存,包含各自的输入输出定义
    for i, subgraph in enumerate(subgraphs):
        onnx.save(subgraph, f"{output_dir}/part_{i+1}_of_{len(subgraphs)}.onnx")

5.2 注意力结构优化

NPU 通常对单头注意力(SHA)的执行效率高于多头注意力(MHA)。编译阶段可以将 MHA reshape 为多个独立的 SHA 调用:

MHA (Multi-Head Attention):
  Q: (B, num_heads, S, head_dim) → 一次大矩阵乘法

SHA (Single-Head Attention):
  for h in range(num_heads):
      Q_h: (B, 1, S, head_dim) → 多次小矩阵乘法

NPU 优势:小矩阵乘法可以更好地利用片上缓存(SRAM/VTCM),
减少对外部 DRAM 的访问。

5.3 通用编译流程

不同 SDK 的编译流程虽然命令不同,但结构相似:

bash
# === 通用 NPU 编译流水线(伪命令) ===

# Step 1: 中间格式转换(ONNX → SDK 特有格式)
npu-converter \
    --input model_quantized.onnx \
    --quantization_overrides model_quantized.encodings \
    --output model.intermediate

# Step 2: 量化编译(应用量化参数到硬件指令)
npu-quantize-compile \
    --input model.intermediate \
    --activation_bitwidth 16 \
    --weight_bitwidth 8 \
    --calibration_data input_list.txt \
    --output model.compiled

# Step 3: 生成设备可执行二进制
npu-binary-generator \
    --backend npu_accelerator_lib \
    --compiled_model model.compiled \
    --optimization_level 3 \
    --output model.device_binary
各 SDK 的实际编译命令对比
步骤QNN SDKCore MLTensorRT
格式转换qairt-convertercoremltools.convert()trtexec --onnx
量化编译qairt-quantizer内置于 converttrtexec --int8
二进制生成qnn-context-binary-generator.mlmodelc bundle.engine file
最终产物.serialized.bin.mlmodelc.engine

5.4 Multi-AR Weight Sharing

LLM 推理的 Prefill 和 Decode 阶段输入 shape 不同,但模型权重相同。通过 Weight Sharing,两个计算图共享同一份权重,内存占用减半:

Prefill 图(AR=128):每次处理 128 个 token
  输入: (1, 128, hidden_size) → 输出: logits + KV Cache

Decode 图(AR=1 或 AR=32):每次处理 1~32 个 token
  输入: (1, 1, hidden_size) + KV Cache → 输出: logits + 新 KV

两图的权重完全相同 → 编译为共享权重的二进制包 → 内存减半

多模态模型的端侧架构

一个典型的多模态 LLM 在端侧的推理架构:

┌───────────────────────────────────────────────────┐
│                    Host CPU                       │
│  ┌──────────┐  ┌───────────┐  ┌──────────────┐  │
│  │Tokenizer │  │ Processor │  │ KV Cache Mgr │  │
│  │          │  │(Image Pre)│  │ (Mem Alloc)  │  │
│  └────┬─────┘  └─────┬─────┘  └──────┬───────┘  │
│       │               │               │          │
│       ▼               ▼               ▼          │
│  ┌────────────────────────────────────────────┐  │
│  │             NPU (端侧加速器)                │  │
│  │  ┌──────────────────────────────────────┐  │  │
│  │  │  Vision Encoder (W8A16)              │  │  │
│  │  │  Conv2d Patch Embed + ViT Blocks     │  │  │
│  │  └──────────────┬───────────────────────┘  │  │
│  │                 │ vision_embedding          │  │
│  │                 ▼                           │  │
│  │  ┌──────────────────────────────────────┐  │  │
│  │  │  Embedding (INT8 LUT)                │  │  │
│  │  │  + Vision Token Concat               │  │  │
│  │  └──────────────┬───────────────────────┘  │  │
│  │                 ▼                           │  │
│  │  ┌──────────────────────────────────────┐  │  │
│  │  │  Decoder Layers (W8A16/W4A16)        │  │  │
│  │  │  N layers, Eager Attention + Conv2d  │  │  │
│  │  └──────────────┬───────────────────────┘  │  │
│  │                 ▼                           │  │
│  │  ┌──────────────────────────────────────┐  │  │
│  │  │  LM Head (独立量化)                   │  │  │
│  │  └──────────────┬───────────────────────┘  │  │
│  │                 │ logits                    │  │
│  └─────────────────┼──────────────────────────┘  │
│                    ▼                              │
│  ┌────────────────────────────────────┐          │
│  │  Sampling (Top-K / Top-P / Greedy) │          │
│  └────────────────────────────────────┘          │
└───────────────────────────────────────────────────┘

端侧 vs 云端推理对比

维度云端推理端侧推理
模型规模70B-400B0.5B-7B
精度FP16/BF16W4A16 / W8A16
吞吐量高(批处理)低(单请求)
延迟网络延迟 + 推理仅推理延迟
隐私数据上传云端数据本地处理
成本GPU 时间费用零边际成本
模型适配最小改动(PyTorch 原生)大量改动(Linear→Conv, Eager Attn)
硬件NVIDIA GPU (CUDA)NPU / ANE / CPU
并发能力高(Continuous Batching)低(单用户)

端云协同

实际产品中,端侧和云端往往不是二选一,而是协同工作:

  • 简单请求(短对话、快速问答)→ 端侧处理,零延迟
  • 复杂请求(长文档分析、复杂推理)→ 上传云端,大模型处理
  • 隐私敏感(医疗、金融)→ 强制端侧

实践要点总结

环节关键决策常见陷阱
模型适配Linear→Conv2d, Conv3d→Conv2d忘记替换某些子模块;下游工具无法识别替换后的层
Attention 改写Flash Attention → Eager忘记处理 GQA 的 repeat_kv;Softmax 数值不稳定
量化方案选择Vision W8A16, LLM W4A16Vision 用 W4 导致图像理解严重退化
SeqMSE对 decoder 层启用校准数据太少(< 50 batch)导致效果不稳定
SQNR 验证每步都验证只在最终验证,无法定位精度损失来源
模型分割按 Embedding / Decoder / LM Head分割点选错导致跨图 tensor 传输开销大
Weight SharingPrefill 和 Decode 图共享权重未启用 weight sharing,内存占用翻倍
序列长度固定 context_length(如 4096)设太长导致 KV Cache 超出设备内存

苏格拉底时刻

在继续之前,尝试回答以下问题来检验你的理解:

1. 为什么 KV Cache 能加速推理?如果没有 KV Cache,生成第 100 个 token 需要多少次矩阵乘法?

没有 KV Cache 时,生成第 n 个 token 需要对前面 n1 个 token 重新计算所有层的 K 和 V,总计算量与 n2 成正比。生成第 100 个 token 需要对 99 个 token 做完整的 Attention 计算。有了 KV Cache,只需要计算当前 token 的 Q、K、V 并拼接,计算量与 n 成线性关系。

2. PagedAttention 借鉴了操作系统的什么概念?为什么能提升显存利用率?

借鉴了虚拟内存中的分页机制(Paging)。通过将 KV Cache 切分为固定大小的 Block 并按需分配,避免了预分配最大长度导致的内部碎片,以及不同请求长度不一导致的外部碎片。显存利用率从 20-40% 提升到接近 100%。

3. Continuous Batching 相比 Static Batching 的核心改进是什么?

核心改进是在 iteration 级别而非 request 级别做调度。短请求完成后立即释放资源并插入新请求,避免 GPU 空等长请求完成。吞吐量提升 2-8 倍。

4. 投机采样为什么是「无损」的?如果小模型猜错了会怎样?

通过精心设计的 accept/reject 机制(以 min(1,p(x)/q(x)) 的概率接受),保证了最终输出的概率分布与大模型一致。小模型猜错时,从修正后的分布 max(0,p(x)q(x)) 重新采样,确保不引入偏差。最坏情况下退化为普通的大模型推理速度。

5. Chunked Prefill 的 Token Budget 是什么意思?为什么 Decode 优先?

Token Budget 是每步模型前向传播的总 token 数上限(如 8192)。Decode 优先是因为正在生成的请求需要持续输出 token 以满足用户实时体验——如果被新的 Prefill 阻塞,用户会感受到卡顿。所以先把所有 Decode 请求放入 batch(每个只占 1 token),再用剩余的 budget 给 Prefill 请求分 chunk。

6. PD 分离的核心挑战是什么?有几种 KV Cache 管理模式?

核心挑战是 Prefill 节点计算完 KV Cache 后需要高效传输到 Decode 节点,传输延迟不能抵消分离带来的收益。三种管理模式:(1) KV Cache 仅存于 Decode 节点 (2) PD 各存各的 (3) 中心化/去中心化 Cache 服务。

7. 在 PagedAttention 的 Decode 阶段,多个 Page 的 Attention 结果如何正确聚合?

通过 Online Softmax 技巧:每个 Page 独立计算局部的 Attention 输出 Oi、最大值 Mi 和归一化因子 Li,然后通过 O=iexp(MiMglobal)LiLglobalOi 聚合为全局正确的结果。这是精确计算,非近似。


常见问题 & 面试考点

高频面试题

Q: LLM 推理的 Prefill 和 Decode 阶段有什么区别?

A: Prefill 是处理输入 prompt,一次性计算所有 token 的 KV Cache,是 Compute-bound;Decode 是逐 token 生成,每步读取全部参数和 KV Cache,是 Memory-bound。以 Llama 70B 为例,Decode 阶段算术强度仅为 1 FLOP/Byte,远低于 GPU 的计算带宽比 156 FLOP/Byte。

Q: vLLM 为什么快?核心技术是什么?

A: vLLM 的核心是 PagedAttention(解决 KV Cache 显存碎片,利用率从 ~30% 提升到 ~100%)+ Continuous Batching(迭代级调度,吞吐提升 2-8x)+ Chunked Prefill(避免长 Prefill 阻塞 Decode)。

Q: 投机采样的加速比取决于什么因素?

A: 取决于 Draft Model 和 Target Model 输出分布的一致性(可用 KL 散度度量)。接受率越高加速越大。通常使用同系列的小模型(如 Llama-7B → Llama-70B)可以达到 70-85% 的接受率,实现 2-3x 加速。

Q: Chunked Prefill 和 PD 分离的关系是什么?

A: Chunked Prefill 是 PD 融合方案——在同一 GPU 上交错执行 Prefill 和 Decode。PD 分离则是将二者放在不同物理节点上。实际系统通常兼并两种方案:Prefill 节点内部也可以使用 Chunked Prefill。

性能指标速查

指标含义优化技术
TTFT (Time To First Token)首个 token 延迟Chunked Prefill, 模型并行
TPOT (Time Per Output Token)每个输出 token 延迟KV Cache, 投机采样
Throughput (tokens/s)吞吐量Continuous Batching, PagedAttention
GPU Memory Utilization显存利用率PagedAttention
GPU Compute Utilization算力利用率Chunked Prefill

推荐资源

必读论文

代码与工具

  • vLLM GitHub — 最流行的开源推理引擎
  • TensorRT-LLM — NVIDIA 官方推理优化
  • SGLang — 高性能推理框架
  • AngelSlim — 腾讯大模型压缩工具包(EAGLE-3 投机采样 + 量化 + SpecExit)
  • Qualcomm AI Hub — Qualcomm 端侧模型部署平台,提供预优化模型和部署工具
  • MLC LLM — 通用 LLM 端侧推理引擎(支持 iOS/Android/GPU/NPU)
  • llama.cpp — 纯 C/C++ 实现的 LLM 推理,极致兼容性

博客与视频