Implementation:NVIDIA NeMo Curator BaseWriter
| Knowledge Sources | |
|---|---|
| Domains | Data Output, IO, Data Pipeline, Filesystem |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Defines the abstract base class for all writer stages that convert DocumentBatch inputs into written files and return FileGroupTask outputs in the NeMo Curator pipeline.
Description
BaseWriter is an abstract dataclass that subclasses both ProcessingStage[DocumentBatch, FileGroupTask] and ABC. It provides the functionality shared by all writers: filesystem resolution, output mode validation, deterministic file naming, and metadata propagation.
On initialization, BaseWriter uses fsspec.url_to_fs to resolve the target filesystem (local or remote) and validates the output mode against existing files using check_output_mode. The supported output modes are:
- ignore - skip writing if the output directory already exists
- overwrite - overwrite existing files
- append - append to existing output (only if the subclass sets append_mode_implemented=True)
- error - raise an error if the output directory already exists
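The mode rules above can be summarized as a small decision function. This is a hypothetical, local-filesystem-only sketch (`should_write` is not part of NeMo Curator) that follows the list literally; the library's own check_output_mode operates through an fsspec filesystem and may differ in details, for example in how overwrite treats files that are already present.

```python
import os
from typing import Literal

Mode = Literal["ignore", "overwrite", "append", "error"]


def should_write(mode: Mode, output_dir: str, append_mode_implemented: bool = False) -> bool:
    """Hypothetical local check mirroring the output mode list above.

    Returns True if the writer should go ahead and write, False if it should skip.
    """
    exists = os.path.isdir(output_dir)
    if mode == "ignore":
        # Skip writing when the output directory is already there.
        return not exists
    if mode == "overwrite":
        # Existing files are simply replaced by the new writes.
        return True
    if mode == "append":
        # Only writers that opt in via append_mode_implemented may append.
        if not append_mode_implemented:
            raise NotImplementedError("append mode is not supported by this writer")
        return True
    if mode == "error":
        if exists:
            raise FileExistsError(f"Output directory {output_dir} already exists")
        return True
    raise ValueError(f"Unknown mode: {mode!r}")
```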
The process method derives a deterministic filename by SHA-256 hashing the source file metadata (via writer_utils.get_deterministic_hash); if no source file metadata is available, it falls back to a UUID-based filename. It then calls the abstract write_data method to write the file and returns a FileGroupTask with the written file path and format metadata.
Usage
BaseWriter is not used directly. Instead, use format-specific subclasses such as ParquetWriter or JsonlWriter. To support a custom file format, subclass BaseWriter and implement write_data.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/stages/text/io/writer/base.py
- Lines: 1-105
Signature
```python
@dataclass
class BaseWriter(ProcessingStage[DocumentBatch, FileGroupTask], ABC):
    path: str
    file_extension: str
    write_kwargs: dict[str, Any] = field(default_factory=dict)
    fields: list[str] | None = None
    name: str = "BaseWriter"
    mode: Literal["ignore", "overwrite", "append", "error"] = "ignore"
    append_mode_implemented: bool = False
```
Import
```python
from nemo_curator.stages.text.io.writer.base import BaseWriter
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| path | str | Yes | Output directory path (local or remote URL) |
| file_extension | str | Yes | File extension for output files (e.g. "parquet", "jsonl") |
| write_kwargs | dict[str, Any] | No | Additional keyword arguments for the format-specific writer (default: empty dict) |
| fields | list[str] or None | No | If specified, only write these columns (default: None, write all) |
| mode | Literal["ignore", "overwrite", "append", "error"] | No | Output mode controlling behavior when files exist (default: "ignore") |
| append_mode_implemented | bool | No | Whether the subclass supports append mode (default: False) |
Stage I/O Specification
| Method | Returns |
|---|---|
| inputs() | (["data"], []) |
| outputs() | (["data"], []) |
Outputs
| Name | Type | Description |
|---|---|---|
| FileGroupTask | FileGroupTask | Contains the list of written file paths and metadata including the output format |
Usage Examples
Implementing a Custom Writer
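```python
from dataclasses import dataclass

from nemo_curator.stages.text.io.writer.base import BaseWriter
from nemo_curator.tasks import DocumentBatch


@dataclass
class CSVWriter(BaseWriter):
    # Override the stage name and give file_extension a default,
    # so callers only need to pass `path` (and optionally `mode`).
    name: str = "csv_writer"
    file_extension: str = "csv"

    def write_data(self, task: DocumentBatch, file_path: str) -> None:
        # Materialize the batch as a pandas DataFrame.
        df = task.to_pandas()
        # Honor the optional column selection.
        if self.fields is not None:
            df = df[self.fields]
        # Extra options (e.g. sep, storage_options) are forwarded from write_kwargs.
        df.to_csv(file_path, index=False, **self.write_kwargs)
```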
Using a Writer
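```python
from nemo_curator.stages.text.io.writer import ParquetWriter

# Format-specific subclasses supply file_extension themselves, so only the
# output directory (local or remote) and, optionally, the mode are needed.
writer = ParquetWriter(path="/output/data/", mode="overwrite")
```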
Implementation Details
Filesystem Resolution
BaseWriter uses fsspec.url_to_fs to automatically resolve the target filesystem from the output path. This supports:
- Local file paths (e.g. /data/output/)
- S3 paths (e.g. s3://bucket/prefix/)
- GCS paths (e.g. gs://bucket/prefix/)
- Any other fsspec-compatible URL scheme
Storage options are extracted from write_kwargs under the storage_options key.
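As a rough illustration of how the output path and write_kwargs are split up, the stdlib-only sketch below reads the URL scheme and pulls the storage options out of write_kwargs. `resolve_target` is hypothetical and does not build a filesystem object; in BaseWriter that step is performed by fsspec.url_to_fs, which receives these storage options. The sketch assumes POSIX-style local paths (a Windows drive letter such as `C:` would be parsed as a scheme).

```python
from typing import Any
from urllib.parse import urlsplit


def resolve_target(path: str, write_kwargs: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Hypothetical helper: return (protocol, storage_options) for an output path.

    Only the URL scheme is parsed; no filesystem is constructed.
    """
    scheme = urlsplit(path).scheme
    # Paths without a scheme (e.g. /data/output/) are treated as local files.
    protocol = scheme if scheme else "file"
    # Credentials, endpoints, etc. live under a dedicated "storage_options" key;
    # a copy is returned so the caller's write_kwargs is left untouched.
    storage_options = dict(write_kwargs.get("storage_options", {}))
    return protocol, storage_options
```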
Deterministic File Naming
When the input DocumentBatch contains source_files in its metadata, BaseWriter generates a deterministic filename using writer_utils.get_deterministic_hash, which computes a SHA-256 hash of the source file paths and task ID. This ensures that re-processing the same data produces the same output filenames. When no source file metadata is available, a random UUID is used as a fallback.
Remote URL Handling
For remote URLs, BaseWriter restores the protocol prefix (e.g. s3://) to the generated file path using fs.unstrip_protocol so that downstream stages can correctly infer the filesystem type from the path.
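fsspec filesystems work on paths with the scheme removed (an S3 URL comes back from url_to_fs as `bucket/prefix/...`), which is why the prefix has to be restored. The sketch below mimics that step with plain string handling; `restore_protocol` and the set of local protocol names are assumptions standing in for fs.unstrip_protocol, applied only to remote protocols as described above.

```python
# Protocol names treated as local in this sketch (assumption).
LOCAL_PROTOCOLS = ("file", "local")


def restore_protocol(protocol: str, stripped_path: str) -> str:
    """Hypothetical stand-in for re-attaching a URL scheme to an fsspec path."""
    if protocol in LOCAL_PROTOCOLS:
        # Local paths stay plain paths.
        return stripped_path
    prefix = f"{protocol}://"
    if stripped_path.startswith(prefix):
        # Already carries the scheme; do not add it twice.
        return stripped_path
    return prefix + stripped_path
```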
Process Flow
- Extract source files from task metadata for deterministic naming
- Generate filename with the appropriate file extension
- Check if the file already exists (log a debug message if overwriting)
- Call the abstract write_data method
- Log the number of records written
- Return a FileGroupTask with the file path and format metadata
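The steps above can be traced end to end in a self-contained toy version. Every class here (`ToyBatch`, `ToyFileGroup`, `ToyBaseWriter`, `ToyJsonlWriter`) is hypothetical: the toy batch holds plain dicts instead of a DocumentBatch, only local paths are supported, and the hash input (sorted source paths plus the task ID) is an assumption about what get_deterministic_hash combines.

```python
import hashlib
import json
import logging
import os
import tempfile
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any

logger = logging.getLogger(__name__)


@dataclass
class ToyBatch:
    """Hypothetical stand-in for DocumentBatch: records plus metadata."""
    task_id: str
    records: list[dict[str, Any]]
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class ToyFileGroup:
    """Hypothetical stand-in for FileGroupTask."""
    task_id: str
    files: list[str]
    metadata: dict[str, Any]


@dataclass
class ToyBaseWriter(ABC):
    path: str
    file_extension: str

    @abstractmethod
    def write_data(self, task: ToyBatch, file_path: str) -> None:
        """Format-specific subclasses implement the actual write."""

    def process(self, task: ToyBatch) -> ToyFileGroup:
        # 1. Deterministic name from source files, else a random UUID.
        source_files = task.metadata.get("source_files")
        if source_files:
            payload = "|".join(sorted(source_files)) + "|" + task.task_id
            stem = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        else:
            stem = uuid.uuid4().hex
        # 2. Attach the file extension.
        file_path = os.path.join(self.path, f"{stem}.{self.file_extension}")
        # 3. Note (but allow) overwriting an existing file.
        if os.path.exists(file_path):
            logger.debug("File %s already exists, overwriting it", file_path)
        # 4. Delegate the format-specific write to the subclass.
        self.write_data(task, file_path)
        # 5. Log the number of records written.
        logger.info("Wrote %d records to %s", len(task.records), file_path)
        # 6. Return the written path, propagating metadata and adding the format.
        return ToyFileGroup(
            task_id=task.task_id,
            files=[file_path],
            metadata={**task.metadata, "format": self.file_extension},
        )


@dataclass
class ToyJsonlWriter(ToyBaseWriter):
    file_extension: str = "jsonl"

    def write_data(self, task: ToyBatch, file_path: str) -> None:
        with open(file_path, "w", encoding="utf-8") as f:
            for record in task.records:
                f.write(json.dumps(record) + "\n")


# Example: write one batch into a temporary directory.
with tempfile.TemporaryDirectory() as out_dir:
    writer = ToyJsonlWriter(path=out_dir)
    batch = ToyBatch("t0", [{"text": "a"}, {"text": "b"}], {"source_files": ["b.jsonl", "a.jsonl"]})
    result = writer.process(batch)
    with open(result.files[0], encoding="utf-8") as f:
        lines_written = len(f.readlines())
```

Because the filename depends only on the source files and task ID, processing the same batch again targets the same path, which is the property the deterministic naming is meant to provide.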
Related Pages
- NVIDIA_NeMo_Curator_ParquetWriter - Parquet format writer subclass
- NVIDIA_NeMo_Curator_BaseReader - Corresponding base class for reader stages
- NVIDIA_NeMo_Curator_JSONLReader - JSONL reader that produces DocumentBatch data for writing
- Environment:NVIDIA_NeMo_Curator_Python_Linux_Base