Implementation:NVIDIA NeMo Curator DuplicateRemovalStage
| Knowledge Sources | |
|---|---|
| Domains | Deduplication, Data Curation, ETL |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Implements the removal phase of the distributed deduplication pipeline, filtering out duplicate documents from a DocumentBatch based on pre-computed removal lists stored as Parquet files.
Description
TextDuplicatesRemovalStage is a ProcessingStage that removes duplicate documents identified by an upstream deduplication process. It operates on the assumption that a prior deduplication stage has already computed a list of document IDs to remove and written them to Parquet files.
The stage works through an optimized three-step process:
- Range determination - Computes the minimum and maximum ID values in the current input batch to define the relevant ID range.
- Selective Parquet reading - Reads only the subset of the removal Parquet file that overlaps with the batch's ID range, using predicate pushdown filters (>= min_id and <= max_id). This avoids loading the entire removal list into memory.
- Set-based filtering - Converts the removal IDs to a Python set and filters the input DataFrame using ~df[id_field].isin(removal_ids) to exclude duplicate documents.
The stage logs performance metrics for each step (min/max computation time, Parquet read time, and ID removal time) via _log_metrics.
Usage
Use TextDuplicatesRemovalStage as the final step in a deduplication pipeline, after a deduplication algorithm (e.g., MinHash LSH, exact dedup) has written out the IDs of documents to remove. This stage reads those IDs and filters the actual document batches, producing clean, deduplicated output.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/stages/text/deduplication/removal.py
- Lines: 1-106
Signature
@dataclass
class TextDuplicatesRemovalStage(ProcessingStage[DocumentBatch, DocumentBatch]):
    ids_to_remove_path: str
    id_field: str = CURATOR_DEDUP_ID_STR
    duplicate_id_field: str = "id"
    read_kwargs: dict[str, Any] | None = None

    def process(self, task: DocumentBatch) -> DocumentBatch: ...
    def inputs(self) -> tuple[list[str], list[str]]: ...
Import
from nemo_curator.stages.text.deduplication.removal import TextDuplicatesRemovalStage
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| ids_to_remove_path | str | Yes | Path to Parquet files containing the IDs of documents to remove |
| id_field | str | No | Field name for the document ID in the input DataFrame (default: CURATOR_DEDUP_ID_STR) |
| duplicate_id_field | str | No | Field name for the ID column in the removal Parquet files (default: "id") |
| read_kwargs | dict[str, Any] or None | No | Additional keyword arguments passed to pd.read_parquet |
Outputs
| Name | Type | Description |
|---|---|---|
| DocumentBatch | DocumentBatch | Filtered batch with duplicate documents removed |
| task_id | str | Prefixed with "removal_" followed by the original task_id |
| _metadata["num_removed"] | int | Count of documents removed from the batch |
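As a rough illustration of the output contract, the snippet below derives the two bookkeeping values in the table from a batch before and after filtering. The helper summarize_removal and the sample task ID are hypothetical; how the real stage attaches these values to the returned DocumentBatch is not shown here.

```python
import pandas as pd


def summarize_removal(task_id: str, before: pd.DataFrame, after: pd.DataFrame) -> dict:
    # Hypothetical helper: mirrors the documented task_id prefix and num_removed count.
    return {
        "task_id": f"removal_{task_id}",
        "num_removed": len(before) - len(after),
    }


before = pd.DataFrame({"doc_id": [1, 2, 3, 4, 5]})
after = before[~before["doc_id"].isin({2, 5})]

summary = summarize_removal("batch_0", before, after)
print(summary)  # {'task_id': 'removal_batch_0', 'num_removed': 2}
```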
Usage Examples
Basic Usage
from nemo_curator.stages.text.deduplication.removal import TextDuplicatesRemovalStage
# Create removal stage pointing to pre-computed duplicate IDs
removal_stage = TextDuplicatesRemovalStage(
    ids_to_remove_path="/data/dedup/ids_to_remove/",
)
With Custom ID Fields
# Use custom ID field names
removal_stage = TextDuplicatesRemovalStage(
    ids_to_remove_path="/data/dedup/ids_to_remove/",
    id_field="doc_id",
    duplicate_id_field="duplicate_doc_id",
)
With Additional Parquet Read Options
# Pass extra options to the Parquet reader
removal_stage = TextDuplicatesRemovalStage(
    ids_to_remove_path="/data/dedup/ids_to_remove/",
    read_kwargs={"engine": "pyarrow"},
)
Internal Processing
Performance Optimization
The key optimization in this stage is the range-based Parquet filtering. Rather than loading the full removal list (which can be millions of IDs for large datasets), the stage:
- Reads only the removal IDs that fall within the current batch's ID range [min_id, max_id]
- Uses Parquet predicate pushdown to avoid reading irrelevant rows from disk
- Converts the small subset of removal IDs to a set for O(1) membership testing
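Because only the removal IDs within the batch's range are materialized, memory usage is bounded by that subset rather than by the full removal list, and I/O drops wherever the Parquet reader can skip row groups that lie outside the range.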
Metrics Logged
The stage logs three timing metrics via _log_metrics:
- input_df_min_max_time - Time to compute the min/max ID range of the input batch
- read_dupes_time - Time to read the relevant subset from the removal Parquet files
- id_removal_time - Time to perform the set-based filtering of the DataFrame
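One way to collect per-step timings like these is a small wrapper around time.perf_counter. The sketch below is an assumption about the general pattern, not the stage's code: the timed helper is hypothetical, the Parquet read is replaced by an in-memory stand-in, and the call into _log_metrics is omitted because its signature is not described here.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def timed(metrics: dict[str, float], name: str, fn: Callable[[], T]) -> T:
    # Run fn, record its wall-clock duration in seconds under `name`, return its result.
    start = time.perf_counter()
    result = fn()
    metrics[name] = time.perf_counter() - start
    return result


metrics: dict[str, float] = {}
ids = [7, 3, 9, 3]

lo, hi = timed(metrics, "input_df_min_max_time", lambda: (min(ids), max(ids)))
# Stand-in for the filtered Parquet read: keep removal IDs inside [lo, hi].
dupes = timed(metrics, "read_dupes_time", lambda: {i for i in (3, 50) if lo <= i <= hi})
kept = timed(metrics, "id_removal_time", lambda: [i for i in ids if i not in dupes])

print(kept)             # [7, 9]
print(sorted(metrics))  # ['id_removal_time', 'input_df_min_max_time', 'read_dupes_time']
```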
Related Pages
- NVIDIA_NeMo_Curator_FuzzyDeduplicationWorkflow - Upstream workflow that produces the removal ID lists
- NVIDIA_NeMo_Curator_Fuzzy_IdentifyDuplicatesStage - Stage that identifies duplicates before removal
- NVIDIA_NeMo_Curator_ConnectedComponentsStage - Stage that groups duplicates into connected components
- Environment:NVIDIA_NeMo_Curator_Python_Linux_Base