【llama.cpp】qwen2_vl_surgery.py详解
目录
■官方代码
■详解
qwen2_vl_surgery.py
■官方代码
# 克隆llama.cppgit clone https://github.com/HimariO/llama.cpp.qwen2.5vl.gitcd llama.cpp.qwen2.5vlgit checkout qwen25-vl-20250404
■详解
qwen2_vl_surgery.py
这段代码用于将 Qwen2-VL 模型的视觉部分转换为 GGUF 格式,支持 fp32/fp16 精度。主要功能包括:
- 加载模型和配置;
- 提取视觉模块权重并重命名;
- 将权重写入 GGUF 文件,供推理使用。
import argparsefrom typing import Dictimport torchimport numpy as npfrom gguf import *from transformers import ( AutoProcessor, Qwen2VLConfig, Qwen2VLProcessor, Qwen2VLForConditionalGeneration, Qwen2_5_VLConfig, # type: ignore[reportAttributeAccessIssue] Qwen2_5_VLForConditionalGeneration, # type: ignore[reportAttributeAccessIssue])VISION = \"clip.vision\"def k(raw_key: str, arch: str) -> str: \"\"\" 格式化键名字符串,将架构信息插入到键名模板中 参数: raw_key (str): 包含架构占位符的键名模板字符串,使用 {arch} 作为占位符 arch (str): 架构名称,用于替换模板中的占位符 返回: str: 格式化后的键名字符串,其中 {arch} 占位符被实际的架构名称替换 \"\"\" return raw_key.format(arch=arch)class VL2: \"\"\" VL2 类用于处理视觉-语言模型中的张量名称转换和提取视觉模块的权重。 \"\"\" @staticmethod def to_gguf_name(name: str) -> str: \"\"\" 将原始模型中的参数名称转换为 GGUF 格式兼容的名称。 参数: name (str): 原始模型中的参数名称。 返回: str: 转换后的 GGUF 兼容名称。 \"\"\" og = name name = name.replace(\"text_model\", \"t\").replace(\"vision_model\", \"v\") name = name.replace(\"blocks\", \"blk\").replace(\"embeddings.\", \"\") name = name.replace(\"attn.\", \"attn_\") name = name.replace(\"mlp.fc1\", \"ffn_down\").replace(\"mlp.fc2\", \"ffn_up\").replace(\"proj.\", \"out.\") # name = name.replace(\"layrnorm\", \"ln\").replace(\"layer_norm\", \"ln\").replace(\"layernorm\", \"ln\") name = name.replace(\"norm1\", \"ln1\").replace(\"norm2\", \"ln2\") name = name.replace(\"merger.mlp\", \'mm\') print(f\"[to_gguf_name] {og} --> {name}\") return name @classmethod def find_vision_tensors(cls, qwen2vl, dtype) -> Dict[str, np.ndarray]: \"\"\" 提取视觉模型中的所有张量,并根据需要进行重命名和拆分。 参数: qwen2vl: 包含视觉模型的完整模型对象。 dtype: 目标数据类型,用于非归一化层权重的转换。 返回: Dict[str, np.ndarray]: 一个字典,键是转换后的张量名称,值是对应的 NumPy 数组。 \"\"\" vision_model = qwen2vl.visual tensor_map = {} # 遍历视觉模型的所有状态字典项 for name, ten in vision_model.state_dict().items(): ten = ten.numpy() # 处理 QKV 合并的线性层(注意力机制中常见的合并查询、键、值) if \'qkv\' in name: if ten.ndim == 2: # 权重矩阵 c3, _ = ten.shape else: # 偏置向量 c3 = ten.shape[0] assert c3 % 3 == 0 c = c3 // 3 wq = ten[:c] wk = ten[c: c * 2] wv = ten[c * 2:] base_name = f\"vision_model.{name}\" # 分别保存 Q、K、V 的权重 tensor_map[cls.to_gguf_name(base_name).replace(\"qkv\", \"q\")] = wq tensor_map[cls.to_gguf_name(base_name).replace(\"qkv\", \"k\")] = wk tensor_map[cls.to_gguf_name(base_name).replace(\"qkv\", \"v\")] = wv # 处理 merger 模块中的 MLP 和 LayerNorm 层 elif \'merger\' in name: if name.endswith(\"ln_q.weight\"): tensor_map[\'v.post_ln.weight\'] = ten elif name.endswith(\"ln_q.bias\"): tensor_map[\'v.post_ln.bias\'] = ten else: # \"merger.mlp.%d.weight/bias\" --> \"mm.%d.weight/bias\" tensor_map[cls.to_gguf_name(name)] = ten # 特殊处理 patch embedding 中的 3D 卷积核,将其拆分为两个 2D 卷积核 elif \'patch_embed.proj.weight\' in name: # NOTE: split Conv3D into Conv2Ds # 从输入张量中提取时空patch嵌入权重 # 该函数假设输入张量包含两个时间步的patch嵌入信息 # # 参数: # ten: 输入张量,形状为(c1, c2, kt, kh, kw) # c1, c2: 通道维度 # kt: 时间维度,当前实现要求必须为2 # kh, kw: 空间维度(高度和宽度) # tensor_map: 字典,用于存储提取的权重张量 # # 重要假设: # - 时间patch大小必须为2,这是当前实现的限制 # - 输入张量的前两个维度表示通道信息 # - 第三个维度表示时间步,只处理两个时间步的情况 # # 处理逻辑: # - 将输入张量在时间维度上分离 # - 第0个时间步的权重存储为\"v.patch_embd.weight\" # - 第1个时间步的权重存储为\"v.patch_embd.weight.1\" c1, c2, kt, kh, kw = ten.shape assert kt == 2, \"Current implmentation only support temporal_patch_size of 2\" tensor_map[\"v.patch_embd.weight\"] = ten[:, :, 0, ...] tensor_map[\"v.patch_embd.weight.1\"] = ten[:, :, 1, ...] # 其他常规张量直接映射并重命名 else: tensor_map[cls.to_gguf_name(f\"vision_model.{name}\")] = ten # 根据张量维度决定是否转换为指定的数据类型 for new_name, ten in tensor_map.items(): if ten.ndim str: \"\"\" 将模型层名称转换为GGUF格式的名称。 该函数通过一系列字符串替换操作,将原始模型名称中的特定关键词 替换为GGUF格式约定的缩写形式。 参数: name (str): 原始模型层名称 返回: str: 转换后的GGUF格式名称 \"\"\" og = name # 替换模型类型相关关键词 name = name.replace(\"text_model\", \"t\").replace(\"vision_model\", \"v\") # 替换结构相关关键词 name = name.replace(\"blocks\", \"blk\").replace(\"embeddings.\", \"\") # 替换注意力机制相关关键词 name = name.replace(\"attn.\", \"attn_\") # 替换MLP层相关关键词 name = name.replace(\"mlp.down_proj\", \"ffn_down\").replace(\"mlp.up_proj\", \"ffn_up\") name = name.replace(\"mlp.gate_proj\", \"ffn_gate\").replace(\"proj.\", \"out.\") # 替换归一化层相关关键词 name = name.replace(\"norm1\", \"ln1\").replace(\"norm2\", \"ln2\") # 替换融合模块相关关键词 name = name.replace(\"merger.mlp\", \'mm\') print(f\"[vl25][to_gguf_name] {og} --> {name}\") return namedef main(args): \"\"\" 主函数,用于将 Qwen2VL 或 Qwen2.5VL 模型的视觉编码器部分导出为 GGUF 格式。 参数: args: 命令行参数对象,包含以下属性: - data_type (str): 数据类型,支持 \'fp32\' 或 \'fp16\'。 - model_name (str): 模型名称或本地路径。 - model_type (str): 模型类型,支持 \"qwen2vl\" 或 \"qwen2.5vl\"。 返回值: 无返回值。输出为一个以 `-vision.gguf` 结尾的 GGUF 文件。 \"\"\" # 根据指定的数据类型设置 PyTorch 和 NumPy 的数据类型以及 GGUF 文件类型标识 if args.data_type == \'fp32\': dtype = torch.float32 np_dtype = np.float32 ftype = 0 elif args.data_type == \'fp16\': dtype = torch.float16 np_dtype = np.float16 ftype = 1 else: raise ValueError() local_model = False model_path = \"\" model_name = args.model_name print(\"model_name: \", model_name) # 加载对应类型的模型并获取其配置信息和视觉配置 if args.model_type == \"qwen2vl\": qwen2vl = Qwen2VLForConditionalGeneration.from_pretrained( model_name, torch_dtype=dtype, device_map=\"cpu\" ) cfg: Qwen2VLConfig = qwen2vl.config # type: ignore[reportAssignmentType] vcfg = cfg.vision_config else: qwen2vl = Qwen2_5_VLForConditionalGeneration.from_pretrained( model_name, torch_dtype=dtype, device_map=\"cpu\" ) cfg: Qwen2_5_VLConfig = qwen2vl.config # type: ignore[reportAssignmentType] vcfg = cfg.vision_config # 判断模型是否来自本地路径,并处理路径和模型名 if os.path.isdir(model_name): local_model = True if model_name.endswith(os.sep): model_name = model_name[:-1] model_path = model_name model_name = os.path.basename(model_name) # 设置输出文件名 fname_out = f\"{model_name.replace(\'/\', \'-\').lower()}-vision.gguf\" # 初始化 GGUF 写入器并添加基本元数据 fout = GGUFWriter(path=fname_out, arch=\"clip\") fout.add_description(\"image encoder for Qwen2VL\") fout.add_file_type(ftype) fout.add_bool(\"clip.has_text_encoder\", False) fout.add_bool(\"clip.has_vision_encoder\", True) fout.add_bool(\"clip.has_qwen2vl_merger\", True) fout.add_string(\"clip.projector_type\", \"qwen2vl_merger\") # 根据激活函数类型设置相应的布尔标志 print(cfg.vision_config) if \'silu\' in cfg.vision_config.hidden_act.lower(): fout.add_bool(\"clip.use_silu\", True) fout.add_bool(\"clip.use_gelu\", False) elif \'gelu\' in cfg.vision_config.hidden_act.lower(): fout.add_bool(\"clip.use_silu\", False) fout.add_bool(\"clip.use_gelu\", \'quick\' not in cfg.vision_config.hidden_act.lower()) else: raise ValueError() # 根据模型类型添加特定的视觉模型参数 if args.model_type == \"qwen2.5vl\": fout.add_bool(\"clip.use_glu_mlp\", True) # gate linear unit MLP layer in vision model fout.add_bool(\"clip.use_rms_norm\", True) fout.add_array(\"clip.vision.fullatt_block_indexes\", vcfg.fullatt_block_indexes) fout.add_uint32(\"clip.vision.window_size\", vcfg.window_size) fout.add_uint32(k(KEY_EMBEDDING_LENGTH, VISION), vcfg.hidden_size) fout.add_uint32(\"clip.vision.projection_dim\", vcfg.out_hidden_size) else: fout.add_uint32(k(KEY_EMBEDDING_LENGTH, VISION), vcfg.embed_dim) fout.add_uint32(\"clip.vision.projection_dim\", vcfg.hidden_size) # 获取模型中的视觉相关张量并写入 GGUF 文件 if args.model_type == \"qwen2.5vl\": tensor_map = VL25.find_vision_tensors(qwen2vl, np_dtype) else: tensor_map = VL2.find_vision_tensors(qwen2vl, np_dtype) for name, data in tensor_map.items(): fout.add_tensor(name, data) # 添加视觉模型的基本结构参数 fout.add_uint32(\"clip.vision.patch_size\", vcfg.patch_size) fout.add_uint32(\"clip.vision.image_size\", 14 * 40) # some reasonable size that is divable by (14*2) fout.add_uint32(k(KEY_ATTENTION_HEAD_COUNT, VISION), vcfg.num_heads) fout.add_float32(k(KEY_ATTENTION_LAYERNORM_EPS, VISION), 1e-6) fout.add_uint32(k(KEY_BLOCK_COUNT, VISION), vcfg.depth) fout.add_uint32(k(KEY_FEED_FORWARD_LENGTH, VISION), 0) # not sure what this does, put 0 here as a placeholder fout.add_name(model_name) \"\"\" HACK: Since vision rope related parameter aren\'t stored in the `Qwen2VLConfig, it will be hardcoded in the `clip_image_build_graph` from `clip.cpp`. \"\"\" # 加载处理器以获取图像预处理参数(均值和标准差) if local_model: processor: Qwen2VLProcessor = AutoProcessor.from_pretrained(model_path) else: processor: Qwen2VLProcessor = AutoProcessor.from_pretrained(model_name) fout.add_array(\"clip.vision.image_mean\", processor.image_processor.image_mean) # type: ignore[reportAttributeAccessIssue] fout.add_array(\"clip.vision.image_std\", processor.image_processor.image_std) # type: ignore[reportAttributeAccessIssue] # 将所有数据写入文件并关闭写入器 fout.write_header_to_file() fout.write_kv_data_to_file() fout.write_tensors_to_file() fout.close() print(\"save model as: \", fname_out)if __name__ == \"__main__\": parser = argparse.ArgumentParser() parser.add_argument(\"model_name\", nargs=\'?\', default=\"Qwen/Qwen2-VL-2B-Instruct\") parser.add_argument(\"--model_type\", nargs=\'?\', choices=[\'qwen2vl\', \'qwen2.5vl\'], default=\"qwen2vl\") parser.add_argument(\"--data_type\", nargs=\'?\', choices=[\'fp32\', \'fp16\'], default=\"fp32\") args = parser.parse_args() main(args)
至此,本文分享的内容就结束了。