Implementation:NVIDIA NeMo Curator DuplicateRemovalStage

From Leeroopedia
Domains: Deduplication, Data Curation, ETL
Last Updated: 2026-02-14 00:00 GMT

Overview

Implements the removal phase of the distributed deduplication pipeline, filtering out duplicate documents from a DocumentBatch based on pre-computed removal lists stored as Parquet files.

Description

TextDuplicatesRemovalStage is a ProcessingStage that removes duplicate documents identified by an upstream deduplication process. It assumes that a prior deduplication stage has already computed the list of document IDs to remove and written it to Parquet files.

The stage works through an optimized three-step process:

  1. Range determination - Computes the minimum and maximum ID values in the current input batch to define the relevant ID range.
  2. Selective Parquet reading - Reads only the subset of the removal Parquet file that overlaps with the batch's ID range, using predicate pushdown filters (>= min_id and <= max_id). This avoids loading the entire removal list into memory.
  3. Set-based filtering - Converts the removal IDs to a Python set and filters the input DataFrame using ~df[id_field].isin(removal_ids) to exclude duplicate documents.
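
The three steps above can be sketched with plain pandas. This is a minimal illustration, not the stage's actual code: the function name remove_duplicates and the sample column names are hypothetical, and the range restriction is applied to an in-memory DataFrame here, whereas the stage applies it while reading Parquet.

```python
import pandas as pd


def remove_duplicates(
    batch: pd.DataFrame,
    removal: pd.DataFrame,
    id_field: str = "doc_id",          # hypothetical column name in the batch
    duplicate_id_field: str = "id",    # column name in the removal list
) -> pd.DataFrame:
    # Step 1: ID range covered by the current batch.
    min_id, max_id = batch[id_field].min(), batch[id_field].max()

    # Step 2: keep only removal IDs inside [min_id, max_id] (inclusive).
    # Assumption: done in memory here; the stage does this via Parquet filters.
    in_range = removal[duplicate_id_field].between(min_id, max_id)
    removal_ids = set(removal.loc[in_range, duplicate_id_field])

    # Step 3: drop every row whose ID appears in the removal set.
    return batch[~batch[id_field].isin(removal_ids)]


batch = pd.DataFrame({"doc_id": [10, 11, 12, 13], "text": ["a", "b", "a", "c"]})
removal = pd.DataFrame({"id": [3, 12, 99]})
deduped = remove_duplicates(batch, removal)
```

In this toy input only ID 12 falls inside the batch range, so IDs 3 and 99 never reach the membership test.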

The stage logs performance metrics for each step (min/max computation time, Parquet read time, and ID removal time) via _log_metrics.

Usage

Use TextDuplicatesRemovalStage as the final step in a deduplication pipeline, after a deduplication algorithm (e.g., MinHash LSH, exact dedup) has written out the IDs of documents to remove. This stage reads those IDs and filters the actual document batches, producing clean, deduplicated output.

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/stages/text/deduplication/removal.py
  • Lines: 1-106

Signature

@dataclass
class TextDuplicatesRemovalStage(ProcessingStage[DocumentBatch, DocumentBatch]):
    ids_to_remove_path: str
    id_field: str = CURATOR_DEDUP_ID_STR
    duplicate_id_field: str = "id"
    read_kwargs: dict[str, Any] | None = None

    def process(self, task: DocumentBatch) -> DocumentBatch: ...
    def inputs(self) -> tuple[list[str], list[str]]: ...

Import

from nemo_curator.stages.text.deduplication.removal import TextDuplicatesRemovalStage

I/O Contract

Inputs

Name | Type | Required | Description
ids_to_remove_path | str | Yes | Path to Parquet files containing the IDs of documents to remove
id_field | str | No | Field name for the document ID in the input DataFrame (default: CURATOR_DEDUP_ID_STR)
duplicate_id_field | str | No | Field name for the ID column in the removal Parquet files (default: "id")
read_kwargs | dict[str, Any] or None | No | Additional keyword arguments passed to pd.read_parquet

Outputs

Name | Type | Description
DocumentBatch | DocumentBatch | Filtered batch with duplicate documents removed
task_id | str | Prefixed with "removal_" followed by the original task_id
_metadata["num_removed"] | int | Count of documents removed from the batch
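
As a rough illustration of this output contract, the sketch below builds an output object with the "removal_" prefix and the num_removed count. FakeBatch and build_output are hypothetical stand-ins written for this example; they are not the real DocumentBatch API, and carrying over upstream metadata is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any

import pandas as pd


@dataclass
class FakeBatch:
    """Hypothetical stand-in for DocumentBatch, for illustration only."""

    task_id: str
    data: pd.DataFrame
    _metadata: dict[str, Any] = field(default_factory=dict)


def build_output(task: FakeBatch, filtered: pd.DataFrame) -> FakeBatch:
    # task_id gets the "removal_" prefix; num_removed is the row-count difference.
    return FakeBatch(
        task_id=f"removal_{task.task_id}",
        data=filtered,
        # Assumption: existing metadata is preserved alongside the new count.
        _metadata={**task._metadata, "num_removed": len(task.data) - len(filtered)},
    )


task = FakeBatch(task_id="batch_0", data=pd.DataFrame({"doc_id": [10, 11, 12]}))
kept = task.data[~task.data["doc_id"].isin({12})]
out = build_output(task, kept)
```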

Usage Examples

Basic Usage

from nemo_curator.stages.text.deduplication.removal import TextDuplicatesRemovalStage

# Create removal stage pointing to pre-computed duplicate IDs
removal_stage = TextDuplicatesRemovalStage(
    ids_to_remove_path="/data/dedup/ids_to_remove/",
)

With Custom ID Fields

# Use custom ID field names
removal_stage = TextDuplicatesRemovalStage(
    ids_to_remove_path="/data/dedup/ids_to_remove/",
    id_field="doc_id",
    duplicate_id_field="duplicate_doc_id",
)

With Additional Parquet Read Options

# Pass extra options to the Parquet reader
removal_stage = TextDuplicatesRemovalStage(
    ids_to_remove_path="/data/dedup/ids_to_remove/",
    read_kwargs={"engine": "pyarrow"},
)

Internal Processing

Performance Optimization

The key optimization in this stage is the range-based Parquet filtering. Rather than loading the full removal list (which can be millions of IDs for large datasets), the stage:

  1. Computes the ID range [min_id, max_id] of the current batch and requests only removal IDs that fall inside it
  2. Uses Parquet predicate pushdown to avoid reading irrelevant rows from disk
  3. Converts the resulting subset of removal IDs to a set for average O(1) membership testing

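This approach reduces memory usage and I/O when processing large-scale deduplication workloads. The saving depends on each batch covering a narrow ID range: a batch whose IDs span most of the ID space would still read most of the removal list.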

Metrics Logged

The stage logs three timing metrics via _log_metrics:

  • input_df_min_max_time - Time to compute the min/max ID range of the input batch
  • read_dupes_time - Time to read the relevant subset from the removal Parquet files
  • id_removal_time - Time to perform the set-based filtering of the DataFrame
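
A hedged sketch of how three such timings could be collected is shown below. It uses time.perf_counter and a plain dict; the exact arguments that _log_metrics accepts are not described here, so the dict is only an assumed shape, and the Parquet read is replaced by an in-memory stand-in.

```python
import time

import pandas as pd

batch = pd.DataFrame({"doc_id": [10, 11, 12, 13]})
metrics: dict[str, float] = {}

# Time the min/max computation over the batch IDs.
start = time.perf_counter()
min_id, max_id = batch["doc_id"].min(), batch["doc_id"].max()
metrics["input_df_min_max_time"] = time.perf_counter() - start

# Time the removal-list read (stand-in: filter a small in-memory tuple).
start = time.perf_counter()
removal_ids = {i for i in (3, 12, 99) if min_id <= i <= max_id}
metrics["read_dupes_time"] = time.perf_counter() - start

# Time the set-based filtering of the DataFrame.
start = time.perf_counter()
filtered = batch[~batch["doc_id"].isin(removal_ids)]
metrics["id_removal_time"] = time.perf_counter() - start
```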
