

Implementation:NVIDIA NeMo Curator BaseReader

From Leeroopedia
Knowledge Sources
Domains: Data Ingestion, IO, Data Pipeline
Last Updated: 2026-02-14 00:00 GMT

Overview

Defines the abstract base class for all tabular file reader stages that convert FileGroupTask inputs into DocumentBatch outputs in the NeMo Curator pipeline.

Description

BaseReader is a dataclass that extends ProcessingStage[FileGroupTask, DocumentBatch] and centralizes the logic shared by all file readers: reading files, selecting fields, validating results, and optionally managing deduplication IDs. Subclasses only need to implement the abstract read_data method, which handles format-specific file reading.

The class supports two mutually exclusive ID management modes coordinated via a Ray actor (IdGenerator):

  • _generate_ids: Generates new monotonically increasing IDs by registering each batch's row count with the IdGenerator actor and computing a sequential range starting from the returned minimum ID.
  • _assign_ids: Assigns pre-computed ID ranges by querying the IdGenerator actor for the batch range associated with specific file paths.

Both modes add a _curator_dedup_id column to the output DataFrame using numpy.arange for efficient sequential ID generation.
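To make the two modes concrete, here is a minimal sketch that uses a plain Python stand-in for the IdGenerator actor. IdGeneratorStub is hypothetical: the real IdGenerator is a Ray actor invoked through .remote(), and its internal bookkeeping may differ. Only the method names register_batch and get_batch_range come from this page; the stub ignores get_batch_range's second argument, which the reader passes as None.

```python
import numpy as np


class IdGeneratorStub:
    """Hypothetical in-process stand-in for the IdGenerator Ray actor."""

    def __init__(self) -> None:
        self.next_id = 0
        self.batch_ranges: dict[str, tuple[int, int]] = {}

    def register_batch(self, filepath: str, num_rows: int) -> int:
        # Reserve a contiguous block of num_rows IDs and remember it per file key.
        min_id = self.next_id
        self.batch_ranges[filepath] = (min_id, min_id + num_rows - 1)
        self.next_id += num_rows
        return min_id

    def get_batch_range(self, filepath: str, _unused: object = None) -> tuple[int, int]:
        # Return the (min_id, max_id) block reserved earlier for this file key.
        return self.batch_ranges[filepath]


gen = IdGeneratorStub()

# _generate_ids mode: register each batch's row count, then count up from min_id.
min_id = gen.register_batch("part-0.jsonl", 3)
ids_part0 = np.arange(min_id, min_id + 3)
min_id = gen.register_batch("part-1.jsonl", 2)
ids_part1 = np.arange(min_id, min_id + 2)

# _assign_ids mode (e.g. in a later pass): look up the range that was reserved.
lo, hi = gen.get_batch_range("part-1.jsonl", None)
assigned = np.arange(lo, hi + 1)  # max_id is inclusive in this stub, hence the + 1

print(ids_part0.tolist(), ids_part1.tolist(), assigned.tolist())
# [0, 1, 2] [3, 4] [3, 4]
```

Because every batch reserves a disjoint block from a single counter, IDs stay unique across files even when different workers read them; routing all calls through one actor serializes that bookkeeping.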

The process method orchestrates the read, validate, and ID-assignment flow: it merges the read keyword arguments, delegates to read_data, checks that the result is non-empty, adds IDs (for pandas DataFrames only), and wraps the result in a DocumentBatch.
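The same template-method flow can be sketched with simplified stand-ins. FileGroupTaskStub, DocumentBatchStub, ReaderSketch, and InMemoryReader are hypothetical names, not NeMo Curator classes; the kwargs merge is approximated by copying the stage's read_kwargs, and ID handling is left out so the read, validate, and wrap steps stay visible.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any

import pandas as pd


@dataclass
class FileGroupTaskStub:
    """Hypothetical stand-in for FileGroupTask: a task ID plus file paths."""

    task_id: str
    data: list[str]


@dataclass
class DocumentBatchStub:
    """Hypothetical stand-in for DocumentBatch."""

    task_id: str
    data: pd.DataFrame


@dataclass
class ReaderSketch(ABC):
    fields: list[str] | None = None
    read_kwargs: dict[str, Any] = field(default_factory=dict)

    @abstractmethod
    def read_data(
        self,
        file_paths: list[str],
        read_kwargs: dict[str, Any] | None = None,
        fields: list[str] | None = None,
    ) -> pd.DataFrame | None:
        """Format-specific reading: the only method a subclass must provide."""

    def process(self, task: FileGroupTaskStub) -> DocumentBatchStub:
        # 1. Build the kwargs for this call (approximated as a copy of read_kwargs).
        read_kwargs = dict(self.read_kwargs)
        # 2. Delegate the actual I/O to the subclass.
        df = self.read_data(task.data, read_kwargs, self.fields)
        # 3. Refuse to emit an empty batch.
        if df is None or df.empty:
            raise ValueError(f"No data read from files in task {task.task_id}")
        # 4. The real stage would add _curator_dedup_id here (omitted in this sketch).
        # 5. Wrap the result, carrying the task ID over from the input task.
        return DocumentBatchStub(task_id=task.task_id, data=df)


@dataclass
class InMemoryReader(ReaderSketch):
    """Toy subclass that 'reads' records from a dict keyed by file path."""

    store: dict[str, list[dict[str, Any]]] = field(default_factory=dict)

    def read_data(self, file_paths, read_kwargs=None, fields=None):
        rows = [row for path in file_paths for row in self.store.get(path, [])]
        if not rows:
            return None
        df = pd.DataFrame(rows)
        return df[fields] if fields is not None else df


reader = InMemoryReader(
    fields=["text"],
    store={
        "a.jsonl": [{"text": "hi", "url": "u1"}],
        "b.jsonl": [{"text": "yo", "url": "u2"}],
    },
)
batch = reader.process(FileGroupTaskStub(task_id="t0", data=["a.jsonl", "b.jsonl"]))
print(batch.data["text"].tolist())  # ['hi', 'yo']
```

Keeping process in the base class means every reader gets identical validation and ID behavior, while subclasses stay small and format-specific.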

Usage

BaseReader is not used directly. Instead, use a format-specific subclass such as JsonlReaderStage or ParquetReaderStage. To support a custom file format, subclass BaseReader and implement read_data.

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/stages/text/io/reader/base.py
  • Lines: 1-137

Signature

@dataclass
class BaseReader(ProcessingStage[FileGroupTask, DocumentBatch]):
    fields: list[str] | None = None
    read_kwargs: dict[str, Any] = field(default_factory=dict)
    name: str = ""
    _generate_ids: bool = False
    _assign_ids: bool = False

Import

from nemo_curator.stages.text.io.reader.base import BaseReader

I/O Contract

Inputs

  • fields (list[str] or None, optional): If specified, only these columns are read from the source files. Default: None (read all columns).
  • read_kwargs (dict[str, Any], optional): Keyword arguments passed to the underlying file reader. Default: empty dict.
  • _generate_ids (bool, optional): Whether to generate new, monotonically increasing deduplication IDs. Default: False.
  • _assign_ids (bool, optional): Whether to assign pre-computed deduplication IDs from the IdGenerator actor. Default: False.

Stage I/O Specification

  • inputs() returns ([], []).
  • outputs() returns (["data"], output_fields), where output_fields contains the selected fields plus, optionally, _curator_dedup_id.
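The rule for output_fields can be written as a small helper. reader_outputs is hypothetical (it is not part of NeMo Curator), and it assumes that when fields is None no columns are declared, since the full column set is only known after the files are read.

```python
from __future__ import annotations

DEDUP_ID_COLUMN = "_curator_dedup_id"


def reader_outputs(
    fields: list[str] | None,
    generate_ids: bool = False,
    assign_ids: bool = False,
) -> tuple[list[str], list[str]]:
    """Hypothetical helper mirroring the outputs() contract for a reader stage."""
    # Assumption: with fields=None the columns are unknown up front, so none are declared.
    output_fields = list(fields or [])
    # Either ID mode adds the dedup ID column to the declared outputs.
    if generate_ids or assign_ids:
        output_fields.append(DEDUP_ID_COLUMN)
    return ["data"], output_fields


print(reader_outputs(["text", "url"], generate_ids=True))
# (['data'], ['text', 'url', '_curator_dedup_id'])
```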

Outputs

  • DocumentBatch (DocumentBatch): Holds the data returned by read_data (typically a pandas DataFrame; the validation step also accepts Arrow tables), with task metadata propagated from the input FileGroupTask.

Usage Examples

Implementing a Custom Reader

from dataclasses import dataclass
from typing import Any

import pandas as pd

from nemo_curator.stages.text.io.reader.base import BaseReader


@dataclass
class CSVReaderStage(BaseReader):
    name: str = "csv_reader"

    def read_data(
        self,
        file_paths: list[str],
        read_kwargs: dict[str, Any] | None = None,
        fields: list[str] | None = None,
    ) -> pd.DataFrame | None:
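        read_kwargs = dict(read_kwargs or {})
        if fields is not None:
            # Only parse the requested columns (unless the caller set usecols explicitly).
            read_kwargs.setdefault("usecols", fields)
        # Read each file in the group; read_kwargs are forwarded to pandas.read_csv.
        dfs = [pd.read_csv(path, **read_kwargs) for path in file_paths]
        if fields is not None:
            # usecols keeps the file's column order, so reorder to match `fields`.
            dfs = [df[fields] for df in dfs]
        # Returning None for an empty file group lets process() raise its ValueError.
        return pd.concat(dfs, ignore_index=True) if dfs else None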

Using a Reader with ID Generation

from nemo_curator.stages.text.io.reader.jsonl import JsonlReaderStage

reader = JsonlReaderStage(
    fields=["text", "url"],
    _generate_ids=True,
)

Implementation Details

Validation

The process method checks that read_data returned a non-empty result. It raises a ValueError if the result is None, if it is a pandas DataFrame whose empty attribute is True, or if it is an Arrow table with num_rows == 0.
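A minimal sketch of that check, assuming a result is either None, a pandas DataFrame, or an Arrow-like table exposing num_rows. The function names is_empty_result and validate_read_result, and the error message text, are hypothetical; pyarrow is detected by duck typing so the sketch runs without it.

```python
from __future__ import annotations

from typing import Any

import pandas as pd


def is_empty_result(result: Any) -> bool:
    """Return True if a read_data result counts as 'no data was read'."""
    if result is None:
        return True
    # pandas DataFrames: rely on the boolean `empty` attribute.
    if isinstance(result, pd.DataFrame):
        return result.empty
    # Arrow tables report their length as num_rows; duck typing keeps pyarrow optional.
    return getattr(result, "num_rows", None) == 0


def validate_read_result(result: Any, task_id: str) -> Any:
    """Raise ValueError for an empty result, otherwise pass it through unchanged."""
    if is_empty_result(result):
        raise ValueError(f"No data read from files in task {task_id}")
    return result


df = validate_read_result(pd.DataFrame({"text": ["a", "b"]}), task_id="t0")
print(len(df))  # 2
```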

ID Generation vs. ID Assignment

  • _generate_ids: Calls id_generator.register_batch.remote(filepath, num_rows) to register the batch size and receive a starting ID. IDs are sequential from min_id to min_id + num_rows - 1.
  • _assign_ids: Calls id_generator.get_batch_range.remote(filepath, None) to retrieve a pre-computed (min_id, max_id) range. IDs are sequential from min_id to max_id.

In both modes, if the _curator_dedup_id column already exists, ID assignment is skipped and a warning is logged.
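The column-writing step shared by both modes, including the skip-and-warn guard, might look like the following. add_dedup_ids is a hypothetical helper; it treats max_id as inclusive, matching the "min_id to max_id" wording for assignment, and in generate mode a caller would pass max_id = min_id + num_rows - 1.

```python
import logging

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)

DEDUP_ID_COLUMN = "_curator_dedup_id"


def add_dedup_ids(df: pd.DataFrame, min_id: int, max_id: int) -> pd.DataFrame:
    """Add IDs min_id..max_id (inclusive) in place, unless the column already exists."""
    if DEDUP_ID_COLUMN in df.columns:
        # Existing IDs win: warn and leave the DataFrame untouched.
        logger.warning("%s already present; skipping ID assignment", DEDUP_ID_COLUMN)
        return df
    # np.arange fills the whole column in one vectorized call.
    df[DEDUP_ID_COLUMN] = np.arange(min_id, max_id + 1)
    return df


df = pd.DataFrame({"text": ["a", "b", "c"]})
# Generate mode would pass max_id = min_id + len(df) - 1.
add_dedup_ids(df, min_id=10, max_id=12)
# A second call finds the column and only logs a warning.
add_dedup_ids(df, min_id=100, max_id=102)
print(df[DEDUP_ID_COLUMN].tolist())  # [10, 11, 12]
```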

Ray Actor Stage

When either _generate_ids or _assign_ids is True, the ray_stage_spec method returns {"IS_ACTOR_STAGE": True}, signaling to the Ray backend that this stage should be scheduled as an actor for stateful ID coordination.
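A sketch of that switch on a stand-in class. ReaderSpecSketch is hypothetical; the key is written as the plain string quoted in the text (the real code may use an enum or constant for it), and returning an empty dict when neither flag is set is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ReaderSpecSketch:
    """Hypothetical stand-in carrying only the two ID flags."""

    _generate_ids: bool = False
    _assign_ids: bool = False

    def ray_stage_spec(self) -> dict[str, Any]:
        # ID coordination is stateful, so ask the backend to run this stage as an actor.
        if self._generate_ids or self._assign_ids:
            return {"IS_ACTOR_STAGE": True}
        # Assumption: stateless readers need no special spec.
        return {}


print(ReaderSpecSketch(_generate_ids=True).ray_stage_spec())  # {'IS_ACTOR_STAGE': True}
print(ReaderSpecSketch().ray_stage_spec())  # {}
```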
