
LLM Development: Source-Code Analysis of the Qwen2.5-VL Video Frame-Sampling Module (with FFmpeg Performance Comparison Tests)

Table of Contents

  • Qwen's video understanding capability
  • messages construction demo
  • Qwen frame-sampling code analysis
  • Verifying two real cases
  • Official overview diagram
  • Performance comparison: FFmpeg vs. decord frame extraction
    • Introduction
    • Relationship
    • Comparison
    • Test results
    • Test details
      • ffmpeg
        • 100 QPS test (CPU)
      • decord
        • 100 QPS test (CPU)
        • 100 QPS test (GPU)


A detailed video walkthrough was also recorded for this article:

[IT超016] 大模型:源码分析Qwen2.5VL视频抽帧模块(附加FFmpeg性能对比测试) (Bilibili)

Qwen's video understanding capability

Qwen2.5-VL is a family of multimodal large language models developed by Alibaba Cloud's Qwen team. Repository: https://github.com/QwenLM/Qwen2.5-VL

messages construction demo

# Option 1: pass a video file (only this path triggers the frame-sampling logic)
messages = [
    {
        "role": "user",
        "content": [
            {
                "type": "video",
                "video": "file:///path/to/video1.mp4",
                "max_pixels": 360 * 420,
                "fps": 1.0,
            },
            {"type": "text", "text": "Describe this video."},
        ],
    }
]

# Option 2: pass a list of pre-extracted frames directly
messages = [
    {
        "role": "user",
        "content": [
            {
                "type": "video",
                "video": [
                    "file:///path/to/frame1.jpg",
                    "file:///path/to/frame2.jpg",
                    "file:///path/to/frame3.jpg",
                    "file:///path/to/frame4.jpg",
                ],
            },
            {"type": "text", "text": "Describe this video."},
        ],
    }
]
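
How these messages are consumed, as a minimal sketch (adapted from the official Qwen2.5-VL README; processor and model setup are omitted, and exact return values can vary across qwen-vl-utils versions):

from qwen_vl_utils import process_vision_info

# Walk the messages, fetch and decode every image/video entry, and return
# the visual inputs that are later handed to the processor together with the text.
image_inputs, video_inputs = process_vision_info(messages)

Only option 1 (a video file) goes through the frame-sampling logic analyzed below; option 2 (a list of images) bypasses video decoding entirely.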

Qwen frame-sampling code analysis

Core code for processing video messages: https://github.com/QwenLM/Qwen2.5-VL/blob/main/qwen-vl-utils/src/qwen_vl_utils/vision_process.py

The video decoding and frame-sampling capability relies on the decord library: https://github.com/dmlc/decord?tab=readme-ov-file#install-from-source

Decord is a lightweight, high-performance video decoding library built specifically for video data processing and deep learning. It excels at random-access frame patterns, avoiding FFmpeg's approach of decoding frame by frame from the start of the stream. This is the library Qwen's frame-sampling module uses.
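
To make the random-access point concrete, here is a minimal sketch of the decord calls involved (the file name is a placeholder):

from decord import VideoReader, cpu

vr = VideoReader("video.mp4", ctx=cpu(0))   # open once; decord builds a frame index
print(len(vr), vr.get_avg_fps())            # total frame count, average fps
frames = vr.get_batch([0, 120, 240, 360])   # decode only the requested frame indices
print(frames.shape)                         # (4, H, W, C) decord NDArray

get_batch seeks to the nearest keyframes and decodes only what is requested, which matches the equal-interval sampling pattern used below.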

 

1. vision_process.py # process_vision_info: handles the image / video inputs in messages

 

2. vision_process.py # fetch_video: handles the video inputs in messages (a minimal invocation sketch follows)
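
A minimal sketch of calling fetch_video directly (assumptions: the import path below matches the installed qwen-vl-utils package, and the return shape reflects the version analyzed here; some versions can additionally return the sampled fps):

from qwen_vl_utils.vision_process import fetch_video

# "fps" here is the per-second sampling rate requested for model inputs,
# not the native frame rate of the file.
video = fetch_video({"video": "file:///path/to/video1.mp4", "fps": 1.0})
print(video.shape)  # (T, C, H, W) tensor of sampled, resized frames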

 

🚩 Core frame-sampling logic (in short: one frame every 0.5 seconds, so a 30-second video ends up with 60 frames)

From the FPS constant (default 2) and the video's total frame count (duration in seconds × native fps), the number of frames to sample, nframes, is computed; then nframes frames are taken at equal intervals. A worked example follows the list.

  • Decodable total frames = duration (s) × native fps; e.g. a 30 s video at 24 fps gives 720
  • Sampled frame count = FPS constant (default 2) × duration (s); e.g. a 30 s video gives 60 frames
  • Sampling stride: 0.5 s by default (equal-interval splitting, determined by the FPS constant)
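
A worked example of the arithmetic above (this mirrors smart_nframes from the code extracted later; it assumes the default constants FPS = 2.0 and FRAME_FACTOR = 2 and a 30-second clip recorded at 24 fps):

# 30 s clip at 24 fps, default sampling constants
total_frames, video_fps, fps = 720, 24.0, 2.0    # 720 = 30 s * 24 fps

nframes = total_frames / video_fps * fps         # 30 s * 2 frames/s = 60.0
nframes = int(nframes // 2 * 2)                  # snap to a multiple of FRAME_FACTOR -> 60

stride_seconds = (total_frames / video_fps) / nframes   # 30 s / 60 frames = 0.5 s
print(nframes, stride_seconds)                   # 60 0.5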

 

 

Verifying two real cases

Screen-recording setting: 24 fps

Scheme: Qwen frame sampling, with logging added

  • 30-second video: sampling logs captured (log screenshots omitted)
  • 1-minute-30-second video: sampling logs captured (log screenshots omitted)

The Qwen2.5-VL frame-sampling module code, extracted below (with custom logging added):

import torch
import time
import math
from typing import Tuple
from torchvision.utils import save_image

IMAGE_FACTOR = 28
MIN_PIXELS = 4 * 28 * 28
MAX_PIXELS = 16384 * 28 * 28
MAX_RATIO = 200
VIDEO_MIN_PIXELS = 128 * 28 * 28
VIDEO_MAX_PIXELS = 768 * 28 * 28
FRAME_FACTOR = 2
FPS = 2.0
FPS_MIN_FRAMES = 4
FPS_MAX_FRAMES = 768


def _read_video_decord(ele: dict) -> Tuple[torch.Tensor, float]:
    """read video using decord.VideoReader

    Args:
        ele (dict): a dict containing the configuration of the video.
            Supported keys:
            - video: the path of the video. Supports "file://", "http://", "https://" and local paths.
            - video_start: the start time of the video.
            - video_end: the end time of the video.
    Returns:
        torch.Tensor: the video tensor with shape (T, C, H, W).
    """
    import decord
    video_path = ele["video"]
    st = time.time()
    vr = decord.VideoReader(video_path)
    # TODO: support start_pts and end_pts
    if 'video_start' in ele or 'video_end' in ele:
        raise NotImplementedError("not support start_pts and end_pts in decord for now.")
    total_frames, video_fps = len(vr), vr.get_avg_fps()
    print(f"==11: {video_path=}, {total_frames=}, {video_fps=}, time={time.time() - st:.3f}s\n")
    nframes = smart_nframes(ele, total_frames=total_frames, video_fps=video_fps)
    idx = torch.linspace(0, total_frames - 1, nframes).round().long().tolist()
    video = vr.get_batch(idx).asnumpy()
    print(f"==22: {nframes=}, {idx=}, {len(idx)=}, video: ")
    print("Type:", type(video))
    print("Shape:", video.shape)
    print("Data Type:", video.dtype)
    print("Number of dimensions:", video.ndim)
    print("Number of elements:", video.size)
    print("Size in bytes:", video.nbytes)
    print('\n')
    video = torch.tensor(video).permute(0, 3, 1, 2)  # convert to TCHW format
    sample_fps = nframes / max(total_frames, 1e-6) * video_fps
    return video, sample_fps


def smart_nframes(
    ele: dict,
    total_frames: int,
    video_fps: float,
) -> int:
    """calculate the number of frames for video used for model inputs.

    Args:
        ele (dict): a dict containing the configuration of the video.
            Accepts either `fps` or `nframes`:
            - nframes: the number of frames to extract for model inputs.
            - fps: the fps to extract frames for model inputs.
              - min_frames: the minimum number of frames of the video, only used when fps is provided.
              - max_frames: the maximum number of frames of the video, only used when fps is provided.
        total_frames (int): the original total number of frames of the video.
        video_fps (int | float): the original fps of the video.
    Raises:
        ValueError: nframes should be in the interval [FRAME_FACTOR, total_frames].
    Returns:
        int: the number of frames for video used for model inputs.
    """
    assert not ("fps" in ele and "nframes" in ele), "Only accept either `fps` or `nframes`"
    if "nframes" in ele:
        nframes = round_by_factor(ele["nframes"], FRAME_FACTOR)
    else:
        fps = ele.get("fps", FPS)
        min_frames = ceil_by_factor(ele.get("min_frames", FPS_MIN_FRAMES), FRAME_FACTOR)
        max_frames = floor_by_factor(ele.get("max_frames", min(FPS_MAX_FRAMES, total_frames)), FRAME_FACTOR)
        nframes = total_frames / video_fps * fps
        if nframes > total_frames:
            print(f"smart_nframes: nframes[{nframes}] > total_frames[{total_frames}]")
        nframes = min(min(max(nframes, min_frames), max_frames), total_frames)
        nframes = floor_by_factor(nframes, FRAME_FACTOR)
    if not (FRAME_FACTOR <= nframes and nframes <= total_frames):
        raise ValueError(f"nframes should be in the interval [{FRAME_FACTOR}, {total_frames}], but got {nframes}.")
    return nframes


def round_by_factor(number: int, factor: int) -> int:
    """Returns the closest integer to 'number' that is divisible by 'factor'."""
    return round(number / factor) * factor


def ceil_by_factor(number: int, factor: int) -> int:
    """Returns the smallest integer greater than or equal to 'number' that is divisible by 'factor'."""
    return math.ceil(number / factor) * factor


def floor_by_factor(number: int, factor: int) -> int:
    """Returns the largest integer less than or equal to 'number' that is divisible by 'factor'."""
    return math.floor(number / factor) * factor


video, sample_fps = _read_video_decord(ele={"video": "150108580006469_30s.mp4"})
print(f"read_video_decord result: {video=}, {sample_fps=}\n")

# Save the sampled video frames as local images
for i, frame in enumerate(video):
    # scale pixel values to the [0, 1] range
    frame = frame.float() / 255.0
    save_image(frame, f'./video-frame-30s/frame_{i}.png')
    # save_image(frame, f'./video-frame-1m30s/frame_{i}.png')
    print(f"Frame {i} saved as frame_{i}.png")

Official overview diagram

(diagram from the official site omitted)

Performance comparison: FFmpeg vs. decord frame extraction

Introduction

FFmpeg is a powerful and widely used open-source multimedia framework composed of multiple libraries and tools, such as libavformat (container demuxing/muxing), libavcodec (audio/video encoding and decoding), and libavutil (common utility functions). When extracting frames, FFmpeg coordinates these libraries.

 

Decord, as introduced above, is a lightweight, high-performance video decoding library designed for video data processing and deep learning; it excels at random-access frame patterns rather than decoding frame by frame from the start of the stream as FFmpeg does.

 

Relationship

decord depends on FFmpeg's core libraries (it links against libavcodec, libavformat, and related libraries).

 

Comparison

  • Speed: FFmpeg wins, about 16% faster than decord in these tests
  • Resource usage: FFmpeg wins, consuming about 25% less than decord (only CPU extraction was measured)
  • Maintenance:
    • FFmpeg is continuously updated and has a more active community (48.9k GitHub stars)
    • decord has not been updated in three years (2.1k stars)

 

Test results

Test setup:

  • a batch of 100 video files per test run
  • dev machine train-A100-6: 94 cores, 860 GiB RAM, 4× A100 GPUs

 

ffmpeg - cpu (current approach)

  • 37 s sample (video_150108527915923_37s.mp4):
    • Total time: 4.18 s; mean per request: 2829.07 ms; succeeded: 100; failed: 0
    • 37 frames extracted + written to disk
    • CPU core conversion: ~7 cores needed (94 × 6% × 120%)
  • 1 min 21 s sample (video_150108525914511_1m21s.mp4):
    • Total time: 12.09 s; mean per request: 9383.33 ms; succeeded: 100; failed: 0
    • 81 frames extracted + written to disk
    • CPU core conversion: ~23 cores needed (94 × 22% × 120%)

decord - cpu (new approach)

  • 37 s sample:
    • Total time: 5.33 s; mean per request: 3352.33 ms; succeeded: 100; failed: 0
    • 38 frames extracted + written to disk
    • CPU core conversion: ~7 cores needed (94 × 7% × 120%)
  • 1 min 21 s sample:
    • Total time: 15.89 s; mean per request: 12617.40 ms; succeeded: 100; failed: 0
    • 85 frames extracted + written to disk
    • CPU core conversion: ~24 cores needed (94 × 25% × 120%)

decord - gpu

  • Environment incompatible for both samples; the GPU path could not be benchmarked.

 

Test details

ffmpeg

# log in to the dev machine

ffmpeg -i /root/zhoulongchao/script/resource/video_150108527915923_37s.mp4 -map 0:v -q:v 3 -vsync 0 -f image2 -vf fps=1 -y ./%d.jpg
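
Flag notes: -map 0:v keeps only the video stream, -q:v 3 sets the JPEG quality scale, -vsync 0 passes frames through without duplicating or dropping them, -f image2 selects the image-sequence muxer, and -vf fps=1 samples one frame per second.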

 

100 QPS test (CPU)

cd /root/zhoulongchao/script/ffmpeg

vim demo-ffmpeg.py

python demo-ffmpeg.py

import subprocess
import time
from concurrent.futures import ThreadPoolExecutor


def run_ffmpeg_command():
    # FFmpeg command: extract one frame per second as JPEGs
    command = [
        'ffmpeg',
        '-i', '/root/zhoulongchao/script/resource/video_150108525914511_1m21s.mp4',
        '-map', '0:v',
        '-q:v', '3',
        '-vsync', '0',
        '-f', 'image2',
        '-vf', 'fps=1',
        '-y', './%d.jpg',  # note: concurrent runs overwrite the same output files
    ]
    start_time = time.time()
    # run the FFmpeg command
    result = subprocess.run(command, capture_output=True, text=True)
    end_time = time.time()
    # elapsed time in milliseconds
    elapsed_time_ms = (end_time - start_time) * 1000
    print("FFmpeg output:")
    print(result.stdout)
    print("Error output:")
    print(result.stderr)
    print(f"Elapsed: {elapsed_time_ms:.2f} ms")
    # judge success by the return code
    success = result.returncode == 0
    return elapsed_time_ms, success


if __name__ == "__main__":
    # target QPS
    target_qps = 100
    # interval between request submissions (seconds)
    interval = 1 / target_qps
    total_elapsed_time = 0
    all_elapsed_times = []
    success_count = 0
    failure_count = 0
    with ThreadPoolExecutor(max_workers=target_qps) as executor:
        start_time = time.time()
        futures = []
        for _ in range(target_qps):
            future = executor.submit(run_ffmpeg_command)
            futures.append(future)
            time.sleep(interval)
        for future in futures:
            elapsed_time, success = future.result()
            all_elapsed_times.append(elapsed_time)
            total_elapsed_time += elapsed_time
            if success:
                success_count += 1
            else:
                failure_count += 1
        end_time = time.time()
    average_elapsed_time = total_elapsed_time / target_qps if target_qps > 0 else 0
    print(f"Total: {end_time - start_time:.2f} s")
    print(f"Mean per request: {average_elapsed_time:.2f} ms")
    print(f"Succeeded: {success_count}")
    print(f"Failed: {failure_count}")

 

decord

cd /root/zhoulongchao/script/decord

vim main.py

python main.py

 

100 QPS test (CPU)

import cv2
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

from decord import VideoReader
from decord import cpu


def process_video(video_path):
    start_time = time.time()
    try:
        vr = VideoReader(video_path, ctx=cpu(0))
        fps = vr.get_avg_fps()
        interval = int(fps)
        # sample roughly one frame per second and write it to disk
        for i in range(0, len(vr), interval):
            frame = vr[i].asnumpy()
            frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            cv2.imwrite(f'frame_{i}.jpg', frame_bgr)
        end_time = time.time()
        elapsed_time = (end_time - start_time) * 1000
        return elapsed_time, True
    except Exception:
        end_time = time.time()
        elapsed_time = (end_time - start_time) * 1000
        return elapsed_time, False


if __name__ == "__main__":
    video_path = '/root/zhoulongchao/script/resource/video_150108527915923_37s.mp4'
    target_qps = 100
    interval = 1 / target_qps
    total_elapsed_time = 0
    success_count = 0
    failure_count = 0
    # number of CPU cores on the machine
    cpu_count = multiprocessing.cpu_count()
    with ProcessPoolExecutor(max_workers=cpu_count) as executor:
        start_time = time.time()
        futures = []
        for _ in range(target_qps):
            future = executor.submit(process_video, video_path)
            futures.append(future)
            time.sleep(interval)
        for future in futures:
            elapsed_time, success = future.result()
            total_elapsed_time += elapsed_time
            if success:
                success_count += 1
            else:
                failure_count += 1
        end_time = time.time()
    average_elapsed_time = total_elapsed_time / target_qps if target_qps > 0 else 0
    print(f"Total: {end_time - start_time:.2f} s")
    print(f"Mean per request: {average_elapsed_time:.2f} ms")
    print(f"Succeeded: {success_count}")
    print(f"Failed: {failure_count}")

 

100 QPS test (GPU)

> Note: the environment setup for this approach failed, so this part was only half finished.

1. Install the system packages needed to build the shared library. On Ubuntu, run:

# official PPA comes with ffmpeg 2.8, which lacks tons of features, we use ffmpeg 4.0 here
add-apt-repository ppa:jonathonf/ffmpeg-4
# for ubuntu20.04 official PPA is already version 4.2, you may skip this step
apt-get update
apt-get install -y build-essential python3-dev python3-setuptools make cmake
apt-get install -y ffmpeg libavcodec-dev libavfilter-dev libavformat-dev libavutil-dev
# note: make sure you have cmake 3.8 or later, you can install from cmake official website if it's too old

 

2. Clone the repo recursively (important):

git clone --recursive https://github.com/dmlc/decord

 

3. Build the shared library in the source root. Pass -DUSE_CUDA=ON, -DUSE_CUDA=/path/to/cuda, or -DUSE_CUDA=ON -DCMAKE_CUDA_COMPILER=/path/to/cuda/nvcc to enable NVDEC hardware-accelerated decoding. To use a custom FFmpeg installation, add -DFFMPEG_DIR=/path/to/ffmpeg:

cd /root/zhoulongchao/script/decord

cd decord
mkdir build && cd build
cmake .. -DUSE_CUDA=ON -DCMAKE_BUILD_TYPE=Release
make

Note: if you run into problems with libnvcuvid.so, the cause is usually that the library is not linked. Locate it manually (ldconfig -p | grep libnvcuvid) and link it into CUDA_TOOLKIT_ROOT_DIR/lib64 so that decord can detect and link the correct library, or obtain it from the NVIDIA Video Codec SDK:

Video Codec SDK - Get Started | NVIDIA Developer

mv libnvcuvid.so /usr/local/cuda/lib64/

 

4. Install the Python bindings:

cd ../python
# option 1: add python path to $PYTHONPATH, you will need to install numpy separately
pwd=$PWD
echo "PYTHONPATH=$PYTHONPATH:$pwd" >> ~/.bashrc
source ~/.bashrc
# option 2: install with setuptools
python3 setup.py install --user

 


 

import cv2
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

from decord import VideoReader
from decord import gpu


def process_video(video_path):
    start_time = time.time()
    try:
        # changed to decode on the GPU
        vr = VideoReader(video_path, ctx=gpu(0))
        fps = vr.get_avg_fps()
        interval = int(fps)
        # sample roughly one frame per second and write it to disk
        for i in range(0, len(vr), interval):
            frame = vr[i].asnumpy()
            frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            cv2.imwrite(f'frame_{i}.jpg', frame_bgr)
        end_time = time.time()
        elapsed_time = (end_time - start_time) * 1000
        return elapsed_time, True
    except Exception:
        end_time = time.time()
        elapsed_time = (end_time - start_time) * 1000
        return elapsed_time, False


if __name__ == "__main__":
    video_path = '/root/zhoulongchao/script/resource/video_150108527915923_37s.mp4'
    target_qps = 100
    interval = 1 / target_qps
    total_elapsed_time = 0
    success_count = 0
    failure_count = 0
    # number of CPU cores on the machine
    cpu_count = multiprocessing.cpu_count()
    with ProcessPoolExecutor(max_workers=cpu_count) as executor:
        start_time = time.time()
        futures = []
        for _ in range(target_qps):
            future = executor.submit(process_video, video_path)
            futures.append(future)
            time.sleep(interval)
        for future in futures:
            elapsed_time, success = future.result()
            total_elapsed_time += elapsed_time
            if success:
                success_count += 1
            else:
                failure_count += 1
        end_time = time.time()
    average_elapsed_time = total_elapsed_time / target_qps if target_qps > 0 else 0
    print(f"Total: {end_time - start_time:.2f} s")
    print(f"Mean per request: {average_elapsed_time:.2f} ms")
    print(f"Succeeded: {success_count}")
    print(f"Failed: {failure_count}")