Implementation:NVIDIA NeMo Curator DecoderUtils
| Knowledge Sources | |
|---|---|
| Domains | Video Processing, Decoding, Frame Extraction, Data Curation |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Provides CPU-based video decoding, timestamp-aware frame extraction, and metadata extraction utilities using PyAV and ffprobe, serving as the central video I/O utility for the entire NeMo Curator video pipeline.
Description
The decoder_utils module is one of the most critical utility files in the NeMo Curator video processing stack. It provides a layered API for reading video data, extracting frames at configurable rates, and obtaining video metadata.
Data Types:
- Resolution: A NamedTuple storing video frame height and width.
- VideoMetadata: A dataclass holding video properties: height, width, fps, num_frames, video_codec, pixel_format, video_duration, audio_codec, and bit_rate_k.
- FrameExtractionPolicy: An enum defining frame selection strategies: first, middle, last, and sequence.
- FramePurpose: An enum defining extraction purposes: AESTHETICS and EMBEDDINGS.
- FrameExtractionSignature: A dataclass combining extraction policy and target FPS into a reproducible configuration with a to_str() method for string serialization.
Core Functions:
- extract_video_metadata: Shells out to ffprobe via subprocess.run to extract video metadata. Accepts either a file path string or raw bytes (which are written to a temporary file). Parses the JSON output to extract resolution, frame rate, codec, duration, pixel format, audio codec, and bit rate. Falls back to format-level duration if stream-level is unavailable.
- _make_video_stream: Converts various input types (Path, str, bytes, io.BytesIO, io.BufferedReader) into a consistent BinaryIO interface for video processing.
- save_stream_position: A context manager that saves the current stream position and restores it upon exit, enabling multiple reads of the same stream.
- get_video_timestamps: Uses PyAV to extract presentation timestamps from a video stream, returning them as a monotonically increasing sorted numpy array. Handles the fact that decode order differs from presentation order when B-frames are present.
- find_closest_indices: Finds the closest matching indices in a sorted source array for each element in a destination array, using binary search via np.searchsorted.
- sample_closest: Samples a monotonically increasing array at a given rate, returning closest indices, counts (for deduplication), and sample elements. Supports configurable start/stop points and endpoint inclusion. Designed for timestamp-based sensor synchronization.
- decode_video_cpu_frame_ids: Decodes specific frame IDs from a video stream using PyAV with multi-threaded support. Preallocates the output numpy array and handles frame count repetition for supersampled frames.
- get_avg_frame_rate: Retrieves the average frame rate from a video, falling back to timestamp-based calculation if average_rate is not available.
- decode_video_cpu: The high-level decoding API that combines timestamp extraction, sampling at a configurable FPS, and frame decoding. Supports start/stop time bounds and endpoint control.
- get_frame_count: Returns the total number of frames in a video stream, falling back to timestamp counting if the stream metadata is unavailable.
- extract_frames: Policy-based frame extraction that supports sequence (all frames at sample rate) and middle (single middle frame) policies, with optional resizing via OpenCV (cv2.INTER_CUBIC).
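The binary-search lookup described for find_closest_indices can be illustrated with a minimal NumPy sketch. This is not the module's code: the function name closest_indices_sketch, the tie-breaking rule (ties go to the earlier element), and the requirement that the source array has at least two elements are assumptions made for illustration.

```python
import numpy as np


def closest_indices_sketch(src: np.ndarray, dst: np.ndarray) -> np.ndarray:
    """For each value in dst, return the index of the nearest value in sorted src.

    Hypothetical helper; assumes src is sorted ascending and has >= 2 elements.
    """
    # Index of the first src element >= each dst value, clipped so that both
    # the left neighbor (idx - 1) and the right neighbor (idx) are valid.
    idx = np.searchsorted(src, dst, side="left")
    idx = np.clip(idx, 1, len(src) - 1)
    left = src[idx - 1]
    right = src[idx]
    # Step back by one wherever the left neighbor is at least as close.
    left_is_closer = (dst - left) <= (right - dst)
    idx = idx - left_is_closer.astype(idx.dtype)
    return idx.astype(np.int32)


# Frame timestamps (seconds) and the sample times we would like to hit.
frame_times = np.array([0.0, 1.0, 2.0, 3.0])
sample_times = np.array([0.4, 1.6, 2.5, 10.0])
nearest = closest_indices_sketch(frame_times, sample_times)
print(nearest)
```

Sample times that fall outside the source range simply map to the first or last index, which is one reasonable way to treat out-of-range requests; the real function may behave differently.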
Usage
This module is used by nearly every video processing stage in NeMo Curator. The video reader stage uses it to extract metadata, the clip extraction stage uses it to decode frames at specific timestamps, the frame extraction stage uses policy-based extraction for aesthetic scoring and embedding generation, and the motion filter backend depends on it for metadata extraction.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/utils/decoder_utils.py
- Lines: 1-667
Signature
class Resolution(NamedTuple):
    height: int
    width: int

@dataclass
class VideoMetadata:
    height: int = None
    width: int = None
    fps: float = None
    num_frames: int = None
    video_codec: str = None
    pixel_format: str = None
    video_duration: float = None
    audio_codec: str = None
    bit_rate_k: int = None

class FrameExtractionPolicy(enum.Enum):
    first = 0
    middle = 1
    last = 2
    sequence = 3

class FramePurpose(enum.Enum):
    AESTHETICS = 1
    EMBEDDINGS = 2

@dataclass
class FrameExtractionSignature:
    extraction_policy: FrameExtractionPolicy
    target_fps: float
    def to_str(self) -> str: ...

def extract_video_metadata(video: str | bytes) -> VideoMetadata: ...
def save_stream_position(stream: BinaryIO) -> Generator[BinaryIO, None, None]: ...
def get_video_timestamps(data, stream_idx=0, video_format=None) -> npt.NDArray[np.float32]: ...
def find_closest_indices(src, dst) -> npt.NDArray[np.int32]: ...
def sample_closest(src, sample_rate, start=None, stop=None, endpoint=True, dedup=True) -> tuple: ...
def decode_video_cpu_frame_ids(data, frame_ids, counts=None, stream_idx=0, video_format=None, num_threads=1) -> npt.NDArray[np.uint8]: ...
def get_avg_frame_rate(data, stream_idx=0, video_format=None) -> float: ...
def decode_video_cpu(data, sample_rate_fps, timestamps=None, start=None, stop=None, endpoint=True, stream_idx=0, video_format=None, num_threads=1) -> npt.NDArray[np.uint8]: ...
def get_frame_count(data, stream_idx=0, video_format=None) -> int: ...
def extract_frames(video, extraction_policy, sample_rate_fps=1.0, target_res=(-1,-1), num_threads=1, stream_idx=0, video_format=None) -> npt.NDArray[np.uint8]: ...
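To make the save_stream_position contract concrete, here is a minimal sketch of a context manager with the same signature. It is an illustration of the save-and-restore pattern described above, not the module's implementation; the name save_stream_position_sketch is hypothetical.

```python
import io
from collections.abc import Generator
from contextlib import contextmanager
from typing import BinaryIO


@contextmanager
def save_stream_position_sketch(stream: BinaryIO) -> Generator[BinaryIO, None, None]:
    """Remember the stream offset on entry and seek back to it on exit."""
    position = stream.tell()
    try:
        yield stream
    finally:
        # Restore the offset even if the body raised an exception.
        stream.seek(position)


buffer = io.BytesIO(b"0123456789")
buffer.seek(2)
with save_stream_position_sketch(buffer) as s:
    first_read = s.read(4)  # advances the offset from 2 to 6
position_after = buffer.tell()  # back at the saved offset
second_read = buffer.read(4)  # reads the same bytes again
```

Restoring the offset in a finally clause is what lets several helpers (timestamp extraction, frame counting, decoding) each read the same in-memory stream from the same starting point.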
Import
from nemo_curator.utils.decoder_utils import (
    extract_video_metadata,
    decode_video_cpu,
    extract_frames,
    get_video_timestamps,
    get_avg_frame_rate,
    get_frame_count,
    FrameExtractionPolicy,
    FramePurpose,
    FrameExtractionSignature,
    VideoMetadata,
    Resolution,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| video / data | str \| BinaryIO \| bytes | Yes | Video file path, raw bytes, or binary stream. Accepted by most functions. |
| sample_rate_fps | float | Yes (for decode_video_cpu) | Target frame rate for sampling the video. |
| extraction_policy | FrameExtractionPolicy | Yes (for extract_frames) | Policy for selecting which frames to extract (sequence, middle, etc.). |
| frame_ids | npt.NDArray[np.int32] | Yes (for decode_video_cpu_frame_ids) | Array of specific frame indices to decode. |
| stream_idx | int | No (default: 0) | Index of the video stream within the container. |
| video_format | str \| None | No (default: None) | Format hint for the video stream (e.g., "mp4", "mkv"). |
| num_threads | int | No (default: 1) | Number of threads for PyAV decoding. |
| start | float \| None | No (default: None) | Start timestamp for frame extraction range. |
| stop | float \| None | No (default: None) | End timestamp for frame extraction range. |
| endpoint | bool | No (default: True) | Whether to include the stop timestamp in sampling. |
| target_res | tuple[int, int] | No (default: (-1, -1)) | Target resolution for frame resizing; (-1, -1) means no resizing. |
Outputs
| Name | Type | Description |
|---|---|---|
| VideoMetadata | VideoMetadata | Dataclass containing extracted video properties (height, width, fps, codec, duration, etc.). |
| frames | npt.NDArray[np.uint8] | Numpy array of shape (num_frames, height, width, 3) in RGB24 format, returned by decode and extract functions. |
| timestamps | npt.NDArray[np.float32] | Monotonically increasing array of presentation timestamps, returned by get_video_timestamps. |
| indices | npt.NDArray[np.int32] | Closest matching indices, returned by find_closest_indices and sample_closest. |
| frame_count | int | Total number of frames in the video, returned by get_frame_count. |
| avg_frame_rate | float | Average frame rate of the video, returned by get_avg_frame_rate. |
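The timestamps output is monotonically increasing even though packets arrive in decode order. The sketch below shows why a sort is needed, using made-up presentation timestamps for a stream with B-frames; the PTS values and the 1/30 time base are illustrative assumptions, and no PyAV call is involved.

```python
from fractions import Fraction

import numpy as np

# Hypothetical stream time base: one tick is 1/30 of a second.
time_base = Fraction(1, 30)

# PTS values in decode order for a frame pattern I P B B P B B. Each P-frame
# is decoded before the B-frames that are displayed ahead of it.
decode_order_pts = [0, 3, 1, 2, 6, 4, 5]

# Convert ticks to seconds, then sort into presentation order.
seconds = np.array([float(pts * time_base) for pts in decode_order_pts], dtype=np.float32)
timestamps = np.sort(seconds)
print(timestamps)
```

Without the sort, nearest-timestamp lookups based on binary search would be unreliable, because binary search assumes an ordered array.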
Usage Examples
Extract Video Metadata
from nemo_curator.utils.decoder_utils import extract_video_metadata
# From a file path
metadata = extract_video_metadata("/data/videos/sample.mp4")
print(f"Resolution: {metadata.width}x{metadata.height}")
print(f"FPS: {metadata.fps}")
print(f"Duration: {metadata.video_duration}s")
print(f"Codec: {metadata.video_codec}")
# From raw bytes
with open("/data/videos/sample.mp4", "rb") as f:
    video_bytes = f.read()
metadata = extract_video_metadata(video_bytes)
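The parsing step behind extract_video_metadata can be sketched as follows. This is a hedged illustration, not the module's code: it assumes ffprobe was invoked with something like `ffprobe -v error -show_format -show_streams -print_format json <path>`, and the helper names rate_to_float and parse_ffprobe_json are hypothetical. The JSON keys used (codec_type, codec_name, avg_frame_rate, pix_fmt, duration) are standard ffprobe fields.

```python
import json


def rate_to_float(rate: str) -> float:
    """Convert an ffprobe rational string such as "30000/1001" to a float."""
    num, _, den = rate.partition("/")
    if not den or float(den) == 0.0:
        # Plain numbers ("25") and degenerate rates ("0/0") land here.
        return float(num)
    return float(num) / float(den)


def parse_ffprobe_json(text: str) -> dict:
    """Pull a few video properties out of ffprobe's JSON report."""
    info = json.loads(text)
    streams = info.get("streams", [])
    video = next((s for s in streams if s.get("codec_type") == "video"), None)
    if video is None:
        raise ValueError("ffprobe output contains no video stream")
    audio = next((s for s in streams if s.get("codec_type") == "audio"), None)

    # Prefer the stream-level duration, fall back to the container-level one.
    duration = video.get("duration") or info.get("format", {}).get("duration")

    return {
        "height": int(video["height"]),
        "width": int(video["width"]),
        "fps": rate_to_float(video["avg_frame_rate"]),
        "video_codec": video.get("codec_name"),
        "pixel_format": video.get("pix_fmt"),
        "video_duration": float(duration) if duration is not None else None,
        "audio_codec": audio.get("codec_name") if audio else None,
    }


# A hand-written report in which only the container carries a duration.
sample_output = json.dumps({
    "streams": [
        {"codec_type": "video", "codec_name": "h264", "width": 1280,
         "height": 720, "avg_frame_rate": "30000/1001", "pix_fmt": "yuv420p"},
        {"codec_type": "audio", "codec_name": "aac"},
    ],
    "format": {"duration": "12.5"},
})
parsed = parse_ffprobe_json(sample_output)
print(parsed["fps"], parsed["video_duration"])
```

Keeping the parser separate from the subprocess call makes the fallback logic easy to exercise without a video file or an ffprobe binary.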
Decode Frames at a Specific Rate
from nemo_curator.utils.decoder_utils import decode_video_cpu
# Decode frames at 2 FPS from a video file
frames = decode_video_cpu(
    data="/data/videos/sample.mp4",
    sample_rate_fps=2.0,
    num_threads=4,
)
print(f"Decoded {frames.shape[0]} frames of shape {frames.shape[1:]}")
Policy-Based Frame Extraction
from nemo_curator.utils.decoder_utils import extract_frames, FrameExtractionPolicy
# Extract the middle frame at 384x384 resolution for aesthetic scoring
frames = extract_frames(
    video="/data/videos/sample.mp4",
    extraction_policy=FrameExtractionPolicy.middle,
    sample_rate_fps=1.0,
    target_res=(384, 384),
)
print(f"Extracted {frames.shape[0]} frame(s)")
# Extract a sequence of frames at 1 FPS
frames_seq = extract_frames(
    video="/data/videos/sample.mp4",
    extraction_policy=FrameExtractionPolicy.sequence,
    sample_rate_fps=1.0,
)
Timestamp-Based Sampling
from nemo_curator.utils.decoder_utils import get_video_timestamps, sample_closest
# Get all timestamps
timestamps = get_video_timestamps("/data/videos/sample.mp4")
print(f"Video has {len(timestamps)} frames")
# Sample at 2 FPS between specific time bounds
indices, counts, sample_elements = sample_closest(
    src=timestamps,
    sample_rate=2.0,
    start=5.0,
    stop=15.0,
)
print(f"Sampled {len(indices)} unique frame positions")
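The counts returned alongside the indices matter when the sample rate exceeds the source frame rate, because the same frame is then the closest match for several sample times. The NumPy sketch below shows one way deduplicated indices and counts can be expanded back into a full frame sequence. It uses stand-in arrays instead of real decoding, and the exact way decode_video_cpu_frame_ids consumes counts is an assumption here.

```python
import numpy as np

# Closest-frame indices for a hypothetical clip sampled above its native rate:
# frames 0 and 1 are each the nearest match for more than one sample time.
raw_indices = np.array([0, 0, 1, 1, 1, 2], dtype=np.int32)

# Deduplicate so each frame is decoded only once, keeping repetition counts.
unique_ids, counts = np.unique(raw_indices, return_counts=True)

# Stand-in for decoding: one tiny 2x2 RGB frame per unique index, filled with
# the index value so the frames are easy to tell apart.
decoded = np.stack([np.full((2, 2, 3), i, dtype=np.uint8) for i in unique_ids])

# Repeat each decoded frame according to its count to rebuild the sequence.
expanded = np.repeat(decoded, counts, axis=0)
print(decoded.shape, expanded.shape)
```

Decoding each unique frame once and repeating it afterwards avoids redundant decoder work while still returning one frame per requested sample time.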