
Implementation:NVIDIA NeMo Curator ClientPartitioning

From Leeroopedia
Knowledge Sources
Domains File Partitioning, Data Ingestion, Data Curation
Last Updated 2026-02-14 00:00 GMT

Overview

Implements ClientPartitioningStage, a processing stage that partitions client-provided file paths into FileGroupTask objects for downstream parallel processing, using fsspec for filesystem abstraction.

Description

ClientPartitioningStage extends FilePartitioningStage and enables remote and cloud file discovery and partitioning for pipeline ingestion. The module contains:

  • ClientPartitioningStage -- A dataclass-based stage that discovers files relative to a root path using fsspec, filters them by extension and count limit, wraps paths as FSPath objects, and groups them into FileGroupTask instances. Key behaviors:
    • setup() initializes an fsspec filesystem from the configured file_paths using url_to_fs().
    • process() discovers relative file paths via _list_relative(), applies extension and limit filters, creates FSPath objects, partitions by count (if files_per_partition is set), and creates FileGroupTask instances with metadata including partition index, total partitions, and source files.
    • _list_relative() returns sorted, de-duplicated paths relative to root. It supports two modes: reading from a JSON manifest file (input_list_json_path) or discovering files via fsspec glob/find operations. When file extensions are known, it uses recursive glob for potential server-side optimization (e.g., in s3fs).
  • _read_list_json_rel() -- A module-level helper function that reads a JSON array of absolute file paths via fsspec, validates each entry is under the root directory, and converts them to root-relative paths with stable de-duplication and sorting.
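The manifest handling described above can be sketched in plain Python. This is a minimal illustration, not the module's actual code: relativize_manifest is a hypothetical name, the manifest is passed as a string instead of being read through fsspec, and raising ValueError for an entry outside the root is an assumption about how validation failures are reported.

```python
import json


def relativize_manifest(root: str, manifest_text: str) -> list[str]:
    """Turn a JSON array of absolute paths into sorted, unique root-relative paths.

    Hypothetical helper for illustration; not the NeMo Curator implementation.
    """
    entries = json.loads(manifest_text)
    if not isinstance(entries, list):
        raise ValueError("manifest must be a JSON array of paths")

    # Normalize the root so "s3://b/data" and "s3://b/data/" behave the same.
    prefix = root.rstrip("/") + "/"

    seen: set[str] = set()
    relative: list[str] = []
    for entry in entries:
        # Assumption: an entry outside the root is an error rather than skipped.
        if not isinstance(entry, str) or not entry.startswith(prefix):
            raise ValueError(f"entry is not under root {root!r}: {entry!r}")
        rel = entry[len(prefix):]
        if not rel:
            raise ValueError(f"entry names the root itself: {entry!r}")
        if rel not in seen:  # keep the first occurrence only
            seen.add(rel)
            relative.append(rel)
    return sorted(relative)
```

Sorting after de-duplication makes the result independent of manifest order, so repeated runs over the same manifest yield the same partitions.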

Usage

Use this stage as the entry point in pipelines that need to process files from remote or cloud storage. It replaces the local FilePartitioningStage when files are accessible via fsspec-supported protocols (S3, GCS, Azure, etc.) or when the file list is provided as a JSON manifest.

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/stages/client_partitioning.py
  • Lines: 1-145

Signature

@dataclass
class ClientPartitioningStage(FilePartitioningStage):
    input_list_json_path: str | None = None
    name: str = "client_partitioning"

    def setup(self, worker_metadata: WorkerMetadata | None = None) -> None: ...
    def process(self, _: _EmptyTask) -> list[FileGroupTask]: ...
    def _list_relative(self) -> list[str]: ...


def _read_list_json_rel(
    root: str, json_url: str, storage_options: dict[str, Any]
) -> list[str]: ...

Import

from nemo_curator.stages.client_partitioning import ClientPartitioningStage

I/O Contract

Inputs

Name | Type | Required | Description
file_paths | str | Yes | Root path to the file directory (inherited from FilePartitioningStage). Supports fsspec protocols (s3://, gs://, etc.)
input_list_json_path | str | No | Path to a JSON file containing an array of absolute file paths (default: None)
file_extensions | list[str] | No | List of file extensions to filter by (inherited from FilePartitioningStage)
files_per_partition | int | No | Number of files per partition group (inherited from FilePartitioningStage)
limit | int | No | Maximum number of files to process (inherited from FilePartitioningStage)
storage_options | dict | No | Storage options passed to fsspec (inherited from FilePartitioningStage)

Outputs

Name | Type | Description
result | list[FileGroupTask] | List of FileGroupTask objects, each containing a group of FSPath file references and metadata (partition_index, total_partitions, storage_options, source_files)
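The shape of the output can be illustrated with a small partition-by-count sketch. GroupSketch is a hypothetical stand-in for FileGroupTask, and partition_by_count is not a function from the library. Zero-based partition_index values are an assumption, and storage_options is left out of the metadata for brevity.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class GroupSketch:
    """Hypothetical stand-in for FileGroupTask; not the NeMo Curator class."""

    files: list[str]
    metadata: dict[str, Any] = field(default_factory=dict)


def partition_by_count(files: list[str], files_per_partition: int) -> list[GroupSketch]:
    """Split files into consecutive groups of at most files_per_partition."""
    if files_per_partition < 1:
        raise ValueError("files_per_partition must be at least 1")

    chunks = [
        files[start:start + files_per_partition]
        for start in range(0, len(files), files_per_partition)
    ]
    total = len(chunks)
    return [
        GroupSketch(
            files=chunk,
            metadata={
                "partition_index": index,  # assumption: zero-based
                "total_partitions": total,
                "source_files": list(chunk),
            },
        )
        for index, chunk in enumerate(chunks)
    ]
```

With five files and files_per_partition=2, this yields three groups, the last holding a single file.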

Usage Examples

Basic Usage with S3

from nemo_curator.stages.client_partitioning import ClientPartitioningStage

stage = ClientPartitioningStage(
    file_paths="s3://my-bucket/data/",
    file_extensions=[".jsonl", ".parquet"],
    files_per_partition=10,
    storage_options={"key": "...", "secret": "..."},
)

Using a JSON Manifest

from nemo_curator.stages.client_partitioning import ClientPartitioningStage

stage = ClientPartitioningStage(
    file_paths="s3://my-bucket/data/",
    input_list_json_path="s3://my-bucket/manifests/file_list.json",
    files_per_partition=5,
)
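For reference, a manifest in the expected shape (a JSON array of absolute paths under the root) could be produced as follows. The file names are hypothetical placeholders, and uploading the resulting text to the manifest location is left out.

```python
import json

root = "s3://my-bucket/data/"

# Hypothetical file names, for illustration only.
relative_files = ["part-000.jsonl", "part-001.jsonl"]

# Each manifest entry is an absolute path under the root.
manifest = [root + name for name in relative_files]
manifest_text = json.dumps(manifest, indent=2)
print(manifest_text)
```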
