【vllm】(四)vLLM v1 Worker — 模块超深度逐行分析之二
第四章 核心业务逻辑深度解析 - gpu_worker.py4.1 Worker.init() 逐行解析# 第105行: Worker类 - GPU推理Worker的核心实现classWorker(WorkerBase):# 第106行: 构造函数def__init__(self,vllm_config:VllmConfig,local_rank:int,rank:int,distributed_init_method:str,is_driver_worker:boolFalse,return_hidden_states:boolFalse,# 是否返回隐藏状态(用于PP)):# 第119行: 调用父类构造器super().__init__(vllm_config,local_rank,rank,distributed_init_method,is_driver_worker)# 第122-124行: 保存配置引用self.parallel_configvllm_config.parallel_config self.scheduler_configvllm_config.scheduler_config self.device_configvllm_config.device_config# 第127行: 是否返回隐藏状态# Pipeline Parallelism中, 非最后stage需要返回中间隐藏状态self.return_hidden_statesreturn_hidden_states# 第130-132行: 初始化延迟属性# 这些在init_device()或load_model()后才被赋值self.model_runner:GPUModelRunner|NoneNoneself.kv_cache:list[torch.Tensor]|NoneNone# [layer][2][num_blocks, block_size, ...]self.memory_pool_ctx:ContextManager|NoneNone# 第157行: sleep() - 卸载模型权重, 释放GPU内存# level1: 卸载模型权重到CPU# 用于GPU内存紧张时临时释放空间defsleep(self,level:int1)-None:iflevel1:self.model_runner.model.sleep(level)# 将权重移到CPU# 第181行: wake_up() - 重新加载模型权重# tags指定需要唤醒的组件(如[weights, kv_cache])defwake_up(self,tags:list[str]|NoneNone)-None:self.model_runner.model.wake_up(tags)# 将权重移回GPU4.2 Worker.init_device() 逐行解析# 第219行: init_device() - GPU设备初始化# 这是Worker最关键的初始化方法之一definit_device(self)-None:# 第220-227行: 设置CUDA设备torch.set_device(self.device)# self.device torch.device(fcuda:{local_rank})torch.cuda.set_device(self.device)# 第229-238行: 初始化CUDA内存分配器# 设置PYTORCH_CUDA_ALLOC_CONF, 启用expandable_segments# 这允许CUDA内存分配器更高效地管理内存碎片# 第240-255行: 初始化分布式环境# NCCL/TorchDist后端初始化init_worker_distributed_environment(self.vllm_config,self.rank,self.distributed_init_method)# 第257-265行: 设置GPU内存池# 使用CUDAGraph的内存池作为默认内存池# 关键优化: CUDA Graph重放时内存分配来自同一池, 避免重分配self.memory_pool_ctxself._maybe_get_memory_pool_context(default)# 第267-285行: 创建GPUModelRunnerifself.model_runnerisNone:self.model_runnerGPUModelRunner(self.vllm_config,self.device)4.3 Worker.determine_available_memory() 逐行解析# 第332行: 确定可用GPU内存(字节)# 这是KV cache大小的关键决定因素defdetermine_available_memory(self)-int:# 第333-340行: Profile GPU内存使用# 1. 创建虚拟输入, 运行模型一次, 测量峰值内存# 2. 考虑CUDA Graph占用的额外内存# 3. 考虑编码器(多模态)的内存占用# 4. 考虑LoRA的内存占用# 第350-380行: 计算可用内存 总GPU内存 - 模型权重 - 激活值 - CUDA Graph - 编码器torch.cuda.empty_cache()# 先清理缓存torch.cuda.reset_peak_memory_stats()# 重置峰值统计# 运行profile_run测量峰值内存self.model_runner.profile_run()# 计算可用内存available_memorytorch.cuda.mem_get_info()[0]# 空闲内存peak_memorytorch.cuda.max_memory_allocated()# 峰值使用# 减去CUDA Graph额外占用ifcudagraph_mode!CUDAGraphMode.NONE:cudagraph_memoryself.model_runner.profile_cudagraph_memory()available_memory-cudagraph_memoryreturnavailable_memory4.4 Worker.execute_model() 逐行解析# 第751行: 推理执行主入口defexecute_model(self,scheduler_output:SchedulerOutput,*,output_proc:boolFalse,)-ModelRunnerOutput|None:# 第753-760行: 只在driver worker执行调度逻辑# 非driver worker只接收广播的输入ifnotself.is_driver_worker:returnNone# 第762-770行: 应用多模态缓存# 避免重复编码相同的多模态输入self._apply_mm_cache(scheduler_output)# 第772-800行: 执行模型推理outputself.model_runner.execute_model(scheduler_outputscheduler_output,)# 第802-820行: Pipeline Parallelism处理# 非最后stage需要广播中间结果ifself.return_hidden_states:# 广播sampled_token_ids到下一stageself._pp_broadcast_prev_sampled_token_ids(output.sampled_token_ids)# 第822-840行: 异步输出处理# 使用单独的CUDA stream异步拷贝输出到CPUifoutput_procandisinstance(output,AsyncGPUModelRunnerOutput):returnoutput# 返回异步结果, 由调用方等待returnoutput第五章 核心业务逻辑深度解析 - gpu_model_runner.py这是整个worker模块最核心, 最大的文件(6,996行). 以下逐行解析其关键方法.5.1 GPUModelRunner.init() 逐行解析# 第394行: GPUModelRunner定义classGPUModelRunner:# 第397行: 构造函数 - 初始化所有推理所需的组件def__init__(self,vllm_config:VllmConfig,device:torch.device,):# 第400-430行: 保存配置引用self.vllm_configvllm_config self.model_configvllm_config.model_config self.cache_configvllm_config.cache_config self.parallel_configvllm_config.parallel_config self.scheduler_configvllm_config.scheduler_config self.speculative_configvllm_config.speculative_config self.lora_configvllm_config.lora_config# 第432-445行: 核心参数self.max_num_reqsself.scheduler_config.max_num_seqs self.max_num_tokensself.scheduler_config.max_num_batched_tokens self.max_model_lenself.model_config.max_model_len self.block_sizeself.cache_config.block_size self.num_kv_heads...# 从模型配置推导self.head_size...self.vocab_sizeself.model_config.get_vocab_size()# 第450-470行: 创建InputBatch# InputBatch管理持久化批的所有GPU/CPU缓冲区self.input_batchInputBatch(max_num_reqsself.max_num_reqs,max_model_lenself.max_model_len,max_num_tokensself.max_num_tokens,devicedevice,pin_memoryis_pin_memory_available(),vocab_sizeself.vocab_size,)# 第475-490行: 创建采样器self.samplerSampler(logprobs_modeself.cache_config.logprobs_mode,)# 第495-510行: 创建KV连接器(如果启用)ifhas_kv_transfer_group():self.kv_connectorget_kv_connector(self.vllm_config,device,self.input_batch)else:self.kv_connectorKVConnector()# no-op连接器# 第515-530行: 创建微批处理包装器ifself.scheduler_config.is_ubatch_enabled:self.ubatch_wrapperUBatchWrapper(self.vllm_config,self.model,device)else:self.ubatch_wrapperNone# 第535-550行: 创建CUDA Graph管理器self.cudagraph_managerModelCudaGraphManager(self.vllm_config,self.model,device)# 第555-570行: 创建KV Block零化器# 用于在释放KV cache block时将其清零self.kv_block_zeroerKVBlockZeroer(device,is_pin_memory_available())# 第575-590行: 创建工作区管理器init_workspace_manager(device,self.scheduler_config.num_ubatches)# 第595-610行: 创建多模态组件self.encoder_cacheEncoderCache(...)# 编码器输出缓存self.encoder_runnerEncoderRunner(...)# 编码器执行器self.encoder_cudagraph_managerEncoderCudaGraphManager(...)# 编码器CUDA Graph5.2 _update_states() 逐行解析# 第1059行: _update_states - 批状态更新的核心方法# 将SchedulerOutput转化为InputBatch的增量更新# 返回一个可选的回调函数(用于模型执行后的状态修正)def_update_states(self,scheduler_output:SchedulerOutput,)-Callable|None:# 阶段1: 移除已完成请求 # 第1062-1080行: 遍历finished_req_idsforreq_idinscheduler_output.finished_req_ids:self.input_batch.remove_request(req_id)self.encoder_cache.free(req_id)# 释放编码器缓存# 零化已释放的KV cache blocksfreed_block_idsscheduler_output.freed_block_ids.get(req_id,[])self._zero_block_ids(freed_block_ids)# 阶段2: 添加新请求 # 第1082-1110行: 遍历scheduled_new_reqsfornew_reqinscheduler_output.scheduled_new_reqs:# 创建CachedRequestState(不可变的请求级缓存)req_stateCachedRequestState(req_idnew_req.req_id,prompt_token_idsnew_req.prompt_token_ids,sampling_paramsnew_req.sampling_params,...)# 添加到InputBatch, 分配slot索引self.input_batch.add_request(req_state)# 阶段3: 交换请求位置 # 第1112-1140行: 处理batch reordering# 为了优化CUDA Graph的利用率, 可能需要重排请求fori1,i2inscheduler_output.swap_pairs:self.input_batch.swap_states(i1,i2)self.block_table.swap_row(i1,i2)# 阶段4: 更新已有请求 # 第1142-1200行: 更新正在生成的请求的状态forcached_reqinscheduler_output.scheduled_cached_reqs:req_indexself.input_batch.get_req_index(cached_req.req_id)# 更新新token的block映射# 更新sampling_params(可能动态变化)# 更新spec_token_ids# 阶段5: 更新KV Block IDs # 第1202-1280行: 将新分配的block_ids更新到BlockTableforreq_id,new_block_idsinscheduler_output.block_ids.items():req_indexself.input_batch.get_req_index(req_id)forgid,block_idsinenumerate(new_block_ids):self.block_table.append_row(block_ids,req_index)# 阶段6: 投机解码token更新 # 第1282-1320行: 如果使用投机解码, 更新draft token信息ifscheduler_output.scheduled_spec_decode:forreq_id,spec_tokensinscheduler_output.scheduled_spec_decode.items():self.input_batch.update_req_spec_token_ids(req_id,spec_tokens)# 第1322-1340行: 压缩批(移除空位, 保持连续)self.input_batch.condense()# 第1342-1360行: 返回模型执行后的修正回调# 用于投机解码中修正draft token数量defcorrect_spec_decode_token_counts():...returncorrect_spec_decode_token_countsifhas_spec_decodeelseNone5.3 _prepare_inputs() 逐行解析# 第1774行: _prepare_inputs - 准备模型前向传播的所有输入def_prepare_inputs(self,scheduler_output:SchedulerOutput,)-dict[str,Any]:# 第1776-1790行: 提取调度信息num_scheduled_tokensscheduler_output.num_scheduled_tokens num_reqsself.input_batch.num_reqs# 第1792-1820行: 准备input_ids# 从SchedulerOutput中提取每请求的token IDs# prefill: prompt tokens 新tokens# decode: 新生成的单个tokeninput_idsself._prepare_input_ids(scheduler_output)# 第1822-1850行: 准备position IDs# 对于RoPE位置编码, 需要为每个token计算其位置# 对于MRoPE(多模态RoPE), 需要额外的位置计算positionsself._get_positions(num_scheduled_tokens)# 第1852-1900行: 构建注意力元数据# 这是最复杂的部分, 涉及Block Table, slot mapping等attn_metadataself._build_attention_metadata(scheduler_output)# 第1902-1930行: 准备多模态输入# 如果有图像/音频/视频输入, 需要准备对应的embeddingmm_kwargsself._extract_mm_kwargs(scheduler_output)# 第1932-1960行: 准备Mamba状态(如果使用Mamba模型)# Mamba模型需要拷贝状态到当前位置ifself.uses_mamba:mamba_paramspreprocess_mamba(self.input_batch,scheduler_output)# 第1962-1990行: 组装model_kwargs# 将所有输入组装为字典, 传递给model()model_kwargs{input_ids:input_ids,positions:positions,kv_caches:self.kv_cache,attn_metadata:attn_metadata,**mm_kwargs,**mamba_params,}returnmodel_kwargs5.4 execute_model() 逐行解析# 第3771行: execute_model - 完整推理流水线defexecute_model(self,scheduler_output:SchedulerOutput,*,intermediate_tensors:IntermediateTensors|NoneNone,num_steps:int1,)-ModelRunnerOutput|None:# Step 0: KV Connector no_forward检查 # 第3773-3800行: 如果所有KV都从远程加载, 跳过模型执行ifself.kv_connectorandself.kv_connector.can_skip_forward(scheduler_output):returnself.kv_connector.no_forward(scheduler_output)# Step 1: 更新批状态 # 第3802-3810行post_model_execute_cbself._update_states(scheduler_output)# Step 2: 准备输入 # 第3812-3820行model_kwargsself._prepare_inputs(scheduler_output)# Step 3: 多模态编码器 # 第3822-3850行: 处理图像/音频/视频ifscheduler_output.scheduled_new_reqsorscheduler_output.scheduled_cached_reqs:ifhas_mm_inputs:self._execute_mm_encoder(scheduler_output)# Step 4: KV Connector预加载 # 第3852-3870行ifself.kv_connector:self.kv_connector.pre_forward(scheduler_output)# Step 5: 预处理 # 第3872-3900行: 最终的输入组装# 包括input_ids, positions, block_tables, slot_mappings等self._preprocess(scheduler_output,model_kwargs)# Step 6: 确定批执行模式 # 第3902-3940行: 判断是否可以走CUDA Graph# uniform_decode: 所有请求都是decode且token数相同(is_ubatch,cudagraph_mode,num_tokens_to_pad)\ self._determine_batch_execution_and_padding(scheduler_output)# Step 7: 模型前向传播 # 第3942-4000行ifis_ubatch:# 微批次路径: 拆分为多个ubatch, 重叠计算和通信hidden_statesself.ubatch_wrapper(model_kwargs,cudagraph_mode,num_tokens_to_pad)elifcudagraph_mode!CUDAGraphMode.NONE:# CUDA Graph路径: 重放预捕获的图hidden_statesself.cudagraph_manager.run_fullgraph(model_kwargs,cudagraph_mode,num_tokens_to_pad)else:# 普通路径: 直接调用model()hidden_statesself._model_forward(model_kwargs)# Step 8: KV Connector后处理 # 第4002-4020行ifself.kv_connector:self.kv_connector.post_forward(scheduler_output,hidden_states)# Step 9: 采样 # 第4022-4060行ifnotself.model_config.is_embedding_model:sampler_outputself._sample(hidden_states,scheduler_output)# Step 10: 后记账和异步同步 # 第4062-4100行outputself._bookkeeping_sync(scheduler_output,sampler_output)# Step 11: 执行后回调 # 第4102-4120行: 修正投机解码的token计数ifpost_model_execute_cb:post_model_execute_cb()returnoutput5.5 _build_attention_metadata() 逐行解析# 第2085行: 构建注意力元数据# 这是最复杂的方法之一, 涉及多种注意力后端的元数据构建def_build_attention_metadata(self,scheduler_output:SchedulerOutput,)-AttentionMetadata:# 第2087-2110行: 提取基本信息num_prefillsscheduler_output.num_prefills num_decode_tokensscheduler_output.num_decode_tokens num_prefill_tokensscheduler_output.num_prefill_tokens# 第2112-2150行: 构建Block Table# 对每个KV cache group, 计算GPU端的block tabledef_get_block_table(kv_cache_gid:int):# 从CPU端的BlockTable获取block_ids# 转换为GPU端的block_table_tensorblock_tableself.block_tables[kv_cache_gid]returnblock_table.get_device_tensor(num_reqs)# 第2152-2200行: 计算slot_mapping# slot_mapping: 每个token在KV cache中的写入位置# block_id * block_size offset_in_blockslot_mappingself._get_slot_mappings(scheduler_output)# 第2202-2270行: 构建seq_lens和query_lens# seq_lens: 每个请求的总序列长度(prompt 已生成)# query_lens: 本次调度中每个请求的新token数# prefill: query_len num_prompt_tokens# decode: query_len 1# 第2272-2310行: 构建注意力组元数据# 支持多种注意力后端(FlashInfer, Triton, etc.)def_build_attn_group_metadata(attn_group,ubatch_id):builderself.metadata_builders[ubatch_id]# 调用builder.build()构建后端特定的元数据returnbuilder.build(num_prefillsnum_prefills,num_prefill_tokensnum_prefill_tokens,num_decode_tokensnum_decode_tokens,seq_lensseq_lens,query_lensquery_lens,block_tableblock_table,slot_mappingslot_mapping,)# 第2312-2340行: 计算级联注意力前缀长度# 级联注意力: 将长序列的prefix和suffix分开处理# 可以复用prefix的KV cache, 减少重复计算ifself.scheduler_config.enable_cascade_attn:cascade_prefix_lensself._compute_cascade_attn_prefix_lens(scheduler_output)# 第2342-2380行: 组装AttentionMetadatareturnAttentionMetadata(num_prefillsnum_prefills,num_prefill_tokensnum_prefill_tokens,num_decode_tokensnum_decode_tokens,seq_lensseq_lens,query_lensquery_lens,block_tableblock_table,slot_mappingslot_mapping,cascade_attn_prefix_lenscascade_prefix_lens,**attn_group_metadata,)5.6 _determine_batch_execution_and_padding() 逐行解析# 第3536行: 确定批执行模式和padding# 这是CUDA Graph利用率的关键决策点def_determine_batch_execution_and_padding(self,scheduler_output:SchedulerOutput,)-tuple[bool,CUDAGraphMode,int]:# 第3538-3560行: 判断是否均匀解码# 均匀解码 所有请求都是decode 每请求1个token# 这种情况下token数 请求数, 可以完美走CUDA Graphis_uniformself._is_uniform_decode(scheduler_output)ifis_uniform:# 均匀解码路径num_tokensself.input_batch.num_reqs# 查找匹配的CUDA Graphcudagraph_modeself.cudagraph_manager.dispatch(num_tokens)# 计算padding到graph的大小的token数num_tokens_to_padcudagraph_mode.padding-num_tokensreturnFalse,cudagraph_mode,num_tokens_to_pad# 第3562-3580行: 非均匀路径# 如果启用了ubatch, 拆分为多个ubatchifself.scheduler_config.is_ubatch_enabled:returnTrue,CUDAGraphMode.NONE,0# 普通路径: 直接执行, 无CUDA GraphreturnFalse,CUDAGraphMode.NONE,0