Implementation:NVIDIA NeMo Curator BaseWriter

From Leeroopedia
Knowledge Sources
Domains: Data Output, IO, Data Pipeline, Filesystem
Last Updated: 2026-02-14 00:00 GMT

Overview

Defines the abstract base class for all writer stages that convert DocumentBatch inputs into written files and return FileGroupTask outputs in the NeMo Curator pipeline.

Description

BaseWriter extends ProcessingStage[DocumentBatch, FileGroupTask] as an abstract base class (ABC) dataclass. It provides common functionality for writing data to files, including filesystem resolution, output mode validation, deterministic file naming, and metadata propagation.

On initialization, BaseWriter uses fsspec.url_to_fs to resolve the target filesystem (local or remote) and validates the output mode against existing files using check_output_mode. The supported output modes are:

  • ignore - skip writing if the output directory already exists
  • overwrite - overwrite existing files
  • append - append to existing output (only if the subclass sets append_mode_implemented=True)
  • error - raise an error if the output directory already exists
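The four modes can be illustrated with a minimal, local-only sketch. The helper resolve_output_mode below is hypothetical and is not the library's check_output_mode: it uses only the standard library, returns a plain string instead of touching any files, and the exception types are assumptions made for illustration.

```python
import os
import tempfile


def resolve_output_mode(mode: str, path: str, append_mode_implemented: bool = False) -> str:
    """Hypothetical helper: decide what a writer should do for a given mode.

    Returns "write" or "skip", or raises if the mode forbids proceeding.
    """
    if mode not in ("ignore", "overwrite", "append", "error"):
        raise ValueError(f"Unknown mode: {mode!r}")
    if mode == "append" and not append_mode_implemented:
        # Append is only allowed when the subclass opts in.
        raise NotImplementedError("append mode is not implemented by this writer")
    if not os.path.isdir(path):
        # Nothing exists yet, so every mode may write.
        return "write"
    if mode == "error":
        raise FileExistsError(f"Output directory already exists: {path}")
    if mode == "ignore":
        return "skip"
    # "overwrite" and "append" both proceed when the directory exists.
    return "write"


with tempfile.TemporaryDirectory() as existing:
    missing = os.path.join(existing, "not_created_yet")
    print(resolve_output_mode("ignore", existing))     # skip
    print(resolve_output_mode("overwrite", existing))  # write
    print(resolve_output_mode("ignore", missing))      # write
```

Because "ignore" is the default mode, a rerun against an existing output directory is skipped in this sketch unless the caller asks for "overwrite" explicitly.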

The process method derives a deterministic filename from the task's source file metadata and task ID, using a SHA-256 hash computed by writer_utils.get_deterministic_hash. If no source file metadata is available, it falls back to a UUID-based filename. It then calls the abstract write_data method and returns a FileGroupTask containing the written file path and format metadata.

Usage

BaseWriter is abstract and is not used directly. Use a format-specific subclass such as ParquetWriter or JsonlWriter instead. To support a custom file format, subclass BaseWriter and implement write_data.

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/stages/text/io/writer/base.py
  • Lines: 1-105

Signature

@dataclass
class BaseWriter(ProcessingStage[DocumentBatch, FileGroupTask], ABC):
    path: str
    file_extension: str
    write_kwargs: dict[str, Any] = field(default_factory=dict)
    fields: list[str] | None = None
    name: str = "BaseWriter"
    mode: Literal["ignore", "overwrite", "append", "error"] = "ignore"
    append_mode_implemented: bool = False

Import

from nemo_curator.stages.text.io.writer.base import BaseWriter

I/O Contract

Inputs

Name | Type | Required | Description
path | str | Yes | Output directory path (local or remote URL)
file_extension | str | Yes | File extension for output files (e.g. "parquet", "jsonl")
write_kwargs | dict[str, Any] | No | Additional keyword arguments for the format-specific writer (default: empty dict)
fields | list[str] or None | No | If specified, only these columns are written (default: None, write all columns)
name | str | No | Stage name (default: "BaseWriter")
mode | Literal["ignore", "overwrite", "append", "error"] | No | Output mode controlling behavior when output already exists (default: "ignore")
append_mode_implemented | bool | No | Whether the subclass supports append mode (default: False)

Stage I/O Specification

Method | Returns
inputs() | (["data"], [])
outputs() | (["data"], [])

Outputs

Name | Type | Description
FileGroupTask | FileGroupTask | Contains the list of written file paths and metadata including the output format

Usage Examples

Implementing a Custom Writer

from dataclasses import dataclass

from nemo_curator.stages.text.io.writer.base import BaseWriter
from nemo_curator.tasks import DocumentBatch


@dataclass
class CSVWriter(BaseWriter):
    """Example writer that stores each DocumentBatch as a CSV file."""

    name: str = "csv_writer"
    file_extension: str = "csv"

    def write_data(self, task: DocumentBatch, file_path: str) -> None:
        df = task.to_pandas()
        # Restrict the output to the requested columns, if any.
        if self.fields is not None:
            df = df[self.fields]
        # write_kwargs is forwarded unchanged to pandas.
        df.to_csv(file_path, index=False, **self.write_kwargs)

Using a Writer

# BaseWriter is abstract, so instantiate a format-specific subclass instead.
# The import path below is assumed from the package layout shown above.
from nemo_curator.stages.text.io.writer import ParquetWriter

writer = ParquetWriter(path="/output/data/", mode="overwrite")

Implementation Details

Filesystem Resolution

BaseWriter uses fsspec.url_to_fs to automatically resolve the target filesystem from the output path. This supports:

  • Local file paths (e.g. /data/output/)
  • S3 paths (e.g. s3://bucket/prefix/)
  • GCS paths (e.g. gs://bucket/prefix/)
  • Any other fsspec-compatible URL scheme

Storage options are extracted from write_kwargs under the storage_options key.
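As a rough illustration of the resolution step, the sketch below calls fsspec's url_to_fs on an in-memory URL, which stands in for a remote store so the example runs without credentials. The write_kwargs dictionary is a hypothetical configuration, and forwarding the storage options as keyword arguments to url_to_fs is an assumption about how BaseWriter uses them.

```python
from fsspec.core import url_to_fs

# Hypothetical writer configuration; "storage_options" is the key named above.
write_kwargs = {"storage_options": {}}
storage_options = write_kwargs.get("storage_options", {})

# The in-memory filesystem stands in for a remote store such as S3 or GCS,
# so the example needs no credentials or network access.
fs, fs_path = url_to_fs("memory://bucket/prefix/", **storage_options)

print(type(fs).__name__)  # MemoryFileSystem
print(fs_path)            # the same location with the protocol stripped
```

The call returns both a filesystem object and a protocol-free path, so a writer can keep the two together and use the filesystem for existence checks and the stripped path for building filenames.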

Deterministic File Naming

When the input DocumentBatch contains source_files in its metadata, BaseWriter generates a deterministic filename using writer_utils.get_deterministic_hash, which computes a SHA-256 hash of the source file paths and task ID. This ensures that re-processing the same data produces the same output filenames. When no source file metadata is available, a random UUID is used as a fallback.
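The naming scheme can be sketched with the standard library alone. The function deterministic_name below is a hypothetical stand-in for writer_utils.get_deterministic_hash; sorting the paths, the "|" separator, and truncating the digest to 12 characters are all assumptions made for illustration, not confirmed details of the library.

```python
import hashlib
import uuid


def deterministic_name(source_files: list[str], task_id: str) -> str:
    """Hypothetical stand-in for writer_utils.get_deterministic_hash."""
    # Sort so the result does not depend on the order of the source files
    # (an assumption of this sketch).
    combined = "|".join(sorted(source_files)) + "|" + task_id
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()[:12]


def base_filename(metadata: dict, task_id: str) -> str:
    source_files = metadata.get("source_files")
    if source_files:
        return deterministic_name(source_files, task_id)
    # No provenance available: fall back to a random, non-reproducible name.
    return uuid.uuid4().hex


a = base_filename({"source_files": ["b.jsonl", "a.jsonl"]}, "task_0")
b = base_filename({"source_files": ["a.jsonl", "b.jsonl"]}, "task_0")
print(a == b)  # True
```

Including the task ID in the hashed string keeps two tasks that share the same source files from colliding on one output filename.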

Remote URL Handling

For remote URLs, BaseWriter restores the protocol prefix (e.g. s3://) to the generated file path using fs.unstrip_protocol so that downstream stages can correctly infer the filesystem type from the path.
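A small sketch of the round trip, again using fsspec's in-memory filesystem in place of a real remote store. The filename abc123.parquet is a made-up example, and joining on fs.sep is an assumption about how the path is assembled.

```python
from fsspec.core import url_to_fs

fs, fs_path = url_to_fs("memory://bucket/prefix")

# Build the file path from the protocol-free directory path.
file_path = fs.sep.join([fs_path, "abc123.parquet"])

# Put the scheme back so the path is self-describing again.
full_path = fs.unstrip_protocol(file_path)
print(full_path)

# A downstream consumer can recover the filesystem from the full path alone.
fs_again, path_again = url_to_fs(full_path)
print(type(fs_again).__name__)  # MemoryFileSystem
```

Without the restored prefix, a downstream reader given only the stripped path would treat it as a local file.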

Process Flow

  1. Extract source files from task metadata for deterministic naming
  2. Generate filename with the appropriate file extension
  3. Check if the file already exists (log a debug message if overwriting)
  4. Call the abstract write_data method
  5. Log the number of records written
  6. Return a FileGroupTask with the file path and format metadata
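The six steps above can be sketched end to end with standard-library stand-ins. ToyBatch, ToyFileGroup, ToyWriter, and ToyJsonlWriter are hypothetical classes invented for this example; they only mirror the shape of DocumentBatch, FileGroupTask, and BaseWriter, work on local paths, and assume that the "format" metadata value is the file extension.

```python
import hashlib
import json
import logging
import os
import tempfile
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field

logger = logging.getLogger("writer_sketch")


@dataclass
class ToyBatch:
    """Hypothetical stand-in for DocumentBatch."""
    task_id: str
    records: list
    metadata: dict = field(default_factory=dict)


@dataclass
class ToyFileGroup:
    """Hypothetical stand-in for FileGroupTask."""
    task_id: str
    data: list
    metadata: dict


@dataclass
class ToyWriter(ABC):
    path: str
    file_extension: str

    def process(self, task: ToyBatch) -> ToyFileGroup:
        # 1. Use source files, when present, for a reproducible name.
        source_files = task.metadata.get("source_files")
        if source_files:
            key = "|".join(sorted(source_files)) + "|" + task.task_id
            stem = hashlib.sha256(key.encode("utf-8")).hexdigest()[:12]
        else:
            stem = uuid.uuid4().hex
        # 2. Build the output path with the writer's extension.
        file_path = os.path.join(self.path, f"{stem}.{self.file_extension}")
        # 3. Note when an existing file is about to be replaced.
        if os.path.exists(file_path):
            logger.debug("File %s already exists, overwriting it", file_path)
        # 4. Delegate the format-specific work to the subclass.
        self.write_data(task, file_path)
        # 5. Record how much was written.
        logger.debug("Wrote %d records to %s", len(task.records), file_path)
        # 6. Hand downstream stages the path plus format metadata.
        return ToyFileGroup(
            task_id=task.task_id,
            data=[file_path],
            metadata={**task.metadata, "format": self.file_extension},
        )

    @abstractmethod
    def write_data(self, task: ToyBatch, file_path: str) -> None: ...


@dataclass
class ToyJsonlWriter(ToyWriter):
    file_extension: str = "jsonl"

    def write_data(self, task: ToyBatch, file_path: str) -> None:
        with open(file_path, "w", encoding="utf-8") as f:
            for record in task.records:
                f.write(json.dumps(record) + "\n")


out_dir = tempfile.mkdtemp()
batch = ToyBatch("task_0", [{"text": "hello"}, {"text": "world"}], {"source_files": ["a.jsonl"]})
result = ToyJsonlWriter(path=out_dir).process(batch)
print(result.metadata["format"])  # jsonl
```

Keeping process in the base class and leaving only write_data abstract means a new format needs just one method, while naming, logging, and metadata handling stay uniform across writers.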
