Implementation:NVIDIA NeMo Curator BaseReader
| Knowledge Sources | |
|---|---|
| Domains | Data Ingestion, IO, Data Pipeline |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Defines the abstract base class for all tabular file reader stages that convert FileGroupTask inputs into DocumentBatch outputs in the NeMo Curator pipeline.
Description
BaseReader extends ProcessingStage[FileGroupTask, DocumentBatch] as a dataclass and centralizes shared logic for reading files, selecting fields, validating results, and optionally managing deduplication IDs. Subclasses only need to implement the read_data abstract method to handle format-specific file reading.
The class supports two mutually exclusive ID management modes coordinated via a Ray actor (IdGenerator):
- _generate_ids: Generates new monotonically increasing IDs by registering each batch's row count with the IdGenerator actor and computing a sequential range starting from the returned minimum ID.
- _assign_ids: Assigns pre-computed ID ranges by querying the IdGenerator actor for the batch range associated with specific file paths.
Both modes add a _curator_dedup_id column to the output DataFrame using numpy.arange for efficient sequential ID generation.
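The effect of both modes on the output DataFrame can be sketched with plain pandas and NumPy. The helper name `add_dedup_ids` and the sample values are hypothetical; only the `_curator_dedup_id` column name and the use of `numpy.arange` come from the description above.

```python
import numpy as np
import pandas as pd

DEDUP_ID_COLUMN = "_curator_dedup_id"  # column name taken from the description above


def add_dedup_ids(df: pd.DataFrame, min_id: int) -> pd.DataFrame:
    """Hypothetical helper: attach sequential IDs starting at min_id."""
    out = df.copy()
    # np.arange excludes its stop value, so this yields min_id .. min_id + len(df) - 1.
    out[DEDUP_ID_COLUMN] = np.arange(min_id, min_id + len(df))
    return out


batch = pd.DataFrame({"text": ["a", "b", "c"]})
with_ids = add_dedup_ids(batch, min_id=100)
```

Because the IDs form one contiguous range per batch, a single `arange` call is enough and no per-row loop is needed.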
The process method orchestrates the read-validate-ID flow: it merges read kwargs, delegates to read_data, validates that the result is non-empty, applies IDs (for Pandas DataFrames only), and wraps the result in a DocumentBatch.
Usage
BaseReader is not used directly. Instead, use format-specific subclasses such as JsonlReaderStage or ParquetReaderStage. Implement read_data when creating a new reader for a custom file format.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/stages/text/io/reader/base.py
- Lines: 1-137
Signature
@dataclass
class BaseReader(ProcessingStage[FileGroupTask, DocumentBatch]):
    fields: list[str] | None = None
    read_kwargs: dict[str, Any] = field(default_factory=dict)
    name: str = ""
    _generate_ids: bool = False
    _assign_ids: bool = False
Import
from nemo_curator.stages.text.io.reader.base import BaseReader
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| fields | list[str] or None | No | If specified, only these columns are read from the source files (default: None, read all) |
| read_kwargs | dict[str, Any] | No | Keyword arguments passed to the underlying file reader (default: empty dict) |
| _generate_ids | bool | No | Whether to generate new monotonically increasing deduplication IDs (default: False) |
| _assign_ids | bool | No | Whether to assign pre-computed deduplication IDs from the IdGenerator actor (default: False) |
Stage I/O Specification
| Method | Returns |
|---|---|
| inputs() | ([], []) |
| outputs() | (["data"], output_fields) where output_fields includes selected fields plus optionally _curator_dedup_id |
Outputs
| Name | Type | Description |
|---|---|---|
| DocumentBatch | DocumentBatch | Contains the read data as a Pandas DataFrame with task metadata propagated from the input FileGroupTask |
Usage Examples
Implementing a Custom Reader
from dataclasses import dataclass
from typing import Any

import pandas as pd

from nemo_curator.stages.text.io.reader.base import BaseReader


@dataclass
class CSVReaderStage(BaseReader):
    name: str = "csv_reader"

    def read_data(
        self,
        file_paths: list[str],
        read_kwargs: dict[str, Any] | None = None,
        fields: list[str] | None = None,
    ) -> pd.DataFrame | None:
        read_kwargs = read_kwargs or {}
        # Read every file in the group, then keep only the requested columns.
        dfs = [pd.read_csv(path, **read_kwargs) for path in file_paths]
        if fields is not None:
            dfs = [df[fields] for df in dfs]
        # Returning None lets process() report that no data was read.
        return pd.concat(dfs, ignore_index=True) if dfs else None
Using a Reader with ID Generation
from nemo_curator.stages.text.io.reader.jsonl import JsonlReaderStage

reader = JsonlReaderStage(
    fields=["text", "url"],
    _generate_ids=True,
)
Implementation Details
Validation
The process method validates that read_data returns a non-empty result. It checks for a None result, a true empty attribute (for Pandas DataFrames), and num_rows == 0 (for Arrow tables), and raises a ValueError if no data was read.
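A minimal sketch of such a check, written with duck typing so it runs without pyarrow. The function names and the `FakeArrowTable` class are hypothetical stand-ins, and the error message is illustrative rather than the library's actual wording.

```python
import pandas as pd


def is_empty_result(result: object) -> bool:
    """Hypothetical check covering the three conditions described above."""
    if result is None:
        return True
    # pandas DataFrames expose a boolean `empty` attribute.
    if getattr(result, "empty", False):
        return True
    # Arrow tables expose `num_rows`; objects without it count as non-empty here.
    return getattr(result, "num_rows", None) == 0


def ensure_non_empty(result: object, file_paths: list[str]) -> None:
    """Raise ValueError when nothing was read from the given files."""
    if is_empty_result(result):
        msg = f"No data read from files: {file_paths}"
        raise ValueError(msg)


class FakeArrowTable:
    """Stand-in for an Arrow table; only mimics the num_rows attribute."""

    def __init__(self, num_rows: int) -> None:
        self.num_rows = num_rows
```

For example, `ensure_non_empty(pd.DataFrame(), ["a.csv"])` raises, while a DataFrame with at least one row passes silently.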
ID Generation vs. ID Assignment
- _generate_ids: Calls id_generator.register_batch.remote(filepath, num_rows) to register the batch size and receive a starting ID. IDs are sequential from min_id to min_id + num_rows - 1.
- _assign_ids: Calls id_generator.get_batch_range.remote(filepath, None) to retrieve a pre-computed (min_id, max_id) range. IDs are sequential from min_id to max_id.
If the _curator_dedup_id column already exists in the DataFrame, both modes log a warning and skip ID assignment.
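The difference between the two modes can be illustrated with an in-process stand-in for the actor. In NeMo Curator the IdGenerator is a Ray actor reached through `.remote()` calls; `LocalIdGenerator` below is a hypothetical replacement that keeps the method names `register_batch` and `get_batch_range` from the text but simplifies their signatures, so treat the argument lists and return values as assumptions.

```python
import logging

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)
DEDUP_ID_COLUMN = "_curator_dedup_id"


class LocalIdGenerator:
    """In-process stand-in for the IdGenerator Ray actor (no .remote() calls)."""

    def __init__(self) -> None:
        self._next_id = 0
        self._ranges: dict[str, tuple[int, int]] = {}

    def register_batch(self, files: list[str], num_rows: int) -> int:
        """Reserve num_rows IDs for this batch and return the first one."""
        min_id = self._next_id
        self._next_id += num_rows
        self._ranges[self._key(files)] = (min_id, min_id + num_rows - 1)
        return min_id

    def get_batch_range(self, files: list[str]) -> tuple[int, int]:
        """Return the inclusive (min_id, max_id) recorded for this batch."""
        return self._ranges[self._key(files)]

    @staticmethod
    def _key(files: list[str]) -> str:
        return "|".join(sorted(files))


def generate_ids(df: pd.DataFrame, files: list[str], gen: LocalIdGenerator) -> pd.DataFrame:
    if DEDUP_ID_COLUMN in df.columns:
        logger.warning("Column %s already exists; skipping", DEDUP_ID_COLUMN)
        return df
    min_id = gen.register_batch(files, len(df))
    df[DEDUP_ID_COLUMN] = np.arange(min_id, min_id + len(df))
    return df


def assign_ids(df: pd.DataFrame, files: list[str], gen: LocalIdGenerator) -> pd.DataFrame:
    if DEDUP_ID_COLUMN in df.columns:
        logger.warning("Column %s already exists; skipping", DEDUP_ID_COLUMN)
        return df
    min_id, max_id = gen.get_batch_range(files)
    # max_id is inclusive, so the arange stop is max_id + 1.
    df[DEDUP_ID_COLUMN] = np.arange(min_id, max_id + 1)
    return df


gen = LocalIdGenerator()
first = generate_ids(pd.DataFrame({"text": ["a", "b"]}), ["f1.jsonl"], gen)
second = generate_ids(pd.DataFrame({"text": ["c", "d", "e"]}), ["f2.jsonl"], gen)
# A later pass over the same files recovers the IDs recorded earlier.
replay = assign_ids(pd.DataFrame({"text": ["c", "d", "e"]}), ["f2.jsonl"], gen)
```

Generation hands out fresh, non-overlapping ranges in registration order, whereas assignment only looks up a range that was recorded earlier, which is why the two flags are mutually exclusive on a single reader.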
Ray Actor Stage
When either _generate_ids or _assign_ids is True, the ray_stage_spec method returns {"IS_ACTOR_STAGE": True}, signaling to the Ray backend that this stage should be scheduled as an actor for stateful ID coordination.
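The flag-to-spec mapping described above can be sketched as follows. `ReaderFlags` is a hypothetical stand-in that carries only the two ID flags, not the real BaseReader; the plain string key and the empty dict returned when neither flag is set are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ReaderFlags:
    """Hypothetical stand-in holding only the two ID flags of BaseReader."""

    _generate_ids: bool = False
    _assign_ids: bool = False

    def ray_stage_spec(self) -> dict[str, Any]:
        # Either ID mode needs the shared IdGenerator, so request actor scheduling.
        if self._generate_ids or self._assign_ids:
            return {"IS_ACTOR_STAGE": True}
        # Assumption: no special scheduling hints when IDs are not requested.
        return {}


spec_with_ids = ReaderFlags(_generate_ids=True).ray_stage_spec()
spec_without_ids = ReaderFlags().ray_stage_spec()
```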
Related Pages
- NVIDIA_NeMo_Curator_JSONLReader - JSONL format reader subclass
- NVIDIA_NeMo_Curator_ParquetReader - Parquet format reader subclass
- NVIDIA_NeMo_Curator_BaseWriter - Corresponding base class for writer stages
- NVIDIA_NeMo_Curator_FilePartitioningStage - Stage that produces FileGroupTask inputs for readers
- Environment:NVIDIA_NeMo_Curator_Python_Linux_Base