【Nano-vLLM源码分析(一)】环境配置及整体流程概览
环境配置
整体环境还是很干净的,跟着readme应该能很快配置起来。
不过我这里是下载了源代码,然后使用了容器
nvcr.io/nvidia/pytorch:25.04-py3来挂载文件夹运行,在容器内还需要pip3 install transformers xxhash,这样就配置好了基本的python环境。然后还需要下载Qwen3模型,因为Nano-vLLM目前只专门适配了它。这里readme使用的是比较老的
huggingface-cli来下载,如果使用最新的版本,直接使用如下的代码即可将模型下载好,注意这里下载的位置是./huggingface/Qwen3-0.6B。
1 | |
- 然后修改
example.py中的模型位置为./huggingface/Qwen3-0.6B再直接运行python3 example.py就可以运行了。运行结果如下:
1 | |
- 注意到在运行过程中由于其使用的是GPU 0,故会分配GPU 0的大量显存,主要占用显存的地方是模型以及kv cache

流程概览
项目文件
首先整体概览一下项目的文件结构,如下所示,添加了一些介绍
1 | |
整体流程
example.py的代码如下所示
1 | |
现在跟着example的执行过程大致梳理一下整体流程:
根据
Qwen3-0.6B文件夹内容初始化Tokenizer以及LLM模型LLM是对LLMEngine的包装,主要是为了对齐vLLM的行为,其代码如下所示
1
2
3
4
5from nanovllm.engine.llm_engine import LLMEngine
class LLM(LLMEngine):
pass- LLMEngine初始化的代码如下所示,其读取了配置文件,然后其支持TP并行,会加载TP个进程,每个进程运行对应的
ModelRunner。其还配置了对应的tokenizer与scheduler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20class LLMEngine:
def __init__(self, model, **kwargs):
config_fields = {field.name for field in fields(Config)}
config_kwargs = {k: v for k, v in kwargs.items() if k in config_fields}
config = Config(model, **config_kwargs)
self.ps = []
self.events = []
ctx = mp.get_context("spawn")
for i in range(1, config.tensor_parallel_size):
event = ctx.Event()
process = ctx.Process(target=ModelRunner, args=(config, i, event))
process.start()
self.ps.append(process)
self.events.append(event)
self.model_runner = ModelRunner(config, 0, self.events)
self.tokenizer = AutoTokenizer.from_pretrained(config.model, use_fast=True)
config.eos = self.tokenizer.eos_token_id
self.scheduler = Scheduler(config)
atexit.register(self.exit)配置sampling_params
- 其代码如下所示:
1
2
3
4
5
6
7
8
9@dataclass
class SamplingParams:
temperature: float = 1.0
max_tokens: int = 64
ignore_eos: bool = False
def __post_init__(self):
assert self.temperature > 1e-10, "greedy sampling is not permitted"主要是支持配置:
temperature: $$p_i = \mathrm{softmax}!\left(\frac{z_i}{T}\right)$$,T负责控制缩放程度,T小于1.0分布会更尖锐,大于1.0分布会更加平缓。其通过assert self.temperature > 1e-10避免了完全确定性的输出max_tokens:最多生成的token数量ignore_eos:如果为true,即使生成 EOS,也继续生成直到达到max_tokens
使用tokenizer模板转换prompt
- 经过chat模板转换后,得到的
prompts结果如下
1
['<|im_start|>user\nintroduce yourself<|im_end|>\n<|im_start|>assistant\n', '<|im_start|>user\nlist all prime numbers within 100<|im_end|>\n<|im_start|>assistant\n']- 经过chat模板转换后,得到的
通过
llm.generate(prompts, sampling_params)调用得到生成结果generate的相关代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36def generate(
self,
prompts: list[str] | list[list[int]],
sampling_params: SamplingParams | list[SamplingParams],
use_tqdm: bool = True,
) -> list[str]:
if use_tqdm:
pbar = tqdm(total=len(prompts), desc="Generating", dynamic_ncols=True)
if not isinstance(sampling_params, list):
sampling_params = [sampling_params] * len(prompts)
for prompt, sp in zip(prompts, sampling_params):
self.add_request(prompt, sp)
outputs = {}
prefill_throughput = decode_throughput = 0.
while not self.is_finished():
t = perf_counter()
output, num_tokens = self.step()
if use_tqdm:
if num_tokens > 0:
prefill_throughput = num_tokens / (perf_counter() - t)
else:
decode_throughput = -num_tokens / (perf_counter() - t)
pbar.set_postfix({
"Prefill": f"{int(prefill_throughput)}tok/s",
"Decode": f"{int(decode_throughput)}tok/s",
})
for seq_id, token_ids in output:
outputs[seq_id] = token_ids
if use_tqdm:
pbar.update(1)
outputs = [outputs[seq_id] for seq_id in sorted(outputs.keys())]
outputs = [{"text": self.tokenizer.decode(token_ids), "token_ids": token_ids} for token_ids in outputs]
if use_tqdm:
pbar.close()
return outputs首先会将各prompt与sampling_param组合为
Sequence然后加入到scheduler中的waiting队列中add_request的代码如下所示
1
2
3
4
5def add_request(self, prompt: str | list[int], sampling_params: SamplingParams):
if isinstance(prompt, str):
prompt = self.tokenizer.encode(prompt)
seq = Sequence(prompt, sampling_params)
self.scheduler.add(seq)其主要会将prompt转换为token id列表,如这里的
'<|im_start|>user\nintroduce yourself<|im_end|>\n<|im_start|>assistant\n'会被转化为[151644, 872, 198, 396, 47845, 6133, 151645, 198, 151644, 77091, 198]然后scheduler.add的代码如下所示,就是直接append到waiting队列中
1
2def add(self, seq: Sequence):
self.waiting.append(seq)然后进入while循环,循环结束的条件是
self.is_finished()- LLM_Engine的
is_finished()的代码如下所示:
1
2def is_finished(self):
return self.scheduler.is_finished()Scheduler的is_finished代码如下所示,即介绍的标准就是两个队列中都没有请求
1
2def is_finished(self):
return not self.waiting and not self.running- LLM_Engine的
在while循环中主要的就是不断执行
self.step(),然后更新进度条,根据perf_counter()得到的运算时间以及生成的token数量num_tokens计算吞吐并将输出结果存储在outputs中self.step()的代码如下所示
1
2
3
4
5
6
7def step(self):
seqs, is_prefill = self.scheduler.schedule()
token_ids = self.model_runner.call("run", seqs, is_prefill)
self.scheduler.postprocess(seqs, token_ids)
outputs = [(seq.seq_id, seq.completion_token_ids) for seq in seqs if seq.is_finished]
num_tokens = sum(len(seq) for seq in seqs) if is_prefill else -len(seqs)
return outputs, num_tokens其主要就是从scheduler中获取到要运行的seqs
scheduler.schedule()的代码如下所示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36def schedule(self) -> tuple[list[Sequence], bool]:
# prefill
scheduled_seqs = []
num_seqs = 0
num_batched_tokens = 0
while self.waiting and num_seqs < self.max_num_seqs:
seq = self.waiting[0]
if num_batched_tokens + len(seq) > self.max_num_batched_tokens or not self.block_manager.can_allocate(seq):
break
num_seqs += 1
self.block_manager.allocate(seq)
num_batched_tokens += len(seq) - seq.num_cached_tokens
seq.status = SequenceStatus.RUNNING
self.waiting.popleft()
self.running.append(seq)
scheduled_seqs.append(seq)
if scheduled_seqs:
return scheduled_seqs, True
# decode
while self.running and num_seqs < self.max_num_seqs:
seq = self.running.popleft()
while not self.block_manager.can_append(seq):
if self.running:
self.preempt(self.running.pop())
else:
self.preempt(seq)
break
else:
num_seqs += 1
self.block_manager.may_append(seq)
scheduled_seqs.append(seq)
assert scheduled_seqs
self.running.extendleft(reversed(scheduled_seqs))
return scheduled_seqs, False其划分为了两大阶段的处理,首先是对多个请求进行
prefill处理:进入
prefill的while循环的要求是waiting队列中有请求,并且现在获取的seq数量没有超过max_num_seqs限制。此外还要求新获取的seq叠加的num_batched_tokens要小于self.max_num_batched_tokens,并且block_manager还有足够的显存进行分配然后正常在while循环中处理的主要流程就是
block_manager为新的seq分配cache,seq状态修改为running,num_batched_tokens添加len(seq) - seq.num_cached_tokens,waiting队列popleft,running队列append,scheduled_seqs结果append如果最后
scheduled_seqs确实获取了seq,就直接返回scheduled_seqs及True标明这一次调度只处理了prefill
如果没有prefill请求了,就去处理
decode:进入
decode的while循环要求running队列中有请求,并且现在获取的seq数量没有超过max_num_seqs限制在循环中需要从
running队列中popleft获取seq,然后通过block_manager查看是否还有足够的显存进行分配,如果没有就需要去抢占其他seq,优先抢占running队列最新来的seq,如果都抢占完了再抢占自己,不过如果抢占了自己就直接退出while循环了,因为确实没有显存了。如果有足够的显存或者抢占得到了足够的显存,那么就调用block_manager进行may_append分配显存,并且结果队列scheduled_seqsappend该seq最后返回结果队列
scheduled_seqs及False标明是decode处理
然后调用
model_runner运行模型推理- 这里调用的是
model_runner的run函数,如下所示
1
2
3
4
5
6
7def run(self, seqs: list[Sequence], is_prefill: bool) -> list[int]:
input_ids, positions = self.prepare_prefill(seqs) if is_prefill else self.prepare_decode(seqs)
temperatures = self.prepare_sample(seqs) if self.rank == 0 else None
logits = self.run_model(input_ids, positions, is_prefill)
token_ids = self.sampler(logits, temperatures).tolist() if self.rank == 0 else None
reset_context()
return token_ids其主要流程是依据是否是prefill来做一些kv cache的准备工作
然后运行模型得到运行结果
最后采样得到
token_ids并返回
- 这里调用的是
然后执行scheduler的一些后处理流程
postprocess的相关代码如下所示,
1
2
3
4
5
6
7def postprocess(self, seqs: list[Sequence], token_ids: list[int]) -> list[bool]:
for seq, token_id in zip(seqs, token_ids):
seq.append_token(token_id)
if (not seq.ignore_eos and token_id == self.eos) or seq.num_completion_tokens == seq.max_tokens:
seq.status = SequenceStatus.FINISHED
self.block_manager.deallocate(seq)
self.running.remove(seq)- 其主要是逐个遍历
token_ids,并将推理结果token_id添加到seq中,然后检查如果没有忽略eos而当前就是eos或者已经到达最大长度了,就将seq状态修改为FINISHED,然后block_manager释放掉这个seq对应的显存,并且也将其从running队列中删除
如果seq是已经推理结束了是
FINISHED状态就将其记录到outputs中,并计算中共处理了多少tokens,然后返回
最后将
outputs使用tokenizer.decode解码得到text,并将token_id与text都返回
打印生成结果