Implementation:NVIDIA NeMo Curator Base DocumentIterator
| Knowledge Sources | |
|---|---|
| Domains | Data Iteration, Abstract Base Class, Pipeline Infrastructure |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
DocumentIterator is the abstract base class for file-to-record iterators in NeMo Curator. DocumentIterateExtractStage is its companion processing stage: it combines iteration with optional extraction to convert downloaded files into structured DocumentBatch results.
Description
The DocumentIterator ABC defines the interface for parsing downloaded files into record dictionaries. It has two abstract methods:
- iterate(file_path): A generator that yields dict[str, Any] records from a given file. The record fields can contain raw content in any format (HTML, LaTeX, JSON, etc.).
- output_columns(): Declares the field names produced by the iterator.
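The two-method contract can be illustrated with a minimal sketch. It does not import NeMo Curator: DocumentIteratorSketch is a hypothetical stand-in that only mirrors the two abstract methods named above, and CsvIterator, IncompleteIterator, demo_records, and the column names are illustrative assumptions.

```python
import csv
import os
import tempfile
from abc import ABC, abstractmethod
from collections.abc import Iterator
from typing import Any


class DocumentIteratorSketch(ABC):
    """Hypothetical stand-in mirroring the DocumentIterator interface."""

    @abstractmethod
    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        ...

    @abstractmethod
    def output_columns(self) -> list[str]:
        ...


class CsvIterator(DocumentIteratorSketch):
    """Illustrative subclass: yields one record per CSV row."""

    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        with open(file_path, newline="", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                yield dict(row)

    def output_columns(self) -> list[str]:
        # Declares the fields that iterate() is expected to produce.
        return ["id", "text"]


class IncompleteIterator(DocumentIteratorSketch):
    """Omits output_columns(), so Python refuses to instantiate it."""

    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        yield {}


def demo_records() -> list[dict[str, Any]]:
    """Write a two-row CSV to a temporary file and read it back."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "sample.csv")
        with open(path, "w", newline="", encoding="utf-8") as f:
            f.write("id,text\n1,hello\n2,world\n")
        return list(CsvIterator().iterate(path))
```

Because both methods are abstract, a subclass that leaves either one out raises TypeError when instantiated, so an incomplete iterator fails early rather than in the middle of a pipeline run.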
The DocumentIterateExtractStage is a dataclass extending ProcessingStage[FileGroupTask, DocumentBatch]. It provides the runtime orchestration:
- Receives a FileGroupTask containing local file paths.
- For each file, calls iterator.iterate(file_path) to produce records.
- Optionally applies a DocumentExtractor to each record for content transformation.
- Adds a filename column to each record (configurable via add_filename_column).
- Respects an optional record_limit per file.
- Collects all records into a pandas DataFrame and wraps it in a DocumentBatch.
Error handling is per-file: if iteration fails for one file, the error is logged and processing continues with the next file.
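The orchestration described above can be sketched in plain Python. This is not the stage's actual code: iterate_and_extract is a hypothetical helper, the iterator and extractor are passed as plain callables, and the result is a list of dicts where the real stage builds a pandas DataFrame inside a DocumentBatch. Two details are assumptions of the sketch: an extractor may return None to drop a record, and record_limit counts only the records that are kept.

```python
import logging
import os
from collections.abc import Callable, Iterator
from typing import Any, Optional

logger = logging.getLogger(__name__)

Record = dict[str, Any]


def iterate_and_extract(
    file_paths: list[str],
    iterate: Callable[[str], Iterator[Record]],
    extract: Optional[Callable[[Record], Optional[Record]]] = None,
    record_limit: Optional[int] = None,
    filename_col: Optional[str] = "filename",
) -> list[Record]:
    """Hypothetical sketch of the per-file iterate/extract loop."""
    records: list[Record] = []
    for file_path in file_paths:
        try:
            kept = 0
            for record in iterate(file_path):
                # Stop reading this file once the per-file limit is reached.
                if record_limit is not None and kept >= record_limit:
                    break
                if extract is not None:
                    record = extract(record)
                    if record is None:
                        continue  # assumption: the extractor dropped it
                if filename_col is not None:
                    record[filename_col] = os.path.basename(file_path)
                records.append(record)
                kept += 1
        except Exception:
            # Per-file error handling: log and move on to the next file.
            logger.exception("Failed to iterate %s; skipping", file_path)
    return records
```

In this sketch, records gathered from a file before it fails are kept. Whether the real stage keeps or discards such partial results is not stated here.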
Usage
Subclass DocumentIterator to implement file parsing for specific data formats. Use DocumentIterateExtractStage to wire an iterator (and optional extractor) into a pipeline.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/stages/text/download/base/iterator.py
- Lines: 1-138
Signature
class DocumentIterator(ABC):
    """Abstract base class for document iterators."""

    @abstractmethod
    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        """Iterate over records in a file, yielding dict records."""
        ...

    @abstractmethod
    def output_columns(self) -> list[str]:
        """Define output columns - produces DocumentBatch with records."""
        ...


@dataclass
class DocumentIterateExtractStage(ProcessingStage[FileGroupTask, DocumentBatch]):
    """Stage that iterates through downloaded files and extracts structured content."""

    iterator: DocumentIterator
    extractor: DocumentExtractor | None = None
    record_limit: int | None = None
    add_filename_column: bool | str = True

    def inputs(self) -> tuple[list[str], list[str]]:
        ...

    def outputs(self) -> tuple[list[str], list[str]]:
        ...

    def process(self, task: FileGroupTask) -> DocumentBatch:
        ...
Import
from nemo_curator.stages.text.download.base.iterator import DocumentIterator, DocumentIterateExtractStage
# Or via the package shortcut:
from nemo_curator.stages.text.download import DocumentIterator
I/O Contract
DocumentIterator Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| file_path | str | Yes | Path to a downloaded file to iterate over (passed to iterate()) |
DocumentIterator Outputs
| Name | Type | Description |
|---|---|---|
| yields | dict[str, Any] | Record dictionaries with fields defined by output_columns() |
DocumentIterateExtractStage Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| iterator | DocumentIterator | Yes | The iterator implementation to parse files |
| extractor | DocumentExtractor \| None | No | Optional extractor to transform records (default: None) |
| record_limit | int \| None | No | Maximum number of records to extract per file (default: None, unlimited) |
| add_filename_column | bool \| str | No | Whether to add a filename column (bool), or a custom column name (str) (default: True) |
DocumentIterateExtractStage I/O
| Direction | Type | Description |
|---|---|---|
| Input | FileGroupTask | Task containing local file paths in task.data |
| Output | DocumentBatch | Batch containing a pandas DataFrame of extracted records in batch.data |
Key Behaviors
Stage Naming
The stage name is automatically generated based on the iterator and extractor class names:
- With extractor: iterate_extract_{iterator_class}_{extractor_class}
- Without extractor: iterate_{iterator_class}
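The naming pattern can be sketched as a small function. stage_name and the two placeholder classes are hypothetical, and the sketch uses the class names unchanged. Whether the real stage normalizes them (for example by lowercasing) is not stated here.

```python
from typing import Optional


def stage_name(iterator: object, extractor: Optional[object] = None) -> str:
    """Hypothetical helper that builds a name following the pattern above."""
    iterator_class = type(iterator).__name__
    if extractor is None:
        return f"iterate_{iterator_class}"
    extractor_class = type(extractor).__name__
    return f"iterate_extract_{iterator_class}_{extractor_class}"


class JsonlIterator:
    """Placeholder class, used only for its name."""


class HtmlExtractor:
    """Placeholder class, used only for its name."""


name_without_extractor = stage_name(JsonlIterator())
name_with_extractor = stage_name(JsonlIterator(), HtmlExtractor())
```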
Filename Column
When add_filename_column is True, a column named filename (resolved via resolve_filename_column) is added to each record containing os.path.basename(file_path). A custom column name can be specified as a string instead.
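A minimal sketch of this resolution follows. resolve_filename_column_sketch is a hypothetical stand-in for resolve_filename_column, whose real signature and default are not shown here. The default name "filename" is taken from the description above, and treating False as "add no column" is an assumption. tag_record is likewise illustrative.

```python
import os
from typing import Any, Optional, Union

DEFAULT_FILENAME_COL = "filename"  # default name as described above


def resolve_filename_column_sketch(
    add_filename_column: Union[bool, str],
) -> Optional[str]:
    """Hypothetical stand-in: map the option to a column name or None."""
    if add_filename_column is True:
        return DEFAULT_FILENAME_COL
    if add_filename_column is False:
        return None  # assumption: False disables the column
    return add_filename_column  # a custom column name


def tag_record(
    record: dict[str, Any],
    file_path: str,
    add_filename_column: Union[bool, str] = True,
) -> dict[str, Any]:
    """Return a copy of the record with the file's basename attached."""
    column = resolve_filename_column_sketch(add_filename_column)
    if column is None:
        return dict(record)
    return {**record, column: os.path.basename(file_path)}
```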
Output Column Resolution
The output columns are determined by:
- The extractor's output_columns() if an extractor is provided.
- The iterator's output_columns() if no extractor is provided.
- Plus the filename column if add_filename_column is enabled.
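These three rules can be sketched as one function. resolve_output_columns is a hypothetical helper that takes the column lists directly rather than iterator and extractor objects, and it assumes the filename column name has already been resolved to a string or None.

```python
from typing import Optional


def resolve_output_columns(
    iterator_columns: list[str],
    extractor_columns: Optional[list[str]] = None,
    filename_col: Optional[str] = None,
) -> list[str]:
    """Hypothetical sketch of the column resolution rules above."""
    # The extractor's columns take precedence when an extractor is present.
    if extractor_columns is not None:
        columns = list(extractor_columns)
    else:
        columns = list(iterator_columns)
    # The filename column is appended only when it is enabled.
    if filename_col is not None:
        columns.append(filename_col)
    return columns
```

For example, an iterator declaring ["id", "raw"] paired with an extractor declaring ["id", "text"] and a filename column named "filename" resolves to ["id", "text", "filename"] in this sketch.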
Usage Examples
Implementing a Custom Iterator
import json
from collections.abc import Iterator
from typing import Any

from nemo_curator.stages.text.download import DocumentIterator


class JsonlIterator(DocumentIterator):
    """Iterates over JSONL files, yielding one record per line."""

    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        with open(file_path, encoding="utf-8") as f:
            for line in f:
                # Skip blank lines, which json.loads would reject.
                if line.strip():
                    yield json.loads(line)

    def output_columns(self) -> list[str]:
        return ["text", "url", "timestamp"]
Using DocumentIterateExtractStage
from nemo_curator.stages.text.download.base.iterator import DocumentIterateExtractStage

stage = DocumentIterateExtractStage(
    iterator=JsonlIterator(),
    extractor=None,  # Records are already in final format
    record_limit=10000,
    add_filename_column=True,
)
With an Extractor
from nemo_curator.stages.text.download.base.iterator import DocumentIterateExtractStage
from nemo_curator.stages.text.download.arxiv.iterator import ArxivIterator
from nemo_curator.stages.text.download.arxiv.extract import ArxivExtractor

stage = DocumentIterateExtractStage(
    iterator=ArxivIterator(),
    extractor=ArxivExtractor(),
    add_filename_column="source_file",
)
Known Implementations
- ArxivIterator -- Iterates over ArXiv tar archives yielding per-paper LaTeX content