Implementation:NVIDIA NeMo Curator Fuzzy IdentifyDuplicatesStage

From Leeroopedia
Implementation Metadata

Attribute | Value
Domains | Data_Curation, Deduplication
Implements | NVIDIA_NeMo_Curator_Fuzzy_Duplicate_Identification
Last Updated | 2026-02-14 17:00 GMT

Overview

IdentifyDuplicatesStage is the NeMo Curator processing stage that decides which documents to remove from each duplicate group. It keeps the first-encountered document as the group's representative and marks every other member for removal.
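The first-encountered strategy can be illustrated with a minimal sketch that works on plain Python tuples instead of cuDF frames. The function name ids_to_remove and the (document_id, group_id) tuple layout are hypothetical. The logic mirrors the semantics of duplicated(keep="first") applied to a group column; it is not NeMo Curator's own code.

```python
def ids_to_remove(rows: list[tuple[int, int]]) -> list[int]:
    """Return document IDs to drop, keeping the first document seen per group.

    Each row is a hypothetical (document_id, duplicate_group_id) pair. This mirrors
    duplicated(keep="first") on the group column; it is NOT NeMo Curator code.
    """
    seen_groups: set[int] = set()
    removals: list[int] = []
    for doc_id, group_id in rows:
        if group_id in seen_groups:
            removals.append(doc_id)  # a later member of a group already represented
        else:
            seen_groups.add(group_id)  # first encounter: keep as the representative
    return removals


rows = [(10, 1), (11, 1), (12, 2), (13, 1), (14, 2), (15, 3)]
print(ids_to_remove(rows))  # [11, 13, 14]
```

Documents 10, 12, and 15 survive as the representatives of groups 1, 2, and 3.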

Description

IdentifyDuplicatesStage extends ShuffleStage, which provides distributed shuffle (repartition) capabilities. The stage reads connected-component Parquet files containing the _curator_dedup_id and _duplicate_group_id columns and shuffles the data so that all members of the same group are co-located in one partition. It then applies cuDF's duplicated(keep="first") within each group, which flags every member except the first one encountered, and outputs the document IDs of all flagged documents.

The stage also accepts a total_nparts parameter to control the number of output partitions, which affects the parallelism of downstream filtering operations.
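The shuffle-then-deduplicate flow and the role of total_nparts can be sketched in a single Python process. This is a stand-in under stated assumptions: partitioning is assumed to be a hash of the group ID, the helper names are hypothetical, and nparts plays the role of total_nparts. The real stage shuffles cuDF data across workers rather than Python lists.

```python
def shuffle_by_group(
    rows: list[tuple[int, int]], nparts: int
) -> list[list[tuple[int, int]]]:
    """Hash-partition (document_id, group_id) rows so every group lands in one partition."""
    partitions: list[list[tuple[int, int]]] = [[] for _ in range(nparts)]
    for doc_id, group_id in rows:
        # Same group_id -> same hash -> same partition; input order is preserved here.
        partitions[hash(group_id) % nparts].append((doc_id, group_id))
    return partitions


def removals_in_partition(partition: list[tuple[int, int]]) -> list[int]:
    """Keep-first within one partition (valid because no group spans two partitions)."""
    seen: set[int] = set()
    removals: list[int] = []
    for doc_id, group_id in partition:
        if group_id in seen:
            removals.append(doc_id)
        else:
            seen.add(group_id)
    return removals


def identify_duplicates(rows: list[tuple[int, int]], nparts: int) -> list[list[int]]:
    """One removal list per output partition; nparts stands in for total_nparts."""
    return [removals_in_partition(p) for p in shuffle_by_group(rows, nparts)]


rows = [(10, 1), (11, 1), (12, 2), (13, 1), (14, 2), (15, 3)]
for nparts in (1, 2, 4):
    parts = identify_duplicates(rows, nparts)
    # The number of outputs follows nparts, but the union of removals does not change.
    print(len(parts), sorted(doc for part in parts for doc in part))
```

Because co-location guarantees that a group never spans two partitions, applying keep-first independently per partition yields the same removal set for any partition count; only the number of output pieces changes. This sketch preserves input order within each group, so "first" is well defined here. A distributed shuffle may not make the same ordering guarantee.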

Usage

from nemo_curator.stages.deduplication.fuzzy.identify_duplicates import IdentifyDuplicatesStage

identify_stage = IdentifyDuplicatesStage(
    output_path="/output/duplicates_to_remove/",
    duplicate_group_field="_duplicate_group_id",
    document_id_field="_curator_dedup_id",
)

# Execute within a pipeline; cc_task is a FileGroupTask produced by the
# connected-components stage
removal_tasks = identify_stage.process(cc_task)

Code Reference

Source Location

nemo_curator/stages/deduplication/fuzzy/identify_duplicates.py, lines 30–147.

Signature

class IdentifyDuplicatesStage(ShuffleStage):
    def __init__(
        self,
        duplicate_group_field: str = "_duplicate_group_id",
        document_id_field: str = "_curator_dedup_id",
        output_path: str = "./",
        total_nparts: int | None = None,
        # ... further parameters elided
    ) -> None: ...

Import

from nemo_curator.stages.deduplication.fuzzy.identify_duplicates import IdentifyDuplicatesStage

I/O Contract

Direction | Name / Type | Description
Input | FileGroupTask | Connected-component Parquet files with _curator_dedup_id and _duplicate_group_id columns
Output | list[FileGroupTask] | Parquet files containing only the _curator_dedup_id values of documents to remove (every group member except the first-encountered representative)
Output column | _curator_dedup_id | Document IDs of documents flagged for removal
Parameter | duplicate_group_field | Name of the column containing group labels (default: "_duplicate_group_id")
Parameter | document_id_field | Name of the column containing document IDs (default: "_curator_dedup_id")
Parameter | output_path | Directory where the removal-list Parquet files are written (default: "./")
Parameter | total_nparts | Optional number of output partitions, which controls downstream parallelism (default: None)
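A removal list can be checked against this contract with a small validator. The sketch below is hypothetical: removal_list_is_valid works on in-memory (document_id, group_id) pairs rather than the stage's Parquet output. It checks that the removal IDs are unique, that they all come from the input, and that exactly one member of every group survives.

```python
from collections import Counter


def removal_list_is_valid(rows: list[tuple[int, int]], removals: list[int]) -> bool:
    """True if removals are unique, known IDs that leave exactly one survivor per group."""
    removed = set(removals)
    if len(removed) != len(removals):
        return False  # the same ID appears twice in the removal list
    if not removed <= {doc_id for doc_id, _ in rows}:
        return False  # an ID that is not present in the connected-components input
    # Count surviving members per group; a fully removed group counts as 0.
    survivors = Counter(group_id for doc_id, group_id in rows if doc_id not in removed)
    return all(survivors[group_id] == 1 for _, group_id in rows)


rows = [(10, 1), (11, 1), (12, 2), (13, 1), (14, 2), (15, 3)]
print(removal_list_is_valid(rows, [11, 13, 14]))  # True
print(removal_list_is_valid(rows, [10, 11, 13]))  # False: group 1 loses every member
```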

Usage Examples

Example 1: Standard duplicate identification

from nemo_curator.stages.deduplication.fuzzy.identify_duplicates import IdentifyDuplicatesStage

stage = IdentifyDuplicatesStage(
    output_path="/output/duplicates_to_remove/",
    duplicate_group_field="_duplicate_group_id",
    document_id_field="_curator_dedup_id",
)

Example 2: With controlled output partitioning

from nemo_curator.stages.deduplication.fuzzy.identify_duplicates import IdentifyDuplicatesStage

stage = IdentifyDuplicatesStage(
    output_path="/output/duplicates_to_remove/",
    total_nparts=128,  # Control number of output files
)

Example 3: Full pipeline integration from connected components to removal list

from nemo_curator.stages.deduplication.fuzzy.connected_components import ConnectedComponentsStage
from nemo_curator.stages.deduplication.fuzzy.identify_duplicates import IdentifyDuplicatesStage

# Connected component analysis (previous stage); edge_tasks are the edge-list
# FileGroupTasks produced by the stage that precedes it
cc_stage = ConnectedComponentsStage(output_path="/output/cc/")
cc_tasks = cc_stage.process_batch(edge_tasks)

# Identify duplicates to remove
identify_stage = IdentifyDuplicatesStage(
    output_path="/output/duplicates_to_remove/",
)

# Process each connected component result
removal_tasks = []
for cc_task in cc_tasks:
    removal_tasks.extend(identify_stage.process(cc_task))

# removal_tasks now lists the Parquet files holding the IDs of documents to filter out
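One way to apply the removal list downstream is an anti-join against the original documents. The sketch below assumes the removal IDs have already been loaded into memory and that each document is a dict carrying a _curator_dedup_id key. filter_documents is a hypothetical helper, not a NeMo Curator API.

```python
def filter_documents(
    documents: list[dict], removal_ids: list[int], id_field: str = "_curator_dedup_id"
) -> list[dict]:
    """Drop every document whose ID appears in the removal list (an anti-join)."""
    to_drop = set(removal_ids)  # set membership keeps the filter linear in len(documents)
    return [doc for doc in documents if doc[id_field] not in to_drop]


docs = [
    {"_curator_dedup_id": 10, "text": "a cat sat"},
    {"_curator_dedup_id": 11, "text": "a cat sat."},
    {"_curator_dedup_id": 12, "text": "dogs bark"},
]
kept = filter_documents(docs, [11])
print([d["_curator_dedup_id"] for d in kept])  # [10, 12]
```

Because each output partition holds an independent subset of IDs, the partitions can be loaded and applied in parallel. This is where total_nparts affects downstream parallelism.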
