Implementation:NVIDIA NeMo Curator ClientPartitioning
| Knowledge Sources | |
|---|---|
| Domains | File Partitioning, Data Ingestion, Data Curation |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Implements ClientPartitioningStage, a processing stage that partitions client-provided file paths into FileGroupTask objects for downstream parallel processing, using fsspec for filesystem abstraction.
Description
ClientPartitioningStage extends FilePartitioningStage and enables remote and cloud file discovery and partitioning for pipeline ingestion. The module contains:
- ClientPartitioningStage -- A dataclass-based stage that discovers files relative to a root path using fsspec, filters them by extension and count limit, wraps paths as FSPath objects, and groups them into FileGroupTask instances. Key behaviors:
  - setup() initializes an fsspec filesystem from the configured file_paths using url_to_fs().
  - process() discovers relative file paths via _list_relative(), applies extension and limit filters, creates FSPath objects, partitions by count (if files_per_partition is set), and creates FileGroupTask instances with metadata including partition index, total partitions, and source files.
  - _list_relative() returns sorted, de-duplicated paths relative to root. It supports two modes: reading from a JSON manifest file (input_list_json_path) or discovering files via fsspec glob/find operations. When file extensions are known, it uses a recursive glob, which some backends (e.g., s3fs) can optimize server-side.
- _read_list_json_rel() -- A module-level helper function that reads a JSON array of absolute file paths via fsspec, validates that each entry is under the root directory, and converts the entries to root-relative paths with stable de-duplication and sorting.
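The discovery-and-filter path described above can be sketched with the standard library alone. This is a minimal sketch, not the NeMo Curator code: `list_relative_sketch` is a hypothetical name, the `found_paths` list stands in for whatever an fsspec `find`/`glob` call would return, and it assumes extension matching is a plain, case-sensitive suffix comparison applied before the `limit`.

```python
def list_relative_sketch(root, found_paths, file_extensions=None, limit=None):
    """Hypothetical stand-in for _list_relative() plus the filters in process().

    Assumption: `found_paths` mimics fsspec find/glob output (absolute paths);
    entries outside `root` (and the root itself) are ignored.
    """
    prefix = root.rstrip("/") + "/"
    # Make paths root-relative and de-duplicate them with a set.
    relative = {
        p[len(prefix):] for p in found_paths if p.startswith(prefix) and p != prefix
    }
    paths = sorted(relative)  # deterministic order for reproducible partitions
    if file_extensions:
        # Assumption: a simple case-sensitive suffix match.
        paths = [p for p in paths if p.endswith(tuple(file_extensions))]
    if limit is not None:
        paths = paths[:limit]  # cap the number of files after filtering
    return paths


found = [
    "my-bucket/data/b.parquet",
    "my-bucket/data/a.jsonl",
    "my-bucket/data/sub/c.jsonl",
    "my-bucket/data/a.jsonl",  # duplicate, dropped
    "my-bucket/data/readme.txt",  # filtered out by extension
]
print(list_relative_sketch("my-bucket/data", found, [".jsonl", ".parquet"], limit=2))
# ['a.jsonl', 'b.parquet']
```

Sorting before applying the limit means the same subset of files is selected on every run, regardless of the order in which the backend lists them.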
Usage
Use this stage as the entry point in pipelines that need to process files from remote or cloud storage. It replaces the local FilePartitioningStage when files are accessible via fsspec-supported protocols (S3, GCS, Azure, etc.) or when the file list is provided as a JSON manifest.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/stages/client_partitioning.py
- Lines: 1-145
Signature
```python
@dataclass
class ClientPartitioningStage(FilePartitioningStage):
    input_list_json_path: str | None = None
    name: str = "client_partitioning"

    def setup(self, worker_metadata: WorkerMetadata | None = None) -> None: ...
    def process(self, _: _EmptyTask) -> list[FileGroupTask]: ...
    def _list_relative(self) -> list[str]: ...


# Module-level helper (not a method of ClientPartitioningStage)
def _read_list_json_rel(
    root: str, json_url: str, storage_options: dict[str, Any]
) -> list[str]: ...
```
Import
```python
from nemo_curator.stages.client_partitioning import ClientPartitioningStage
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| file_paths | str | Yes | Root path to the file directory (inherited from FilePartitioningStage). Supports fsspec protocols (s3://, gs://, etc.) |
| input_list_json_path | str | No | Path to a JSON file containing an array of absolute file paths (default: None) |
| file_extensions | list[str] | No | List of file extensions to filter by (inherited from FilePartitioningStage) |
| files_per_partition | int | No | Number of files per partition group (inherited from FilePartitioningStage) |
| limit | int | No | Maximum number of files to process (inherited from FilePartitioningStage) |
| storage_options | dict | No | Storage options passed to fsspec (inherited from FilePartitioningStage) |
Outputs
| Name | Type | Description |
|---|---|---|
| result | list[FileGroupTask] | List of FileGroupTask objects, each containing a group of FSPath file references and metadata (partition_index, total_partitions, storage_options, source_files) |
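The count-based grouping behind these outputs can be illustrated with a small sketch. `FileGroupSketch` and `partition_by_count` are hypothetical names; plain strings stand in for FSPath objects, and only the partition_index, total_partitions and source_files metadata keys are modeled (storage_options is omitted). The case where files_per_partition is unset is deliberately not modeled, since its behavior is not described here.

```python
from dataclasses import dataclass, field


@dataclass
class FileGroupSketch:
    """Hypothetical stand-in for FileGroupTask (strings instead of FSPath)."""

    data: list[str]
    metadata: dict = field(default_factory=dict)


def partition_by_count(paths: list[str], files_per_partition: int) -> list[FileGroupSketch]:
    if files_per_partition <= 0:
        raise ValueError("files_per_partition must be a positive integer")
    # Consecutive chunks of at most files_per_partition paths.
    groups = [
        paths[i : i + files_per_partition]
        for i in range(0, len(paths), files_per_partition)
    ]
    total = len(groups)
    return [
        FileGroupSketch(
            data=group,
            metadata={
                "partition_index": index,
                "total_partitions": total,
                "source_files": list(group),
            },
        )
        for index, group in enumerate(groups)
    ]


tasks = partition_by_count(["a.jsonl", "b.jsonl", "c.jsonl", "d.jsonl", "e.jsonl"], 2)
print([t.data for t in tasks])
# [['a.jsonl', 'b.jsonl'], ['c.jsonl', 'd.jsonl'], ['e.jsonl']]
```

For instance, 25 files with files_per_partition=10 would give groups of 10, 10 and 5, so every task in that run reports total_partitions == 3.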
Usage Examples
Basic Usage with S3
```python
from nemo_curator.stages.client_partitioning import ClientPartitioningStage

stage = ClientPartitioningStage(
    file_paths="s3://my-bucket/data/",
    file_extensions=[".jsonl", ".parquet"],
    files_per_partition=10,
    storage_options={"key": "...", "secret": "..."},  # placeholder credentials
)
```
Using a JSON Manifest
```python
from nemo_curator.stages.client_partitioning import ClientPartitioningStage

stage = ClientPartitioningStage(
    file_paths="s3://my-bucket/data/",
    input_list_json_path="s3://my-bucket/manifests/file_list.json",
    files_per_partition=5,
)
```
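For the manifest mode, the validation and conversion performed by _read_list_json_rel() can be approximated as follows. This is a hedged sketch: read_manifest_rel_sketch is a hypothetical name, the JSON text is passed in directly instead of being opened through fsspec, paths are compared as plain strings that include the root's protocol prefix, and raising ValueError on an out-of-root entry is an assumption about how validation failures surface.

```python
import json


def read_manifest_rel_sketch(root: str, manifest_text: str) -> list[str]:
    """Hypothetical, stdlib-only analogue of _read_list_json_rel()."""
    entries = json.loads(manifest_text)
    if not isinstance(entries, list):
        raise ValueError("manifest must be a JSON array of absolute paths")
    prefix = root.rstrip("/") + "/"
    seen: set[str] = set()
    relative: list[str] = []
    for entry in entries:
        # Assumption: any entry outside the root (or the root itself) is rejected.
        if not isinstance(entry, str) or not entry.startswith(prefix) or entry == prefix:
            raise ValueError(f"{entry!r} is not a file under root {root!r}")
        rel = entry[len(prefix):]
        if rel not in seen:  # keep the first occurrence of each path
            seen.add(rel)
            relative.append(rel)
    return sorted(relative)


manifest = json.dumps([
    "s3://my-bucket/data/b.jsonl",
    "s3://my-bucket/data/a.jsonl",
    "s3://my-bucket/data/b.jsonl",
])
print(read_manifest_rel_sketch("s3://my-bucket/data/", manifest))
# ['a.jsonl', 'b.jsonl']
```

Because the result is sorted at the end, the order of entries in the manifest does not change which files land in which partition.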