推理优化
一句话总结
推理优化让大模型从实验室走向生产环境——通过 KV Cache、PagedAttention、Continuous Batching、Chunked Prefill、投机采样、PD 分离等技术,将推理延迟从秒级降到毫秒级,将吞吐量提升数十倍;端侧推理引擎更将 LLM 能力延伸到手机和边缘设备。
在大模型体系中的位置
推理优化处于大模型工程化的最后一公里:
预训练 → SFT/RLHF → 量化/蒸馏 → 【推理优化】 → 上线服务
↑
你在这里无论模型训练得多好,如果推理效率低下,用户就会面临高延迟、高成本的困境。推理优化的核心目标是:
- 降低延迟(Latency):用户等待第一个 token 的时间(TTFT)和每个 token 的生成时间(TPOT)
- 提升吞吐(Throughput):单位时间内处理的 token 数量
- 降低成本(Cost):最大化 GPU 利用率,减少所需 GPU 数量
推理优化涉及的技术层次:
| 层次 | 技术 | 目标 |
|---|---|---|
| 算法层 | KV Cache、投机采样 | 减少冗余计算 |
| 内存层 | PagedAttention、KV Cache 管理 | 提升显存利用率 |
| 调度层 | Continuous Batching、Chunked Prefill | 提升 GPU 利用率 |
| 架构层 | PD 分离、vLLM | 系统级优化 |
| 内核层 | FlashAttention、融合算子 | 底层计算加速 |
LLM 推理的两个阶段
LLM 的自回归推理天然分为两个性质截然不同的阶段:
Prefill 阶段(计算密集)
Prefill 阶段处理用户输入的完整 prompt,一次性计算所有输入 token 的 KV Cache。
特征:
- 输入:完整 prompt(可能有数千个 token)
- 计算模式:大矩阵乘法,可以高度并行
- 瓶颈:计算密集(Compute-bound),GPU 算力是限制因素
- 类比:一次性读完一篇文章
Decode 阶段(访存密集)
Decode 阶段逐个生成输出 token,每一步都需要读取全部模型参数和之前所有 token 的 KV Cache。
特征:
- 输入:单个 token(上一步的输出)
- 计算模式:向量-矩阵乘法,计算量小但内存读取量大
- 瓶颈:访存密集(Memory-bound),显存带宽是限制因素
- 类比:一个字一个字地写回复,每写一个字都要回头看全部内容
用 Roofline Model 分析为什么 Decode 是 Memory-Bound
Roofline Model 的核心指标是算术强度(Arithmetic Intensity):
以 Llama 70B 为例,分析 Decode 阶段生成单个 token 的情况:
计算量估算:
- 模型参数量:70B(700 亿参数)
- 每个参数参与 2 次浮点运算(一次乘法 + 一次加法)
- FLOPs = 2 x 70B = 140 GFLOPs
访存量估算:
- FP16 精度下,每个参数占 2 Bytes
- 需要读取全部模型参数:70B x 2 = 140 GB
- 还需要读取 KV Cache(随序列长度增长)
算术强度:
而 A100 GPU 的计算/带宽比值:
结论:Decode 阶段的算术强度(1)远低于 GPU 的计算带宽比(156),这意味着 GPU 的计算单元大部分时间在等待数据从显存搬运过来,算力严重浪费。这就是为什么 Decode 阶段是 Memory-bound 的。
KV Cache
为什么需要 KV Cache
在自回归生成中,每个新 token 需要和之前所有 token 做 Attention 计算。如果每次都重新计算所有 token 的 K、V,就会产生大量冗余计算:
# 不使用 KV Cache(每次重新计算)—— O(n^2) 总计算量
for i in range(max_tokens):
logits = model(all_tokens[:i+1]) # 每次传入全部 token,重复计算前面的 K、V
next_token = sample(logits)
# 使用 KV Cache(增量计算)—— O(n) 总计算量
kv_cache = None
for i in range(max_tokens):
logits, kv_cache = model(tokens[i:i+1], past_kv=kv_cache) # 只传入新 token
next_token = sample(logits)没有 KV Cache 时,生成第
KV Cache 的显存计算
KV Cache 的显存公式:
其中:
| 符号 | 含义 | Llama 70B 取值 |
|---|---|---|
| K 和 V 两个矩阵 | 2 | |
| Transformer 层数 | 80 | |
| 注意力头数(KV头数,GQA) | 8(GQA) | |
| 每个头的维度 | 128 | |
| 序列长度 | 4096 | |
| Batch Size | 1 | |
| 数据类型字节数(FP16=2) | 2 |
Llama 70B 在 4K context、batch_size=1 下的 KV Cache:
如果使用完整的 64 个注意力头(非 GQA):
当 batch_size 增大到 32 时,KV Cache 直接膨胀到 ~32 GB,和模型参数本身(140 GB FP16)相比已经非常可观。这就是为什么 KV Cache 的管理如此重要。
PagedAttention
传统 KV Cache 的内存碎片问题
传统方式为每个请求预分配一块连续的、最大长度的 KV Cache 空间。这带来两个问题:
- 内部碎片(Internal Fragmentation):请求实际生成长度通常远小于预分配的最大长度,大量显存被浪费
- 外部碎片(External Fragmentation):不同请求占用的连续空间大小不一,释放后留下的空隙无法被其他请求利用
实测表明,传统方案的 KV Cache 显存利用率仅为 20-40%。
虚拟内存的启发
vLLM 团队从操作系统的虚拟内存管理中获得灵感:
| 操作系统概念 | PagedAttention 对应 |
|---|---|
| 虚拟页 | 逻辑块(Logical Block) |
| 物理页帧 | 物理块(Physical Block) |
| 页表 | Block Table |
| 按需分页 | 动态分配 KV Cache Block |
| Copy-on-Write | Beam Search 共享前缀 |
物理块与逻辑块
核心思想:将 KV Cache 切分为固定大小的 Block(如每个 Block 存储 16 个 token 的 KV),通过 Block Table 维护逻辑到物理的映射。
逻辑视图(请求看到的连续空间):
Request 1: [Block 0] [Block 1] [Block 2]
Request 2: [Block 0] [Block 1]
物理存储(实际的 GPU 显存):
[Page 5] [Page 2] [Page 7] [Page 1] [Page 3] ... [Page N]
Block Table(映射关系):
Request 1: 逻辑 0→物理 5, 逻辑 1→物理 7, 逻辑 2→物理 2
Request 2: 逻辑 0→物理 1, 逻辑 1→物理 3页分配器的实现:
from collections import deque
class PageAllocator:
"""物理页帧的分配与回收,维护空闲池和链式索引"""
def __init__(self, page_size: int, total_pages: int):
self.page_size = page_size
self.total_pages = total_pages
self.free_pool = deque(range(total_pages)) # 用 deque 实现 O(1) 双端操作
self.in_use = set()
self.slot_offset = [0] * total_pages # 每页已填充的 token 数
self.successor = [-1] * total_pages # 页链表:当前页 → 下一页
def acquire(self, count: int, prev_page: int = -1) -> list[int]:
"""从空闲池取出 count 个页帧,可选地链接到 prev_page 之后"""
if len(self.free_pool) < count:
return []
grabbed = [self.free_pool.popleft() for _ in range(count)]
self.in_use.update(grabbed)
for pid in grabbed:
self.slot_offset[pid] = 0
self.successor[pid] = -1
if prev_page >= 0:
self.successor[prev_page] = grabbed[0]
return grabbed
def release(self, page_ids: list[int]):
"""将页帧归还空闲池,重置元数据"""
for pid in page_ids:
if pid in self.in_use:
self.in_use.discard(pid)
self.free_pool.append(pid)
self.slot_offset[pid] = 0
self.successor[pid] = -1PagedAttention 内核实现
PagedAttention 的关键在于:KV Cache 在物理上是非连续的,但 Attention 计算需要正确地跨 Block 聚合结果。
传统 Attention 的输入形状是 [batch_size, seq_len, num_heads, head_dim],而 PagedAttention 的 KV Cache 输入是 [num_pages, page_size, num_heads, head_dim]——按页组织。
Prefill 阶段的 PageAttention:
多个请求的 Block 混合排列,通过 request_num_pages 记录每个请求占用的页数:
class PageAttentionDecoderBlock(nn.Module):
def __init__(self, config):
super().__init__()
self.num_heads = config.num_heads
self.dim = config.dim
self.head_dim = config.head_dim
self.WQ = nn.Linear(config.dim, config.dim, bias=False)
self.WK = nn.Linear(config.dim, config.dim, bias=False)
self.WV = nn.Linear(config.dim, config.dim, bias=False)
self.WO = nn.Linear(config.dim, config.dim, bias=False)
def forward_prefill(self, X, request_num_pages, attention_backend):
"""
Request-Wise PageAttention Prefill
X: [num_pages, page_size, dim] -- 注意 dim=0 不是 batch_size
request_num_pages: list[int], 如 [3, 2] 表示 2 个请求分别占 3 和 2 页
"""
B, T, _ = X.shape
H, D = self.num_heads, self.head_dim
Q, K, V = self.WQ(X), self.WK(X), self.WV(X)
Q = Q.reshape(B, T, H, D).transpose(1, 2)
K = K.reshape(B, T, H, D).transpose(1, 2)
V = V.reshape(B, T, H, D).transpose(1, 2)
O = torch.zeros_like(Q)
# 按请求分别计算 Attention
offset = 0
for t, N in enumerate(request_num_pages):
Q_ = Q[offset: offset + N]
K_ = K[offset: offset + N]
V_ = V[offset: offset + N]
O[offset: offset + N] = attention_backend(Q_, K_, V_)
offset += N
O = O.transpose(1, 2).reshape(B, T, H * D)
O = self.WO(O)
return O, [K.transpose(1, 2), V.transpose(1, 2)]Decode 阶段的 PageAttention(核心难点):
Decode 时每个请求只有 1 个新 query token,但需要和分散在多个 Page 上的 KV Cache 做 Attention。核心思路是:
- 将 query 复制到对应的每个 KV Page 上(dispatch)
- 在每个 Page 上独立计算局部 Attention
- 通过 Online Softmax 聚合所有 Page 的结果(reduce/combine)
# Decode 阶段:单个 query 对 N 页 KV Cache 的聚合
q = torch.randn(1, 1, d) # 1 个 query token
K = torch.randn(num_pages, page_size, d) # KV Cache 分散在多个页上
V = torch.randn(num_pages, page_size, d)
# 每页独立计算局部 Attention
S = q @ K.transpose(1, 2) # [num_pages, 1, page_size]
M, _ = torch.max(S, dim=-1, keepdim=True) # 局部最大值
L = torch.sum(torch.exp(S - M), dim=-1, keepdim=True) # 局部归一化因子
P = torch.exp(S - M) / L
O = P @ V # 局部输出
# 通过 Online Softmax 聚合多页结果
def combine_result(O, L, M):
"""将多页的局部 Attention 结果聚合为全局正确结果"""
M_new, _ = torch.max(M, dim=0, keepdim=True) # 全局最大值
L_new = torch.exp(M - M_new) * L
L_new = torch.sum(L_new, dim=0, keepdim=True) # 全局归一化因子
O_new = torch.exp(M - M_new) * (L / L_new) * O # 重新缩放
return O_new.sum(dim=0, keepdim=True)
O_final = combine_result(O, L, M)这个 Online Softmax 聚合的数学保证是:全局 Softmax 可以分解为局部 Softmax 的加权组合,通过维护全局最大值
Copy-on-Write 优化(Beam Search 场景)
在 Beam Search 中,多个候选序列共享相同的前缀。传统方式需要为每个候选复制一份完整的 KV Cache,而 PagedAttention 的 Copy-on-Write 机制允许:
- 多个候选序列共享前缀 Block,引用计数 > 1
- 只有当某个候选需要修改某个 Block 时,才复制该 Block
- 显存节省高达 55%(vLLM 论文数据)
Continuous Batching
Static Batching 的低效
传统的 Static Batching 将一批请求打包,等所有请求都生成完毕后才处理下一批:
时间 →
请求 A: [====生成 10 tokens====][等待...........][等待...........]
请求 B: [====生成 10 tokens====][====生成 30 tokens====][等待...........]
请求 C: [====生成 10 tokens====][====生成 30 tokens====][====生成 50 tokens====]
↑
A 和 B 早已完成,GPU 在空转问题:短请求完成后 GPU 在"陪跑"长请求,资源严重浪费。
Continuous Batching 的调度策略
Continuous Batching(也叫 Iteration-level Scheduling)在每次生成一个 token 后重新调度:
时间 →
Step 1: [A][B][C] ← 3 个请求并行生成
Step 2: [A][B][C]
...
Step 10: [A完成][B][C] ← A 完成,立即释放位置
Step 11: [D加入][B][C] ← 新请求 D 立即插入
...
Step 30: [D][B完成][C] ← B 完成
Step 31: [D][E加入][C] ← 新请求 E 插入核心实现涉及三个队列的管理:
class RequestManager:
def __init__(self, max_batch_size: int):
self.requests = {} # request_id -> Request
self.waiting_queue = deque() # 等待处理的请求
self.running_set = set() # 正在生成的请求
self.max_batch_size = max_batch_size
def add_request(self, prompt, max_seq_len):
"""新请求进入 waiting 队列"""
request_id = len(self.requests)
request = Request(request_id, prompt, max_seq_len)
self.requests[request_id] = request
self.waiting_queue.append(request_id)
return request_id
def get_pending_requests(self, max_count):
"""从 waiting 队列取出可调度的请求"""
available_slots = self.max_batch_size - len(self.running_set)
count = min(available_slots, max_count, len(self.waiting_queue))
pending = []
for _ in range(count):
req_id = self.waiting_queue.popleft()
self.running_set.add(req_id)
pending.append((req_id, self.requests[req_id].prompt))
return pending
def update_request(self, request_id, next_token):
"""更新请求状态,完成的请求立即释放"""
request = self.requests[request_id]
request.add_token(next_token)
if request.is_finished():
self.running_set.discard(request_id)
# 槽位释放,下一步可以插入新请求KV Cache 管理(槽位式):
class KVCacheManager:
def __init__(self, config):
self.kv_cache = torch.zeros(
2, config.num_layers, config.max_batch_size,
config.max_seq_len, config.num_heads, config.head_dim
)
self.slot_to_request = {}
self.request_to_slot = {}
self.available_slots = set(range(config.max_batch_size))
def allocate_slots(self, request_ids):
"""为新请求分配 KV Cache 槽位"""
allocated = []
for req_id in request_ids:
slot = self.available_slots.pop()
self.slot_to_request[slot] = req_id
self.request_to_slot[req_id] = slot
allocated.append(slot)
return allocated
def free_slot(self, request_id):
"""释放已完成请求的槽位"""
slot = self.request_to_slot.pop(request_id)
del self.slot_to_request[slot]
self.kv_cache[:, :, slot, :, :, :] = 0 # 清空 KV Cache
self.available_slots.add(slot)Continuous Batching 将吞吐量提升 2-8 倍,是现代推理引擎的标配。
Chunked Prefill
长 Prefill 阻塞 Decode 的问题
当系统同时服务多个用户时,新到达的长 prompt(如 4K token)的 Prefill 会长时间独占 GPU,导致已经在 Decode 阶段的请求被阻塞,用户感受到明显的卡顿(TPOT 突增)。
不使用 Chunked Prefill:
[Decode R1][Decode R2][========= Prefill R3 (4K tokens) =========][Decode R1][Decode R2]
↑ R1、R2 的 Decode 被阻塞了很久
使用 Chunked Prefill:
[Decode R1][Decode R2][Prefill R3 chunk1][Decode R1][Decode R2][Prefill R3 chunk2][Decode R1]...
↑ R1、R2 的 Decode 不被打断将 Prefill 拆分为 Chunks
核心思想:将长 prompt 按固定大小(如 512 token)切分为多个 chunk,每个 chunk 和当前的 Decode 请求合并成一个 batch 执行。
Chunked Prefill 的注意力计算需要处理跨 chunk 的 KV Cache 拼接:
class ChunkedPrefillAttention(nn.Module):
def forward(self, X, KVCache):
Q = self.WQ(X)
K = self.WK(X)
V = self.WV(X)
if KVCache is not None:
K_cache, V_cache = KVCache
K_ = torch.cat([K_cache, K], dim=1) # 拼接历史 KV 和当前 chunk
V_ = torch.cat([V_cache, V], dim=1)
else:
K_, V_ = K, V
# 标准 Attention 计算
S = Q @ K_.transpose(-2, -1) / math.sqrt(self.head_dim)
P = F.softmax(S, dim=-1)
O = P @ V_
return O, (K_, V_) # 返回更新后的 KV CacheChunked Prefill 的调度逻辑:
def chunk_prefill_method(model, x, page_size, dim):
"""将长 prompt 分 chunk 处理"""
bsz, seq_len = x.shape
num_chunks = (seq_len + page_size - 1) // page_size
KVCache = None
for i in range(num_chunks):
start = i * page_size
end = min((i + 1) * page_size, seq_len)
chunk = x[:, start:end]
logits, KVCache = model(chunk, KVCache=KVCache)
if i == num_chunks - 1: # 最后一个 chunk
last_token_logits = logits[:, -1, :]
else:
last_token_logits = None # 非最后一个 chunk 的 logits 无效
return last_token_logits, KVCache混合 PD 请求的 Chunked Prefill 调度器(vLLM V1 核心):
def plan_next_batch(self, max_batch_tokens, max_decoding_batch, max_prefill_batch):
"""Decode 优先,Prefill 填充剩余 token budget"""
plan = BatchPlan()
budget_used = 0
# 1. Decoding 请求优先并入 batch
for req_id in self.running_requests:
req = self.requests[req_id]
if req.is_decoding():
plan.ids.append(req_id)
plan.token_segments.append([req.generated_tokens[-1]])
plan.chunk_len.append(1)
budget_used += 1
if budget_used == max_decoding_batch:
break
# 2. Prefill 请求填充剩余 token budget
for req_id in self.running_requests:
req = self.requests[req_id]
if req.is_decoding():
continue
remaining_budget = max_batch_tokens - budget_used
pending = req.prompt[req.kv_len:]
if len(pending) <= remaining_budget:
# 可以一次性 prefill 完
plan.token_segments.append(pending)
budget_used += len(pending)
else:
# 需要 chunked prefill
plan.token_segments.append(pending[:remaining_budget])
budget_used += remaining_budget
return plan关键设计原则:
- Decode 优先:正在生成的请求不能被打断
- Token Budget:每步总 token 数有上限(如
max_batch_tokens=8192) - Chunk 大小可变:不是固定的,而是用剩余 budget 填满
投机采样 (Speculative Decoding)
核心思想:小模型草稿 + 大模型验证
标准的自回归解码每步只能生成 1 个 token,而每步都需要完整的前向传播。投机采样的核心观察是:
Decode 阶段是 Memory-bound,增加少量计算(验证多个 token)几乎不增加时间。
因此可以用一个小模型(Draft Model,如 7B)快速猜测
class SPDecoding:
def __init__(self, model_target, model_draft, spec_n):
self.model_target = model_target
self.model_draft = model_draft
self.spec_n = spec_n # 每次猜测的 token 数,如 5
def generate_draft(self, spec_n, x):
"""小模型自回归生成 spec_n 个候选 token"""
logits_y = []
for i in range(spec_n):
with torch.no_grad():
logits = self.model_draft(x)[:, [-1], :]
logits_y.append(logits)
next_token = torch.argmax(logits, dim=-1)
x = torch.cat([x, next_token], dim=1)
return x, torch.cat(logits_y, dim=1)
def generate(self, x, max_new_tokens=30):
count = 0
y_new = []
for i in range(max_new_tokens):
# Step 1: 小模型猜测
x_spec, logits_draft = self.generate_draft(self.spec_n, x)
y_spec = x_spec[:, -self.spec_n:]
# Step 2: 大模型一次性验证
logits_target = self.model_target(x_spec)[:, -self.spec_n-1:]
y_target = torch.argmax(logits_target, dim=-1)
# Step 3: 从左到右逐个验证
y_target_verify = y_target[:, :-1]
verify = y_spec == y_target_verify
idx1, idx2 = torch.where(verify == False)
if len(idx2) == 0:
accept_len = self.spec_n # 全部接受
else:
accept_len = idx2[0] # 部分接受
# Step 4: 更新输入
x = torch.cat((x, y_target[:, :accept_len+1]), dim=1)
y_new.append(y_target[:, :accept_len+1])
count += (accept_len + 1)
if count >= max_new_tokens - 1:
break
return torch.cat(y_new, dim=1)验证算法的数学保证(为什么输出分布不变)
上面的 Greedy 版本容易理解但实际中多用采样模式。Speculative Sampling 的关键在于验证步骤的概率保证:
对于草稿模型的概率分布
- 以概率
接受草稿 token - 如果拒绝,从修正分布
重新采样
class SPSamplingDecoding:
def generate(self, x, max_new_tokens=30):
count = 0
y_new = []
for i in range(max_new_tokens):
x_spec, logits_draft = self.generate_draft(self.spec_n, x)
y_spec = x_spec[:, -self.spec_n:]
logits_target = self.model_target(x_spec)[:, -self.spec_n-1:]
accept_len = 0
next_tokens = []
for j in range(self.spec_n):
r = torch.rand(1).item() # 均匀分布随机数
token_id = y_spec[0, j]
q = F.softmax(logits_target[0, j], dim=-1)
p = F.softmax(logits_draft[0, j], dim=-1)
# 核心验证:以 min(1, q/p) 的概率接受
if r < min(1, q[token_id] / p[token_id]):
accept_len += 1
next_tokens.append(y_spec[0, j])
else:
# 拒绝:从修正分布重新采样
q_ = q.clone()
idx = torch.where(q < p)
q_[idx] = p[idx] # max(0, p-q) 的近似
next_token = torch.multinomial(q_, num_samples=1)
next_tokens.append(next_token)
break
if accept_len == self.spec_n:
# 全部接受,bonus:大模型额外输出 1 个 token
next_tokens.append(y_target[0, accept_len])
x = torch.cat((x, torch.tensor([next_tokens])), dim=-1)
y_new.append(torch.tensor([next_tokens]))
count += len(next_tokens)
if count >= max_new_tokens - 1:
break
return torch.cat(y_new, dim=1)接受率分析
- 最好情况:Draft Model 和 Target Model 分布完全一致 → 全部
个 token 被接受,加速 倍 - 最差情况:分布完全不同 → 每次只接受 0 个,退化为标准解码(还多了 Draft 的开销)
- 实际情况:使用同系列的小模型(如 Llama-7B 作为 Llama-70B 的 Draft),接受率通常在 70-85%,加速 2-3 倍
KL 散度可以量化两个模型输出分布的差异:
def KL(logits_p, logits_q):
log_p = F.log_softmax(logits_p, dim=-1)
q = F.softmax(logits_q, dim=-1)
kl = F.kl_div(log_p, q, reduction='sum', log_target=False)
return kl
# KL 越小,接受率越高
# target vs draft (相近模型): KL ≈ 14.7
# target vs random: KL ≈ 110.4EAGLE-3:基于特征层的树状投机采样
上面的基础投机采样有两个关键局限:
- Draft Model 是独立的小模型,与 Target Model 的分布差异大,接受率有上限
- 线性草稿生成,每步只猜一条路径
EAGLE-3(腾讯 AngelSlim 开源)的核心创新是:Draft Model 不再独立,而是直接利用 Target Model 的隐藏状态,并以树状结构并行生成多条候选路径。
Draft Model 架构
EAGLE-3 的 Draft Model 非常轻量(Target Model 参数的 1/8 ~ 1/4),其核心是接收 Target Model 中间层的隐藏状态:
Target Model (70B) Draft Model (Light)
┌─────────────┐ ┌───────────────┐
│ Layer 0-31 │──hidden_states──→│ Combine Layer │
│ Layer 32 │ │ (Linear Proj) │
│ ... │ ├───────────────┤
│ Layer N │ │ 2-4 Attn Layers│
└─────────────┘ │ (Shared Embed) │
├───────────────┤
│ LM Head │
│ (Vocab Pruned) │
└───────────────┘class Eagle3DraftModel(nn.Module):
"""EAGLE-3 Draft Model 简化实现"""
def __init__(self, target_hidden_size, draft_hidden_size,
num_layers=2, num_heads=8, draft_vocab_size=8000):
super().__init__()
# 从 Target Model 隐藏状态投影到 Draft 空间
self.hidden_combine = nn.Linear(target_hidden_size, draft_hidden_size)
# 轻量 Transformer 层
self.layers = nn.ModuleList([
nn.TransformerDecoderLayer(
d_model=draft_hidden_size,
nhead=num_heads,
dim_feedforward=draft_hidden_size * 4,
batch_first=True
) for _ in range(num_layers)
])
# 裁剪后的词表头(只保留高频 token)
self.lm_head = nn.Linear(draft_hidden_size, draft_vocab_size, bias=False)
# 词表映射:draft_vocab → target_vocab
self.d2t = None # 在训练时从数据频率统计构建
self.t2d = None # target_vocab → draft_vocab (布尔映射)
def forward(self, target_hidden_states, input_ids, attention_mask=None):
# Step 1: 融合 Target Model 的隐藏状态
h = self.hidden_combine(target_hidden_states)
# Step 2: 加入 token embedding
embed = self.embed_tokens(input_ids)
h = h + embed
# Step 3: 通过轻量 Transformer 层
for layer in self.layers:
h = layer(h, memory=None, tgt_mask=attention_mask)
# Step 4: 输出裁剪词表上的 logits
logits = self.lm_head(h)
return logits词表裁剪 (Token Pruning)
EAGLE-3 的一个关键优化是词表裁剪:Draft Model 不需要预测全部 128K+ 的词表,只需覆盖最常见的 token:
def build_vocab_mapping(dataset, draft_vocab_size=8000, target_vocab_size=128256):
"""从训练数据的 token 频率构建词表映射"""
from collections import Counter
token_freq = Counter()
for sample in dataset:
token_freq.update(sample['input_ids'])
# 取 Top-N 高频 token
top_tokens = [tok for tok, _ in token_freq.most_common(draft_vocab_size)]
top_tokens.sort() # 保持有序
# 覆盖率统计
total = sum(token_freq.values())
covered = sum(token_freq[t] for t in top_tokens)
print(f"词表裁剪: {target_vocab_size} → {draft_vocab_size}")
print(f"Token 覆盖率: {covered/total:.1%}") # 通常 > 95%
# 构建双向映射
d2t = {i: tok for i, tok in enumerate(top_tokens)} # draft → target
t2d = {tok: i for i, tok in enumerate(top_tokens)} # target → draft
return d2t, t2d树状候选生成
与线性猜测不同,EAGLE-3 用树结构并行生成多条候选路径:
[token_0]
/ | \
[top1] [top2] [top3] ← depth 1: 3 个分支
/ | | \
[t1] [t2] [t1] [t1] ← depth 2: 扩展
| | |
[t1] [t1] [t1] ← depth 3: 继续def tree_speculative_decode(target_model, draft_model, input_ids,
tree_depth=5, top_k=10, total_tokens=60):
"""树状投机采样推理循环"""
# Step 1: Prefill - Target Model 计算初始隐藏状态
with torch.no_grad():
target_out = target_model(input_ids, output_hidden_states=True)
target_logits = target_out.logits[:, -1:]
hidden_states = target_out.hidden_states # 多层隐藏状态
# Step 2: Draft Model 生成树状候选
candidates = [] # 所有候选路径
tree_tokens = []
tree_positions = []
# 根节点:取 Target 的 top-k 作为第一层
probs = F.softmax(target_logits[:, -1], dim=-1)
top_k_tokens = torch.topk(probs, k=top_k).indices[0]
# 广度优先展开树
queue = [(tok.item(), [tok.item()], 0) for tok in top_k_tokens]
while queue and len(tree_tokens) < total_tokens:
token, path, depth = queue.pop(0)
if depth >= tree_depth:
continue
tree_tokens.append(token)
tree_positions.append(len(input_ids[0]) + depth)
# Draft Model 预测下一层
draft_input = torch.tensor([[token]], device=input_ids.device)
draft_logits = draft_model(hidden_states[-1][:, -1:], draft_input)
draft_probs = F.softmax(draft_logits[:, -1], dim=-1)
# 选 top-k 子节点
next_tokens = torch.topk(draft_probs, k=min(3, top_k)).indices[0]
for next_tok in next_tokens:
new_path = path + [next_tok.item()]
queue.append((next_tok.item(), new_path, depth + 1))
candidates.append(new_path)
# Step 3: Target Model 一次验证所有候选
all_draft_tokens = torch.tensor([tree_tokens], device=input_ids.device)
tree_mask = build_tree_attention_mask(tree_positions) # 因果+树结构
with torch.no_grad():
verify_logits = target_model(
all_draft_tokens,
attention_mask=tree_mask,
position_ids=torch.tensor([tree_positions])
).logits
# Step 4: 逐路径验证,找到最长接受路径
best_path, accept_len = evaluate_tree_candidates(
candidates, verify_logits, target_logits
)
return best_path, accept_lenDraft Model 训练流水线
EAGLE-3 的训练分两步:
Step 1: 生成训练数据 — 用 Target Model 对语料做推理,保存每层隐藏状态:
# 1. 启动 Target Model 生成对话数据
python generate_data_for_target_model.py \
--model_path Qwen/Qwen3-32B \
--data_path ShareGPT_data.jsonl \
--output_path generated_data/
# 2. 提取隐藏状态(这是关键步骤)
python generate_hidden_for_draft_model.py \
--model_path Qwen/Qwen3-32B \
--data_path generated_data/ \
--output_path hidden_states/ \
--save_all_layers TrueStep 2: 训练 Draft Model — 用 KL 散度对齐 Draft 和 Target 的输出分布:
def eagle3_training_step(draft_model, batch, training_steps=7):
"""EAGLE-3 训练步骤(简化版)"""
input_ids = batch['input_ids']
target_logits = batch['target_logits'] # 预计算的 Target 输出
hidden_states = batch['hidden_states'] # 预计算的 Target 隐藏状态
total_loss = 0
losses = []
for step in range(training_steps):
# Draft Model 前向
draft_logits = draft_model(hidden_states, input_ids)
# Target 概率(在裁剪词表上)
target_probs_pruned = F.softmax(
target_logits[..., draft_model.t2d], dim=-1 # 词表裁剪
)
# KL 散度损失
draft_log_probs = F.log_softmax(draft_logits, dim=-1)
step_loss = -torch.sum(target_probs_pruned * draft_log_probs) / batch_size
losses.append(step_loss)
# 更新输入(自回归移位)
input_ids = shift_tokens(input_ids, target_logits)
# 指数衰减加权:越近的预测权重越大
weights = [0.8 ** i for i in range(len(losses))]
total_loss = sum(w * l for w, l in zip(weights, losses))
return total_lossEAGLE-3 性能数据(来自 AngelSlim 基准测试)
| 模型 | 方法 | GSM8K | Alpaca | HumanEval | MT-bench | 平均加速 |
|---|---|---|---|---|---|---|
| Qwen3-1.7B | 标准推理 | 376 tok/s | 379 tok/s | 378 tok/s | 391 tok/s | 1x |
| Qwen3-1.7B | EAGLE-3 | 617 tok/s | 653 tok/s | 680 tok/s | 621 tok/s | 1.68x |
| Qwen3-8B | 标准推理 | 150 tok/s | 150 tok/s | 154 tok/s | 154 tok/s | 1x |
| Qwen3-8B | EAGLE-3 | 257 tok/s | 267 tok/s | 245 tok/s | 258 tok/s | 1.70x |
| Qwen3-32B | 标准推理 | 43.5 tok/s | 43.4 tok/s | 43.2 tok/s | 43.3 tok/s | 1x |
| Qwen3-32B | EAGLE-3 | 80.4 tok/s | 72.5 tok/s | 71.6 tok/s | 74.1 tok/s | 1.71x |
测试条件:vLLM v0.11.2, num_speculative_tokens=2, batch_size=1, output_len=1024
关键观察:平均接受长度约 2.5 token/step,模型越大加速比越稳定。
SpecExit:推理模型的思考早退
对于 DeepSeek-R1、QwQ 等推理模型,生成的 token 中大量是"思考过程"(Reasoning Token),实际有用的答案只占一小部分。SpecExit(腾讯,OSDI 2025 投稿)的核心思想是:
用 Draft Model 的隐藏状态预测何时可以停止思考,无需等待完整的推理链。
工作原理
Target Model 生成推理链:
"<think>首先分析题目... 然后计算... 验证结果...</think>答案是42"
↓
SpecExit 在中间某处判断:
"<think>首先分析题目... 然后计算...</think>答案是42"
↑
此处已经可以得到正确答案,后面的思考是冗余的信号预测与停止判据
SpecExit 从 Draft Model 的隐藏状态中提取三类信号:
class SpecExitPredictor:
"""SpecExit 早退信号预测器"""
def __init__(self, method="ewma", alpha=0.3, window_size=10):
self.method = method
self.alpha = alpha
self.window_size = window_size
self.scores = []
def update(self, draft_hidden_state):
"""从 Draft Model 隐藏状态提取停止信号"""
# 信号类型1: 置信度 (confidence) — 答案 token 的概率
# 信号类型2: 进度 (progress) — 已完成推理的比例
# 信号类型3: 剩余量 (remain) — 预计还需要多少 token
score = self.extract_signal(draft_hidden_state)
self.scores.append(score)
return self.predict_next()
def predict_next(self):
"""预测下一步的信号值"""
if self.method == "ewma":
# 指数加权移动平均
ewma = self.scores[0]
for s in self.scores[1:]:
ewma = self.alpha * s + (1 - self.alpha) * ewma
return ewma
elif self.method == "momentum":
# 动量法:用历史趋势外推
if len(self.scores) < 2:
return self.scores[-1]
deltas = [self.scores[i] - self.scores[i-1]
for i in range(1, len(self.scores))]
avg_delta = sum(deltas[-self.window_size:]) / len(deltas[-self.window_size:])
return self.scores[-1] + avg_delta
elif self.method == "mean":
# 滑动窗口均值
window = self.scores[-self.window_size:]
return sum(window) / len(window)
def should_exit(self, predicted_score):
"""判断是否可以提前退出"""
if "confidence" in self.method:
return predicted_score > 0.8 # 答案置信度足够高
elif "progress" in self.method:
return predicted_score > 0.3 # 推理进度超过 30%
elif "remain" in self.method:
return predicted_score < 200 # 预计剩余 token < 200
return FalseSpecExit 效果(在推理模型上):
- 生成长度减少 66%(省去冗余思考)
- 端到端延迟降低 2.5x
- 答案准确率基本不变
投机采样方法对比
| 方法 | Draft Model | 接受率 | 加速比 | 特点 |
|---|---|---|---|---|
| 基础投机采样 | 独立小模型 | 70-85% | 2-3x | 简单,Draft 与 Target 解耦 |
| EAGLE-3 | 特征级轻量模型 | 80-90% | 1.7-1.9x | 利用 Target 隐藏状态,树状搜索 |
| Medusa | 多个 LM Head | 60-80% | 1.5-2x | 无需独立 Draft Model |
| SpecExit | EAGLE-3 + 早退 | - | 2.5x+ | 针对推理模型,减少冗余思考 |
| Self-Speculative | Target 自身浅层 | 50-70% | 1.3-1.5x | 无需额外模型 |
Prefill-Decode 分离 (PD Disaggregation)
为什么要分离?
Prefill 是 Compute-bound,Decode 是 Memory-bound,二者对硬件的需求截然不同:
| 维度 | Prefill | Decode |
|---|---|---|
| 计算特征 | 大矩阵乘法,高算力需求 | 向量-矩阵乘法,高带宽需求 |
| 理想硬件 | 高 FLOPS 的 GPU (如 H100) | 高带宽的 GPU 或 Memory-centric 架构 |
| Batch Size | 通常较小(几个请求) | 可以很大(数百个请求) |
| 耗时 | 可预测(与 prompt 长度成正比) | 不确定(取决于生成长度) |
将 Prefill 和 Decode 放在不同的物理节点上运行,可以分别优化:
- Prefill 节点:配置高算力 GPU,专注处理 prompt
- Decode 节点:配置高带宽系统,专注逐 token 生成
- 各自优化 Batch Size:互不干扰
架构设计
PD 分离的基本架构:
┌──────────────────────┐
│ Scheduler │
│ (Ray Remote Actor) │
└──────┬───────┬───────┘
│ │
┌────────────▼──┐ ┌──▼────────────┐
│ Prefill Node │ │ Decode Node │
│ (GPU Group) │ │ (GPU Group) │
└──────┬────────┘ └────┬──────────┘
│ │
└───────┬───────┘
│
┌────────▼────────┐
│ KV Cache Store │
│ (Distributed) │
└─────────────────┘分布式调度器(基于 Ray):
@ray.remote
class Scheduler:
def __init__(self, max_seq_len=1024):
self.requests = {}
self.waiting_queue = deque() # 等待 Prefill 的请求
self.running_requests = set() # 正在 Decode 的请求
self.is_running_prefill = True
self.is_running_decoding = True
async def add_request(self, prompt, max_seq_len):
"""异步接收新请求"""
request_id = len(self.requests)
request = Request(request_id, prompt, max_seq_len)
self.requests[request_id] = request
self.waiting_queue.append(request_id)
return request_id
def get_waiting_requests(self):
"""Prefill 节点拉取等待的请求"""
if len(self.waiting_queue) == 0:
return None
plan = BatchPlan()
for _ in range(len(self.waiting_queue)):
request_id = self.waiting_queue.popleft()
plan.prompts.append(self.requests[request_id].prompt)
plan.ids.append(request_id)
return plan
def get_running_requests(self):
"""Decode 节点拉取正在生成的请求"""
if len(self.running_requests) == 0:
return None
plan = BatchPlan()
for request_id in self.running_requests:
plan.prompts.append([self.requests[request_id].generated_tokens[-1]])
plan.ids.append(request_id)
return planKV Cache 传输策略
PD 分离的核心挑战是 KV Cache 的传输:Prefill 节点计算完 KV Cache 后需要传输到 Decode 节点。
三种 KV Cache 管理模式:
- KV Cache 仅存于 Decode 节点:Prefill 计算后直接传输过去(实现简单)
- PD 各存各的:Prefill 节点也保留 Cache,用于 Chunked-Prefill
- 中心化 Cache 服务:独立的 KV Cache 服务,PD 节点都从中存取
@ray.remote
class DistributedKVCacheEngine:
def __init__(self, config):
self.kv_cache = torch.zeros(
2, config.num_layers, config.kv_cache_batch,
config.kv_cache_len, config.num_heads, config.head_dim
)
self.request_to_batch = {} # request_id -> kv_batch_id
async def update_from_prefill(self, reqs, kv):
"""Prefill 节点写入 KV Cache"""
start_ids = len(self.request_to_batch)
for i, req_id in enumerate(reqs):
self.request_to_batch[req_id] = start_ids + i
self.kv_cache[:, :, start_ids: start_ids + len(reqs)] = kv
def update_from_decoding(self, reqs, kv, decoding_idx):
"""Decode 节点更新单 token 的 KV"""
for i, req_id in enumerate(reqs):
pos = decoding_idx[i]
batch_id = self.request_to_batch[req_id]
self.kv_cache[:, :, batch_id, pos] = kv[:, :, i, 0]
def get_kv_cache(self, reqs):
"""Decode 节点读取 KV Cache"""
kv_batch_ids = [self.request_to_batch[r] for r in reqs]
return self.kv_cache[:, :, kv_batch_ids]PD 分离引擎的主循环:
@ray.remote
class DissagreationPDEngine:
def __init__(self, config, prefill_actors, decoding_actors, scheduler, kvcache):
self.prefill_actors = prefill_actors
self.decoding_actors = decoding_actors
self.scheduler = scheduler
self.kvcache = kvcache
def _step_prefill(self):
"""Prefill 节点的主循环"""
while not self._is_finish_prefill():
info = ray.get(self.scheduler.get_waiting_requests.remote())
if info is None:
time.sleep(0.1)
continue
# 执行 Prefill
input_ids = self._get_prefill_batch(info)
results = ray.get(self.prefill_actors.forward(x=input_ids))
logits, kv = results[0]
# 写入 KV Cache
ray.get(self.kvcache.update_from_prefill.remote(info.ids, kv))
# 更新请求状态 → 进入 Decode 阶段
next_token = torch.argmax(logits[:, -1, :], dim=-1)
for i, req_id in enumerate(info.ids):
ray.get(self.scheduler.update_request.remote(req_id, next_token[i].item()))
def _step_decoding(self):
"""Decode 节点的主循环"""
while not self._is_finish_decoding():
info = ray.get(self.scheduler.get_running_requests.remote())
if info is None:
time.sleep(0.1)
continue
# 读取 KV Cache
kv_cache = ray.get(self.kvcache.get_kv_cache.remote(info.ids))
# 执行 Decode
input_ids = self._get_decoding_batch(info)
results = ray.get(self.decoding_actors.forward(x=input_ids, kvcaches=kv_cache))
logits, kv = results[0]
# 更新 KV Cache 和请求状态
ray.get(self.kvcache.update_from_decoding.remote(info.ids, kv, info.last_pos))
next_token = torch.argmax(logits[:, -1, :], dim=-1)
for i, req_id in enumerate(info.ids):
ray.get(self.scheduler.update_request.remote(req_id, next_token[i].item()))PD 分离 vs PD 融合:
- PD 分离适合:大规模部署、异构硬件、高吞吐需求
- PD 融合(Chunked Prefill)适合:单节点、低延迟需求
- 实际系统通常是分离和融合兼并的,例如 Prefill 节点内部也可以用 Chunked Prefill
vLLM 架构深度解析
vLLM 是目前最流行的开源 LLM 推理引擎,集成了上述几乎所有优化技术。
vLLM V0 架构
V0 架构采用 Scheduler + Worker 的同步执行模式:
┌─────────────────────────────────────────────┐
│ vLLM V0 Engine │
├─────────────────────────────────────────────┤
│ Scheduler │
│ ├── Waiting Queue(等待 Prefill 的请求) │
│ ├── Running Queue(正在 Decode 的请求) │
│ └── Swapped Queue(被换出到 CPU 的请求) │
├─────────────────────────────────────────────┤
│ Block Manager (PagedAttention 显存管理) │
│ ├── PageAllocator(逻辑块→物理块映射) │
│ ├── GPU Block Allocator │
│ └── CPU Block Allocator(Swap 用) │
├─────────────────────────────────────────────┤
│ Worker(模型执行) │
│ ├── Model Runner(前向传播) │
│ └── Cache Engine(KV Cache 物理存储) │
└─────────────────────────────────────────────┘V0 的 KVCachePool 实现:
class KVCachePool:
"""基于页帧的 KV 缓存存储池"""
def __init__(self, config):
self.allocator = PageAllocator(config.page_size, config.num_pages)
self.page_size = config.page_size
# KV 的物理存储:按页组织
self.kv_store = torch.zeros(
2, # K 和 V
config.num_layers, # Transformer 层数
config.num_pages, # 总页数
config.page_size, # 每页 token 数
config.num_heads,
config.head_dim,
)
self.req_page_map = {} # request_id -> [page_id, ...]
self.cached_tokens = {} # request_id -> 已缓存 token 数
def write_tokens(self, request_id, KV):
"""将新计算的 KV 追加写入页帧,按需扩展"""
_, L, T, H, D = KV.shape
if request_id not in self.cached_tokens:
# 首次写入:分配起始页
self.cached_tokens[request_id] = 0
initial = self.allocator.acquire(count=1)
self.req_page_map[request_id] = initial
remaining = T
while remaining > 0:
pages = self.req_page_map[request_id]
capacity = self.page_size * len(pages) - self.cached_tokens[request_id]
if capacity == 0:
# 当前页已满,扩展一页
extra = self.allocator.acquire(count=1, prev_page=pages[-1])[0]
self.req_page_map[request_id].append(extra)
capacity = self.page_size
# 写入本轮数据
n_write = min(capacity, remaining)
dst_page = pages[-1]
offset = self.page_size - capacity
src_start = T - remaining
self.kv_store[:, :, dst_page, offset:offset + n_write] = KV[:, :, src_start:src_start + n_write]
remaining -= n_write
self.cached_tokens[request_id] += n_write
def free(self, request_id):
"""释放请求占用的全部页帧"""
pages = self.req_page_map[request_id]
self.allocator.release(pages)
self.kv_store[:, :, pages, :, :, :] = 0
del self.req_page_map[request_id]vLLM V1 架构演进
V1 的核心改进是消除 Prefill/Decode 的概念区分,统一为 Chunked Prefill:
┌──────────────────────────────────────────────────────┐
│ vLLM V1 EngineCore │
├──────────────────────────────────────────────────────┤
│ Scheduler(Chunked Prefill 统一调度) │
│ ├── Decode 请求优先并入 batch │
│ ├── Prefill 请求按 token budget 切分 │
│ └── 合并为一条序列送入模型 │
├──────────────────────────────────────────────────────┤
│ KVCachePool(分页式 KV Cache) │
│ ├── 按需分配/释放物理页 │
│ └── 请求级别的页映射管理 │
├──────────────────────────────────────────────────────┤
│ ModelWrapper(模型执行封装) │
│ ├── PageAttention Prefill Kernel │
│ └── PageAttention Decoding Kernel │
└──────────────────────────────────────────────────────┘V1 Engine 的 step 函数(核心主循环):
class vLLMEngine:
def __init__(self, model, config):
self.cacher = KVCachePool(config)
self.model_wrapper = ModelWrapper(model, self.cacher)
self.scheduler = Scheduler(config.max_seq_len)
def step(self, config):
"""
一步推理的完整流程:
1. 获取融合 batch(Decode 优先 + Chunked Prefill)
2. 执行前向计算
3. 生成 next token
4. 更新状态 / KV Cache
"""
if self.scheduler.get_available_request() == 0:
return
# 1. 调度:获取混合 PD batch
info = self.scheduler.plan_next_batch(
max_batch_tokens=config.max_batch_tokens,
max_prefill_batch=config.max_prefill_batch,
max_decoding_batch=config.max_decoding_batch,
)
# 2. 获取 KV Cache
kv_cache, info.kv_page_len = self.cacher.get_kv_cache(info.ids)
# 3. 将多个请求的 token 拼接成一条序列
input_ids = torch.tensor([info.flat_input], dtype=torch.long)
# 4. 执行计算
next_token, kv = self.execute(input_ids, kv_cache, info)
# 5. 更新请求状态和 KV Cache
self.update(next_token, kv, info)
def execute(self, input_ids, kv_cache, info):
logits, KV = self.model_wrapper.forward(input_ids, kv_cache, info)
next_token = torch.argmax(logits, dim=-1)
# 按请求拆分 next token
tokens = []
reqs_next_token = torch.split(next_token, info.chunk_len, dim=0)
for bid, token_ids in enumerate(reqs_next_token):
if info.decode_flags[bid]:
tokens.append(token_ids[-1])
elif info.last_pos[bid] != -1:
tokens.append(token_ids[-1]) # prefill 完成
else:
tokens.append(-1) # chunked prefill 未完成
return tokens, KV
def update(self, next_token, KV, info):
# 更新请求
for bid, token in enumerate(next_token):
req_id = info.ids[bid]
if token != -1:
self.scheduler.update_request(req_id, token.item())
# 更新 KV Cache
reqs_KV = KV.split(info.chunk_len, dim=2)
for req_id, tmp_KV in zip(info.ids, reqs_KV):
if self.scheduler.requests[req_id].status == "REQUEST_COMPLETED":
self.cacher.free(req_id)
else:
self.cacher.write_tokens(req_id, tmp_KV)
# 更新 kv_len
for i, req_id in enumerate(info.ids):
self.scheduler.requests[req_id].kv_len += info.chunk_len[i]V1 的模型层如何处理混合 PD batch:
class PageAttentionBlock(nn.Module):
def forward(self, X, KV_Cache, info):
N_p = info.prefill_batch # Prefill 请求数
N_d = info.decoding_batch # Decode 请求数
Q, K, V = self.WQ(X), self.WK(X), self.WV(X)
# 按请求拆分(每个请求的 chunk_len 不同)
Q = Q.split(info.chunk_len, dim=0)
K = K.split(info.chunk_len, dim=0)
V = V.split(info.chunk_len, dim=0)
# Decode 请求:使用 PageAttention Decoding Kernel
if N_d > 0:
O_d = self.forward_decoding(Q[:N_d], K[:N_d], V[:N_d],
KV_Cache=KV_Cache[:N_d])
# Prefill 请求:使用 PageAttention Prefill Kernel
if N_p > 0:
O_p = self.forward_prefill(Q[N_d:], K[N_d:], V[N_d:],
KV_Cache=KV_Cache[N_d:])
# 拼接输出
if N_p > 0 and N_d > 0:
O = torch.cat((O_d, O_p), dim=0)
elif N_d > 0:
O = O_d
else:
O = O_p
return OV0 vs V1 对比:
| 特性 | V0 | V1 |
|---|---|---|
| Prefill/Decode | 分开处理 | 统一为 Chunked Prefill |
| 调度粒度 | 请求级别 | Token 级别(token budget) |
| 新请求 | 等当前 batch 有空位 | 切 chunk 立即开始 |
| GPU 利用率 | Decode 阶段低 | Prefill 填充空闲算力 |
| 实现复杂度 | 较低 | 较高 |
端侧推理引擎
前面介绍的优化技术(KV Cache、PagedAttention、投机采样等)主要面向云端 GPU 集群。但随着端侧芯片算力提升和隐私需求增长,在手机、嵌入式设备上直接运行 LLM 正在成为新的工程前沿。
本节从通用视角剖析端侧 LLM 部署的完整流程,涵盖模型适配、量化、编译和上板的每一步核心概念。
端侧推理的挑战与动机
为什么要在设备上跑大模型?
云端推理虽然算力充足,但存在不可忽视的局限性:
动机:
- 隐私保护:数据不出设备,用户的聊天记录、照片等敏感信息无需上传云端
- 延迟要求:无网络往返延迟,在弱网/无网环境下也能流畅使用
- 成本节约:无需云端 GPU 算力费用,边际成本趋近于零
- 离线可用:飞行模式、地铁等无网场景依然可用
约束:
- 内存有限:手机通常只有 8-16GB 统一内存,需要与系统和其他 App 共享
- 功耗限制:移动设备需要严格控制能耗,不能像 GPU 集群那样「暴力计算」
- 无 CUDA:端侧芯片(NPU、ANE)有自己的指令集和编程模型,无法直接复用云端的 CUDA 生态
端侧模型规模限制
受内存和算力约束,端侧通常只能运行 0.5B-7B 参数的模型,且几乎必须使用 INT4/INT8 量化。这与云端动辄 70B-400B 的模型规模形成鲜明对比。
端侧推理引擎生态
| 引擎 | 厂商 | 目标硬件 | 特点 |
|---|---|---|---|
| QNN SDK | Qualcomm | Snapdragon NPU | 手机端主流,支持 INT4/INT8 |
| Core ML | Apple | ANE (Apple Neural Engine) | iOS/macOS 设备 |
| TensorRT | NVIDIA | Jetson GPU | 边缘 GPU 加速 |
| ONNX Runtime Mobile | Microsoft | CPU/NPU | 跨平台 |
| MLC LLM | 开源 | CPU/GPU/NPU | 通用 LLM 推理 |
| llama.cpp | 开源 | CPU | 纯 CPU,极致兼容 |
| MediaPipe | Android/iOS | 移动端 ML |
如何选择?
- Android + Qualcomm 芯片:优先 QNN SDK,充分利用 NPU 算力
- iOS/macOS:优先 Core ML,与系统深度集成
- 跨平台/快速验证:llama.cpp 或 MLC LLM
- 边缘 GPU(Jetson):TensorRT
端侧部署全景流程
端侧部署一个 LLM(包括多模态模型)的通用流水线如下:
HuggingFace 模型 (FP32/FP16)
↓ Step 1: 模型适配(Model Adaptation)
适配后模型(Linear→Conv2d, Conv3d→Conv2d, Eager Attention)
↓ Step 2: 图优化(Graph Optimization)
优化后模型(算子融合、常量折叠、静态 shape 绑定)
↓ Step 3: 量化(PTQ + SeqMSE / LPBQ)
量化模型(W8A16 / W4A16)
↓ Step 4: 中间格式导出 + 精度验证
ONNX / TorchScript + 量化编码 + Test Vectors
↓ Step 5: NPU 编译流水线
设备可执行二进制(Context Binary / Core ML Model / TRT Engine)
↓ Step 6: 部署到设备
端侧芯片 NPU / ANE 执行下面逐步深入每一个环节的核心概念与通用实现。
Step 1: 模型适配(Model Adaptation)
NPU 的算子支持与 GPU 完全不同。HuggingFace 模型直接导出到 NPU 会遇到大量不支持的算子(如 nn.Linear、Conv3d、Flash Attention 等)。因此第一步是改写模型结构,使其对 NPU 友好。
1.1 Linear → Conv2d 替换
多数 NPU 对 Conv2d 有深度优化的硬件指令,但对 nn.Linear 的支持较弱。解决方案:将所有 Linear 层替换为 1×1 Conv2d。
import torch
import torch.nn as nn
class LinearToConv1x1(nn.Conv2d):
"""用 1x1 Conv2d 替换 Linear 层,保持功能等价。
数学原理:nn.Linear(in, out) 计算 y = xW^T + b,
等价于 Conv2d(in, out, kernel_size=1) 对 (B, in, 1, 1) 输入的计算。
"""
def __init__(self, linear: nn.Linear):
out_features, in_features = linear.weight.shape
super().__init__(
in_features, out_features, kernel_size=1,
bias=(linear.bias is not None),
dtype=linear.weight.dtype,
)
# Linear 权重 [out, in] → Conv2d 权重 [out, in, 1, 1]
self.weight.data.copy_(linear.weight.data[:, :, None, None])
if linear.bias is not None:
self.bias.data.copy_(linear.bias.data)
def forward(self, x: torch.Tensor) -> torch.Tensor:
orig_shape = x.shape
if x.ndim == 2:
# (seq, hidden) → (1, hidden, 1, seq)
x = x.unsqueeze(0).unsqueeze(2).permute(0, 3, 2, 1)
elif x.ndim == 3:
# (B, seq, hidden) → (B, hidden, 1, seq)
x = x.permute(0, 2, 1).unsqueeze(2)
x = super().forward(x)
# 还原为原始 layout
if len(orig_shape) == 2:
x = x.permute(0, 3, 2, 1).squeeze(2).squeeze(0)
elif len(orig_shape) == 3:
x = x.squeeze(2).permute(0, 2, 1)
return x为什么这样做?
nn.Linear(in, out)计算,等价于 Conv2d(in, out, kernel_size=1)在空间维为 1 时的计算- NPU 对 Conv2d 有专门的硬件加速单元,能以更高吞吐执行
- 继承
nn.Conv2d而非包装,确保下游工具(如量化框架、LoRA Adapter)能正常识别
全模型递归替换的通用工具函数:
def replace_all_linears(module: nn.Module) -> nn.Module:
"""递归地将模型中所有 nn.Linear 替换为 LinearToConv1x1"""
for name, child in module.named_children():
if isinstance(child, nn.Linear):
setattr(module, name, LinearToConv1x1(child))
else:
replace_all_linears(child)
return module
model = replace_all_linears(model)1.2 Conv3d → 多个 Conv2d 分解
多模态模型的 Vision Encoder 有时使用 Conv3d 做 Patch Embedding(处理视频的时空维度),但 NPU 通常不支持 3D 卷积。解决方案:沿某个维度(通常是时间维度)拆分,用多个 Conv2d 相加替代。
class Conv3dDecomposer(nn.Module):
"""将一个 Conv3d 沿指定维度分解为多个 Conv2d 之和。
原理:Conv3d(X) = sum_t W[:,:,t,:,:] * X[:,:,t,:,:] 沿时间维 t 可拆分。
每个 Conv2d 处理一个时间切片,最终结果相加,数学完全等价。
"""
def __init__(self, conv3d: nn.Conv3d, split_dim: int = 2):
super().__init__()
# split_dim: 在 (B, C, D, H, W) 中要拆分的维度,默认 D(时间)
self.split_dim = split_dim
kernel_3d = conv3d.weight # [C_out, C_in, kD, kH, kW]
kD = conv3d.kernel_size[0]
# 提取 2D kernel size(去掉 split 维度)
kernel_2d = (conv3d.kernel_size[1], conv3d.kernel_size[2])
stride_2d = (conv3d.stride[1], conv3d.stride[2])
self.convs = nn.ModuleList()
for t in range(kD):
conv2d = nn.Conv2d(
conv3d.in_channels, conv3d.out_channels,
kernel_size=kernel_2d, stride=stride_2d, bias=False,
)
# 从 3D 权重中切出第 t 个 2D 切片
conv2d.weight.data.copy_(kernel_3d[:, :, t, :, :])
self.convs.append(conv2d)
self.bias = conv3d.bias # bias 只加一次
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x: (B, C, D, H, W) → 沿 D 拆分为 D 个 (B, C, H, W)
slices = x.split(1, dim=self.split_dim)
result = None
for t, conv in enumerate(self.convs):
out = conv(slices[t].squeeze(self.split_dim))
result = out if result is None else result + out
if self.bias is not None:
result = result + self.bias.view(1, -1, 1, 1)
return result数学等价性证明
设 Conv3d 权重为
原始 Conv3d 输出:
分解后:每个 Conv2d 计算
两者完全等价,无精度损失。
1.3 Attention 机制适配
HuggingFace 默认使用 Flash Attention(需要 CUDA),NPU 不支持。需要改写为 Eager Attention(朴素矩阵乘法实现):
import torch.nn.functional as F
class NPUAttention(nn.Module):
"""NPU 友好的 Eager Attention 实现。
与 Flash Attention 数学等价,但使用标准 matmul + softmax,
可被所有 NPU 后端执行。
"""
def __init__(self, hidden_size: int, num_heads: int, head_dim: int):
super().__init__()
self.num_heads = num_heads
self.head_dim = head_dim
self.scaling = head_dim ** -0.5
self.qkv_proj = nn.Linear(hidden_size, 3 * num_heads * head_dim)
self.out_proj = nn.Linear(num_heads * head_dim, hidden_size)
def forward(
self, x: torch.Tensor,
rope_cos: torch.Tensor = None,
rope_sin: torch.Tensor = None,
) -> torch.Tensor:
B, S, _ = x.shape
# QKV 投影
qkv = self.qkv_proj(x).reshape(B, S, 3, self.num_heads, self.head_dim)
q, k, v = qkv.permute(2, 0, 3, 1, 4).unbind(0)
# q/k/v: (B, num_heads, S, head_dim)
# 应用 RoPE(预计算的 cos/sin 作为输入,避免 NPU 上做三角运算)
if rope_cos is not None and rope_sin is not None:
q = q * rope_cos + _rotate_half(q) * rope_sin
k = k * rope_cos + _rotate_half(k) * rope_sin
# 标准矩阵乘法 Attention(替代 Flash Attention)
attn_weights = torch.matmul(q, k.transpose(-2, -1)) * self.scaling
attn_weights = F.softmax(attn_weights, dim=-1, dtype=torch.float32).to(q.dtype)
attn_output = torch.matmul(attn_weights, v)
# (B, num_heads, S, head_dim) → (B, S, hidden)
attn_output = attn_output.transpose(1, 2).reshape(B, S, -1)
return self.out_proj(attn_output)
def _rotate_half(x: torch.Tensor) -> torch.Tensor:
"""RoPE 的旋转辅助函数"""
x1, x2 = x.chunk(2, dim=-1)
return torch.cat((-x2, x1), dim=-1)关键改动:
- Flash Attention → 标准
matmul + softmax + matmul(NPU 可执行) - RoPE 从动态计算改为预计算 cos/sin 作为输入,避免在 NPU 上做复杂的三角函数运算
- 所有 tensor 操作保持 NPU 友好(无 in-place、无动态 shape 分支)
1.4 端侧模型的通用适配要点
除了上述核心算子替换,端侧部署还需要关注以下适配:
# 端侧 LLM Decoder 的通用适配配置
adaptation_config = {
# Attention 相关
'attn_implementation': 'eager', # 强制 eager attention
'return_new_kv_only': True, # KV Cache 只输出增量,由外部管理拼接
'transposed_key_cache': True, # Key 转置存储 (head, dim, seq),加速 matmul
'precomputed_rope': True, # RoPE cos/sin 作为外部输入
# 输入相关
'fixed_input_shape': True, # 固定输入 shape(NPU 编译期需要确定)
'precomputed_attention_mask': True, # 注意力掩码预计算后传入
# 模型拆分
'separate_embedding': True, # Embedding 层分离(可在 CPU 执行)
'separate_lm_head': True, # LM Head 分离(可单独量化)
'separate_vision_encoder': True, # Vision Encoder 分离(独立量化方案)
}端侧固定尺寸策略
云端推理可以处理动态分辨率的图像和变长序列,但端侧模型通常要求固定输入尺寸,原因:
- NPU 编译时需要确定 tensor shape 以进行硬件指令调度优化
- 动态 shape 会导致频繁重编译,引入不可接受的延迟
- KV Cache 大小需要预分配,固定尺寸简化内存管理
Step 2: 图优化(Graph Optimization)
适配后的模型需要进行图级优化,为量化做准备。这一步在不同 SDK 中有不同工具,但核心工作是相同的:
import torch.onnx
# Step 2.1: 导出为 ONNX 中间表示
dummy_input = torch.randn(1, 128, hidden_size)
torch.onnx.export(
adapted_model,
dummy_input,
"model.onnx",
opset_version=17,
input_names=['input_ids_embed'],
output_names=['logits'],
dynamic_axes=None, # 端侧不使用动态 axes
)
# Step 2.2: 使用 ONNX 的图优化 pass
import onnx
from onnxruntime.transformers import optimizer
optimized_model = optimizer.optimize_model(
"model.onnx",
model_type='bert', # 选择合适的优化模板
opt_level=2, # 算子融合 + 常量折叠
)
optimized_model.save_model_to_file("model_optimized.onnx")图优化的核心工作:
- 算子融合:将 MatMul + Add → FusedLinear,LayerNorm 的多步运算 → 单算子
- 常量折叠:预计算所有编译期可确定的值
- 冗余消除:删除无用的 Reshape、Transpose 对
- Softmax 保留:确保 Softmax 不被折叠到其他算子中(量化需要保留独立节点)
图优化注意事项
某些优化 pass 可能破坏后续量化的精度。例如公共子表达式消除(CSE)可能合并本应独立量化的分支。实践中需要根据量化结果有选择地跳过部分优化。
Step 3: 量化(PTQ)
这是端侧部署中对精度影响最大的一步。端侧量化通常使用训练后量化(PTQ),核心流程如下:
3.1 量化仿真
量化仿真(Quantization Simulation)在 PyTorch 中模拟量化效果,无需真实硬件:
import torch
from torch.ao.quantization import (
get_default_qconfig_mapping,
prepare_fx,
convert_fx,
)
# 创建量化配置:权重 INT8 对称,激活 INT16 非对称
from torch.ao.quantization import QConfig, MinMaxObserver, PerChannelMinMaxObserver
qconfig_w8a16 = QConfig(
activation=MinMaxObserver.with_args(
dtype=torch.qint16, qscheme=torch.per_tensor_affine
),
weight=PerChannelMinMaxObserver.with_args(
dtype=torch.qint8, qscheme=torch.per_channel_symmetric
),
)
# 插入观测器(Observer)
prepared_model = prepare_fx(
optimized_model,
qconfig_mapping={"": qconfig_w8a16},
example_inputs=(dummy_input,),
)Vision Encoder vs LLM Decoder 的量化差异
| 配置项 | Vision Encoder | LLM Decoder |
|---|---|---|
| 权重位宽 | 8-bit | 8-bit 或 4-bit |
| 激活位宽 | 16-bit | 16-bit |
| SeqMSE | 可选 | 推荐(对精度影响显著) |
| LPBQ | 不适用 | 可选(对输出投影层有效) |
| Embedding Table | - | INT8 单独处理 |
Vision Encoder 对量化更敏感,所以使用 W8A16(权重 8-bit,激活 16-bit)方案。LLM Decoder 参数量大,可以更激进地使用 W4A16。
3.2 校准与计算量化参数
def calibrate(model: nn.Module, calibration_loader, num_batches: int = 100):
"""用真实数据跑 forward pass,统计每层的激活值分布"""
model.eval()
with torch.no_grad():
for idx, batch in enumerate(calibration_loader):
if idx >= num_batches:
break
model(batch['input_ids_embed'].cuda())
# 执行校准:Observer 自动统计 min/max
calibrate(prepared_model, train_loader, num_batches=100)
# 将观测到的统计量转换为量化参数 (scale, zero_point)
quantized_model = convert_fx(prepared_model)3.3 SQNR 逐阶段精度验证
每一步适配/优化/量化后,都通过 SQNR(Signal-to-Quantization-Noise Ratio)验证精度:
def compute_sqnr(signal: torch.Tensor, quantized: torch.Tensor) -> float:
"""计算信号与量化噪声之比(dB)。值越高表示量化损失越小。"""
noise = signal - quantized
signal_power = (signal ** 2).mean()
noise_power = (noise ** 2).mean()
if noise_power == 0:
return float('inf')
return 10 * torch.log10(signal_power / noise_power).item()
# 阶段 1: 适配后 vs 原始 FP32
adapted_sqnr = compute_sqnr(fp_output, adapted_output)
print(f'Adapted vs FP32 SQNR: {adapted_sqnr:.1f} dB') # 期望 > 60 dB
# 阶段 2: 图优化后 vs FP32
optimized_sqnr = compute_sqnr(fp_output, optimized_output)
print(f'Optimized vs FP32 SQNR: {optimized_sqnr:.1f} dB') # 期望 > 55 dB
# 阶段 3: 量化后 vs 图优化后
quant_sqnr = compute_sqnr(optimized_output, quantized_output)
print(f'Quantized vs Optimized SQNR: {quant_sqnr:.1f} dB') # 期望 > 30 dBSQNR 红线
- SQNR > 35 dB:量化损失可以接受
- SQNR 25-35 dB:需要仔细评估任务精度
- SQNR < 25 dB:量化方案需要调整(降低量化比特、使用 SeqMSE、保留敏感层为高精度)
3.4 高级量化技术:SeqMSE 与 LPBQ
标准 PTQ 使用 MinMax 校准,容易受离群值影响。以下两种公开研究的技术可以显著提升量化精度:
SeqMSE vs 普通 PTQ 的工作原理
普通 PTQ(MinMax):
- 跑校准数据,记录每个 tensor 的 min/max
scale = (max - min) / (2^bits - 1),offset = min / scale- 问题:离群值会拉大 range,导致大部分值的量化精度下降
SeqMSE(Sequential MSE Minimization):
- 逐层处理,冻结其他层
- 对当前层尝试多个候选 (scale, offset) 组合
- 选择使
最小的那组参数 - 计算量更大(每层需要多次 forward),但精度显著提升
- 特别适合 INT4 这种极低比特量化
LPBQ(Low-Precision Block Quantization):
- 将权重矩阵划分为小块(如 128 元素一组)
- 每个块独立计算量化参数
- 块粒度比 per-channel 更细,能更好地适应权重分布不均匀的情况
- 对 LM Head 等大型投影层效果尤其好
# SeqMSE 的伪代码实现思路
def sequential_mse_calibration(model, layer, calibration_data, num_candidates=20):
"""逐层搜索最优量化参数"""
best_scale, best_zp, best_mse = None, None, float('inf')
# 获取当前层的 FP32 参考输出
fp_output = run_layer_fp32(layer, calibration_data)
# 从观测到的 min/max 中生成多个候选 scale
observed_min, observed_max = observe_range(layer, calibration_data)
candidates = generate_scale_candidates(observed_min, observed_max, num_candidates)
for scale, zero_point in candidates:
# 用候选参数做量化 forward
quant_output = run_layer_quantized(layer, calibration_data, scale, zero_point)
mse = ((fp_output - quant_output) ** 2).mean()
if mse < best_mse:
best_scale, best_zp, best_mse = scale, zero_point, mse
return best_scale, best_zpStep 4: 中间格式导出与精度验证
量化完成后,需要导出为目标 SDK 可消费的中间格式,并生成 Test Vectors 用于设备端精度比对:
# 1. 导出量化后的 ONNX 模型
torch.onnx.export(
quantized_model,
dummy_input,
"model_quantized.onnx",
opset_version=17,
input_names=['input_ids_embed'],
output_names=['logits'],
)
# 2. 导出量化编码(每个 tensor 的 scale 和 zero_point)
def export_encodings(model, path: str):
"""将量化参数保存为 JSON,供 NPU 编译器使用"""
encodings = {}
for name, module in model.named_modules():
if hasattr(module, 'scale') and hasattr(module, 'zero_point'):
encodings[name] = {
'scale': module.scale.tolist(),
'zero_point': module.zero_point.tolist(),
'bitwidth': module.bitwidth,
}
import json
with open(path, 'w') as f:
json.dump(encodings, f, indent=2)
export_encodings(quantized_model, "model_quantized.encodings")
# 3. 生成 Test Vectors(用于设备端精度对齐)
def generate_test_vectors(model, inputs, output_dir: str):
"""保存参考输入输出,部署后可比对精度"""
import os, numpy as np
os.makedirs(output_dir, exist_ok=True)
with torch.no_grad():
outputs = model(*inputs)
for i, inp in enumerate(inputs):
np.save(f"{output_dir}/input_{i}.npy", inp.cpu().numpy())
np.save(f"{output_dir}/output.npy", outputs.cpu().numpy())
generate_test_vectors(quantized_model, (dummy_input,), "test_vectors/")导出产物:
model_quantized.onnx:带量化信息的模型结构和权重model_quantized.encodings:每个 tensor 的量化参数(scale、zero_point、bitwidth)test_vectors/:输入输出的参考值,用于设备端比对
Step 5: NPU 编译流水线
这是将中间格式转换为设备可执行格式的关键步骤。虽然不同 SDK 的具体命令不同,但编译流水线的结构是通用的:
5.1 模型分割
端侧内存有限,需要将大模型拆分为多个子图:
import onnx
def split_onnx_model(model_path: str, split_points: list, output_dir: str):
"""将 ONNX 模型按指定节点拆分为多个子图。
典型的拆分策略:
- Part 1: Embedding 层(可在 CPU 执行)
- Part 2: Decoder Layers(主体,在 NPU 执行)
- Part 3: LM Head(单独量化方案)
"""
model = onnx.load(model_path)
# ... 根据 split_points 的节点名称切分图
# 每个子图独立保存,包含各自的输入输出定义
for i, subgraph in enumerate(subgraphs):
onnx.save(subgraph, f"{output_dir}/part_{i+1}_of_{len(subgraphs)}.onnx")5.2 注意力结构优化
NPU 通常对单头注意力(SHA)的执行效率高于多头注意力(MHA)。编译阶段可以将 MHA reshape 为多个独立的 SHA 调用:
MHA (Multi-Head Attention):
Q: (B, num_heads, S, head_dim) → 一次大矩阵乘法
SHA (Single-Head Attention):
for h in range(num_heads):
Q_h: (B, 1, S, head_dim) → 多次小矩阵乘法
NPU 优势:小矩阵乘法可以更好地利用片上缓存(SRAM/VTCM),
减少对外部 DRAM 的访问。5.3 通用编译流程
不同 SDK 的编译流程虽然命令不同,但结构相似:
# === 通用 NPU 编译流水线(伪命令) ===
# Step 1: 中间格式转换(ONNX → SDK 特有格式)
npu-converter \
--input model_quantized.onnx \
--quantization_overrides model_quantized.encodings \
--output model.intermediate
# Step 2: 量化编译(应用量化参数到硬件指令)
npu-quantize-compile \
--input model.intermediate \
--activation_bitwidth 16 \
--weight_bitwidth 8 \
--calibration_data input_list.txt \
--output model.compiled
# Step 3: 生成设备可执行二进制
npu-binary-generator \
--backend npu_accelerator_lib \
--compiled_model model.compiled \
--optimization_level 3 \
--output model.device_binary各 SDK 的实际编译命令对比
| 步骤 | QNN SDK | Core ML | TensorRT |
|---|---|---|---|
| 格式转换 | qairt-converter | coremltools.convert() | trtexec --onnx |
| 量化编译 | qairt-quantizer | 内置于 convert | trtexec --int8 |
| 二进制生成 | qnn-context-binary-generator | .mlmodelc bundle | .engine file |
| 最终产物 | .serialized.bin | .mlmodelc | .engine |
5.4 Multi-AR Weight Sharing
LLM 推理的 Prefill 和 Decode 阶段输入 shape 不同,但模型权重相同。通过 Weight Sharing,两个计算图共享同一份权重,内存占用减半:
Prefill 图(AR=128):每次处理 128 个 token
输入: (1, 128, hidden_size) → 输出: logits + KV Cache
Decode 图(AR=1 或 AR=32):每次处理 1~32 个 token
输入: (1, 1, hidden_size) + KV Cache → 输出: logits + 新 KV
两图的权重完全相同 → 编译为共享权重的二进制包 → 内存减半多模态模型的端侧架构
一个典型的多模态 LLM 在端侧的推理架构:
┌───────────────────────────────────────────────────┐
│ Host CPU │
│ ┌──────────┐ ┌───────────┐ ┌──────────────┐ │
│ │Tokenizer │ │ Processor │ │ KV Cache Mgr │ │
│ │ │ │(Image Pre)│ │ (Mem Alloc) │ │
│ └────┬─────┘ └─────┬─────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────────────────────────────────────────┐ │
│ │ NPU (端侧加速器) │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ Vision Encoder (W8A16) │ │ │
│ │ │ Conv2d Patch Embed + ViT Blocks │ │ │
│ │ └──────────────┬───────────────────────┘ │ │
│ │ │ vision_embedding │ │
│ │ ▼ │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ Embedding (INT8 LUT) │ │ │
│ │ │ + Vision Token Concat │ │ │
│ │ └──────────────┬───────────────────────┘ │ │
│ │ ▼ │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ Decoder Layers (W8A16/W4A16) │ │ │
│ │ │ N layers, Eager Attention + Conv2d │ │ │
│ │ └──────────────┬───────────────────────┘ │ │
│ │ ▼ │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ LM Head (独立量化) │ │ │
│ │ └──────────────┬───────────────────────┘ │ │
│ │ │ logits │ │
│ └─────────────────┼──────────────────────────┘ │
│ ▼ │
│ ┌────────────────────────────────────┐ │
│ │ Sampling (Top-K / Top-P / Greedy) │ │
│ └────────────────────────────────────┘ │
└───────────────────────────────────────────────────┘端侧 vs 云端推理对比
| 维度 | 云端推理 | 端侧推理 |
|---|---|---|
| 模型规模 | 70B-400B | 0.5B-7B |
| 精度 | FP16/BF16 | W4A16 / W8A16 |
| 吞吐量 | 高(批处理) | 低(单请求) |
| 延迟 | 网络延迟 + 推理 | 仅推理延迟 |
| 隐私 | 数据上传云端 | 数据本地处理 |
| 成本 | GPU 时间费用 | 零边际成本 |
| 模型适配 | 最小改动(PyTorch 原生) | 大量改动(Linear→Conv, Eager Attn) |
| 硬件 | NVIDIA GPU (CUDA) | NPU / ANE / CPU |
| 并发能力 | 高(Continuous Batching) | 低(单用户) |
端云协同
实际产品中,端侧和云端往往不是二选一,而是协同工作:
- 简单请求(短对话、快速问答)→ 端侧处理,零延迟
- 复杂请求(长文档分析、复杂推理)→ 上传云端,大模型处理
- 隐私敏感(医疗、金融)→ 强制端侧
实践要点总结
| 环节 | 关键决策 | 常见陷阱 |
|---|---|---|
| 模型适配 | Linear→Conv2d, Conv3d→Conv2d | 忘记替换某些子模块;下游工具无法识别替换后的层 |
| Attention 改写 | Flash Attention → Eager | 忘记处理 GQA 的 repeat_kv;Softmax 数值不稳定 |
| 量化方案选择 | Vision W8A16, LLM W4A16 | Vision 用 W4 导致图像理解严重退化 |
| SeqMSE | 对 decoder 层启用 | 校准数据太少(< 50 batch)导致效果不稳定 |
| SQNR 验证 | 每步都验证 | 只在最终验证,无法定位精度损失来源 |
| 模型分割 | 按 Embedding / Decoder / LM Head | 分割点选错导致跨图 tensor 传输开销大 |
| Weight Sharing | Prefill 和 Decode 图共享权重 | 未启用 weight sharing,内存占用翻倍 |
| 序列长度 | 固定 context_length(如 4096) | 设太长导致 KV Cache 超出设备内存 |
苏格拉底时刻
在继续之前,尝试回答以下问题来检验你的理解:
1. 为什么 KV Cache 能加速推理?如果没有 KV Cache,生成第 100 个 token 需要多少次矩阵乘法?
没有 KV Cache 时,生成第
2. PagedAttention 借鉴了操作系统的什么概念?为什么能提升显存利用率?
借鉴了虚拟内存中的分页机制(Paging)。通过将 KV Cache 切分为固定大小的 Block 并按需分配,避免了预分配最大长度导致的内部碎片,以及不同请求长度不一导致的外部碎片。显存利用率从 20-40% 提升到接近 100%。
3. Continuous Batching 相比 Static Batching 的核心改进是什么?
核心改进是在 iteration 级别而非 request 级别做调度。短请求完成后立即释放资源并插入新请求,避免 GPU 空等长请求完成。吞吐量提升 2-8 倍。
4. 投机采样为什么是「无损」的?如果小模型猜错了会怎样?
通过精心设计的 accept/reject 机制(以
5. Chunked Prefill 的 Token Budget 是什么意思?为什么 Decode 优先?
Token Budget 是每步模型前向传播的总 token 数上限(如 8192)。Decode 优先是因为正在生成的请求需要持续输出 token 以满足用户实时体验——如果被新的 Prefill 阻塞,用户会感受到卡顿。所以先把所有 Decode 请求放入 batch(每个只占 1 token),再用剩余的 budget 给 Prefill 请求分 chunk。
6. PD 分离的核心挑战是什么?有几种 KV Cache 管理模式?
核心挑战是 Prefill 节点计算完 KV Cache 后需要高效传输到 Decode 节点,传输延迟不能抵消分离带来的收益。三种管理模式:(1) KV Cache 仅存于 Decode 节点 (2) PD 各存各的 (3) 中心化/去中心化 Cache 服务。
7. 在 PagedAttention 的 Decode 阶段,多个 Page 的 Attention 结果如何正确聚合?
通过 Online Softmax 技巧:每个 Page 独立计算局部的 Attention 输出
常见问题 & 面试考点
高频面试题
Q: LLM 推理的 Prefill 和 Decode 阶段有什么区别?
A: Prefill 是处理输入 prompt,一次性计算所有 token 的 KV Cache,是 Compute-bound;Decode 是逐 token 生成,每步读取全部参数和 KV Cache,是 Memory-bound。以 Llama 70B 为例,Decode 阶段算术强度仅为 1 FLOP/Byte,远低于 GPU 的计算带宽比 156 FLOP/Byte。
Q: vLLM 为什么快?核心技术是什么?
A: vLLM 的核心是 PagedAttention(解决 KV Cache 显存碎片,利用率从 ~30% 提升到 ~100%)+ Continuous Batching(迭代级调度,吞吐提升 2-8x)+ Chunked Prefill(避免长 Prefill 阻塞 Decode)。
Q: 投机采样的加速比取决于什么因素?
A: 取决于 Draft Model 和 Target Model 输出分布的一致性(可用 KL 散度度量)。接受率越高加速越大。通常使用同系列的小模型(如 Llama-7B → Llama-70B)可以达到 70-85% 的接受率,实现 2-3x 加速。
Q: Chunked Prefill 和 PD 分离的关系是什么?
A: Chunked Prefill 是 PD 融合方案——在同一 GPU 上交错执行 Prefill 和 Decode。PD 分离则是将二者放在不同物理节点上。实际系统通常兼并两种方案:Prefill 节点内部也可以使用 Chunked Prefill。
性能指标速查
| 指标 | 含义 | 优化技术 |
|---|---|---|
| TTFT (Time To First Token) | 首个 token 延迟 | Chunked Prefill, 模型并行 |
| TPOT (Time Per Output Token) | 每个输出 token 延迟 | KV Cache, 投机采样 |
| Throughput (tokens/s) | 吞吐量 | Continuous Batching, PagedAttention |
| GPU Memory Utilization | 显存利用率 | PagedAttention |
| GPU Compute Utilization | 算力利用率 | Chunked Prefill |
推荐资源
必读论文
- PagedAttention: Efficient Memory Management for Large Language Model Serving with PagedAttention(vLLM 核心论文)
- Speculative Decoding: Fast Inference from Transformers via Speculative Decoding(Google)
- EAGLE-3: EAGLE-3: Scaling up Speculative Sampling with Feature-Level Draft Model(腾讯 AngelSlim)
- SpecExit: SpecExit: Accelerating Large Reasoning Models via Speculative Exit(推理模型思考早退)
- Orca: Orca: A Distributed Serving System for Transformer-Based Generative Models(Continuous Batching 的起源)
- Sarathi-Serve: Taming Throughput-Latency Tradeoff in LLM Inference with Sarathi-Serve(Chunked Prefill)
- DistServe: DistServe: Disaggregating Prefill and Decoding for Goodput-optimized Large Language Model Serving(PD 分离)
代码与工具
- vLLM GitHub — 最流行的开源推理引擎
- TensorRT-LLM — NVIDIA 官方推理优化
- SGLang — 高性能推理框架
- AngelSlim — 腾讯大模型压缩工具包(EAGLE-3 投机采样 + 量化 + SpecExit)
- Qualcomm AI Hub — Qualcomm 端侧模型部署平台,提供预优化模型和部署工具
- MLC LLM — 通用 LLM 端侧推理引擎(支持 iOS/Android/GPU/NPU)
- llama.cpp — 纯 C/C++ 实现的 LLM 推理,极致兼容性