
Implementation:NVIDIA NeMo Curator ConnectedComponentsStage

From Leeroopedia
Implementation Metadata
Attribute | Value
Domains | Data_Curation, Deduplication, Graph_Processing
Implements | NVIDIA_NeMo_Curator_Connected_Component_Analysis
Last Updated | 2026-02-14 17:00 GMT

Overview

ConnectedComponentsStage is the NeMo Curator processing stage that finds clusters of duplicate documents. It computes the weakly connected components of a document similarity graph, in which each vertex is a document and each edge links a pair of likely duplicates, using GPU-accelerated graph algorithms.
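Weak connectivity ignores edge direction: two documents fall into the same component if any chain of edges links them, whichever way each edge points. The minimal pure-Python sketch below illustrates the concept with a breadth-first search over an undirected view of the edges. It is a CPU toy for intuition only, not the GPU (pylibcugraph/RAFT) code path the stage uses, and the function name `weakly_connected_components` is hypothetical.

```python
from collections import defaultdict, deque


def weakly_connected_components(edges):
    """Group vertices into weakly connected components.

    Hypothetical helper for illustration only. `edges` is an iterable of
    (source, destination) pairs; edge direction is ignored.
    """
    # Build an undirected adjacency list: each edge is stored both ways.
    adjacency = defaultdict(set)
    for src, dst in edges:
        adjacency[src].add(dst)
        adjacency[dst].add(src)

    seen = set()
    components = []
    for start in adjacency:
        if start in seen:
            continue
        # Breadth-first search collects every vertex reachable from `start`.
        queue = deque([start])
        seen.add(start)
        component = set()
        while queue:
            node = queue.popleft()
            component.add(node)
            for neighbor in adjacency[node]:
                if neighbor not in seen:
                    seen.add(neighbor)
                    queue.append(neighbor)
        components.append(component)
    return components


# "a" -> "b" and "c" -> "b" point at the same vertex, so weak connectivity
# places a, b and c together even though no directed path links a and c.
print(weakly_connected_components([("a", "b"), ("c", "b"), ("d", "e")]))
```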

Description

ConnectedComponentsStage implements the ProcessingStage[FileGroupTask, FileGroupTask] interface and mixes in DeduplicationIO. Unlike most other stages in the pipeline, this stage uses process_batch() to read ALL edge files at once, since connected component analysis is a global graph operation that requires visibility into the complete edge set.

The stage loads all edge Parquet files, constructs a graph using pylibcugraph/RAFT, computes weakly connected components, and outputs Parquet files mapping each _curator_dedup_id to its _duplicate_group_id.
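The read-all, label, write flow described above can be sketched in plain Python under some simplifying assumptions: each edge file is represented as a list of row dictionaries instead of a Parquet file, union-find stands in for the GPU connected-components kernel, and group labels are assigned as consecutive integers in order of first appearance. The helper name `label_duplicate_groups` is hypothetical; only the column names come from this page, and the real stage may number its groups differently.

```python
def label_duplicate_groups(
    edge_files,
    source_field="_curator_dedup_id_x",
    destination_field="_curator_dedup_id_y",
):
    """Map every document ID found in the edge files to a group label.

    Hypothetical sketch: `edge_files` is a list of "files", each a list of
    dicts holding the source and destination ID columns.
    """
    parent = {}

    def find(node):
        # Register unseen vertices; path halving keeps the trees shallow.
        parent.setdefault(node, node)
        while parent[node] != node:
            parent[node] = parent[parent[node]]
            node = parent[node]
        return node

    def union(a, b):
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[root_b] = root_a

    # Global step: every edge from every file goes into one structure.
    for rows in edge_files:
        for row in rows:
            union(row[source_field], row[destination_field])

    # Give each component root a consecutive integer label.
    labels = {}
    output_rows = []
    for doc_id in list(parent):
        group_id = labels.setdefault(find(doc_id), len(labels))
        output_rows.append(
            {"_curator_dedup_id": doc_id, "_duplicate_group_id": group_id}
        )
    return output_rows


edge_files = [
    [{"_curator_dedup_id_x": 1, "_curator_dedup_id_y": 2}],
    [
        {"_curator_dedup_id_x": 2, "_curator_dedup_id_y": 3},
        {"_curator_dedup_id_x": 7, "_curator_dedup_id_y": 8},
    ],
]
# Documents 1, 2 and 3 share group 0; documents 7 and 8 share group 1.
for row in label_duplicate_groups(edge_files):
    print(row)
```

Only the equality of labels is meaningful here: two documents are duplicates of each other exactly when they carry the same group ID.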

Usage

from nemo_curator.stages.deduplication.fuzzy.connected_components import ConnectedComponentsStage

cc_stage = ConnectedComponentsStage(
    output_path="/output/connected_components/",
)

# edge_tasks: list[FileGroupTask] produced by the upstream edge-generation stage.
# process_batch() consumes ALL edge tasks in a single call.
output_tasks = cc_stage.process_batch(edge_tasks)

Code Reference

Source Location

nemo_curator/stages/deduplication/fuzzy/connected_components.py, lines 37–202.

Signature

class ConnectedComponentsStage(ProcessingStage[FileGroupTask, FileGroupTask], DeduplicationIO):
    def __init__(
        self,
        output_path: str,
        source_field: str | None = None,
        destination_field: str | None = None,
        ...
    )

Import

from nemo_curator.stages.deduplication.fuzzy.connected_components import ConnectedComponentsStage

I/O Contract

Direction | Type / Name | Description
Input | list[FileGroupTask] | All edge Parquet tasks, consumed together via process_batch(); each task contains edge files with _curator_dedup_id_x and _curator_dedup_id_y columns
Output | list[FileGroupTask] | Parquet files with _curator_dedup_id and _duplicate_group_id columns
Output column | _curator_dedup_id | Unique document identifier
Output column | _duplicate_group_id | Integer label of the connected component (duplicate group) the document belongs to
Parameter | output_path | Directory where the connected-component Parquet files are written
Parameter | source_field | Name of the source document ID column in the edge files (default: auto-detected)
Parameter | destination_field | Name of the destination document ID column in the edge files (default: auto-detected)
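One lightweight way to sanity-check output against this contract is sketched below. It assumes the output Parquet rows have already been loaded into plain Python dictionaries with native int values (for example via pyarrow's `Table.to_pylist()`); the loading step is omitted. The function `check_cc_output` and its rules are hypothetical: they encode the contract as stated on this page (each document maps to exactly one group, each group label is an integer) and are not a validation routine shipped with NeMo Curator.

```python
def check_cc_output(rows):
    """Validate connected-components output rows and return the group count.

    Hypothetical checker. `rows` is an iterable of dicts with the
    `_curator_dedup_id` and `_duplicate_group_id` columns.
    """
    seen_ids = set()
    groups = set()
    for row in rows:
        doc_id = row["_curator_dedup_id"]
        group_id = row["_duplicate_group_id"]
        # Components partition the vertices, so each document must map
        # to exactly one group.
        if doc_id in seen_ids:
            raise ValueError(f"document {doc_id!r} appears more than once")
        # bool is a subclass of int in Python, so reject it explicitly.
        if not isinstance(group_id, int) or isinstance(group_id, bool):
            raise TypeError(f"group id for {doc_id!r} is not an integer")
        seen_ids.add(doc_id)
        groups.add(group_id)
    return len(groups)


rows = [
    {"_curator_dedup_id": 1, "_duplicate_group_id": 0},
    {"_curator_dedup_id": 2, "_duplicate_group_id": 0},
    {"_curator_dedup_id": 7, "_duplicate_group_id": 1},
]
print(check_cc_output(rows))  # 2
```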

NOTE: This stage uses process_batch() instead of process() so that it can read all edges at once for a global graph analysis. Connected-component computation needs the entire graph to identify transitive relationships correctly: two documents can belong to the same duplicate group even when no single edge file links them.
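The transitivity argument can be made concrete with a toy example. Suppose one edge file links documents A and B while another links B and C. Labelling each file on its own yields two groups that both contain B, whereas labelling the union of all edges yields the single group {A, B, C}. The sketch uses a small union-find helper, `components`, which is hypothetical and only illustrates the point; it does not reflect how the stage partitions or reads its inputs.

```python
def components(edges):
    """Return the connected components of an undirected edge list as frozensets."""
    parent = {}

    def find(node):
        # Register unseen vertices; path halving keeps the trees shallow.
        parent.setdefault(node, node)
        while parent[node] != node:
            parent[node] = parent[parent[node]]
            node = parent[node]
        return node

    for a, b in edges:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[root_a] = root_b

    groups = {}
    for node in list(parent):
        groups.setdefault(find(node), set()).add(node)
    return {frozenset(group) for group in groups.values()}


file_1 = [("A", "B")]
file_2 = [("B", "C")]

# Per-file labelling: B ends up in two different groups.
per_file = components(file_1) | components(file_2)
# Global labelling over all edges at once: a single group.
global_groups = components(file_1 + file_2)

print(sorted(sorted(g) for g in per_file))       # [['A', 'B'], ['B', 'C']]
print(sorted(sorted(g) for g in global_groups))  # [['A', 'B', 'C']]
```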

Usage Examples

Example 1: Standard connected component analysis

from nemo_curator.stages.deduplication.fuzzy.connected_components import ConnectedComponentsStage

stage = ConnectedComponentsStage(
    output_path="/output/connected_components/",
)

# Must pass ALL edge tasks from the pipeline
cc_tasks = stage.process_batch(all_edge_tasks)

Example 2: With explicit field names

from nemo_curator.stages.deduplication.fuzzy.connected_components import ConnectedComponentsStage

stage = ConnectedComponentsStage(
    output_path="/output/connected_components/",
    source_field="_curator_dedup_id_x",
    destination_field="_curator_dedup_id_y",
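)

# As in Example 1, pass ALL edge tasks in a single call
cc_tasks = stage.process_batch(all_edge_tasks)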

Example 3: Integration in the full pipeline

from nemo_curator.stages.deduplication.fuzzy.buckets_to_edges import BucketsToEdgesStage
from nemo_curator.stages.deduplication.fuzzy.connected_components import ConnectedComponentsStage

# Previous stage produces edges; bucket_tasks is the list[FileGroupTask]
# emitted by the upstream bucketing stage
edges_stage = BucketsToEdgesStage(output_path="/output/edges/")
edge_tasks = [edges_stage.process(bucket_task) for bucket_task in bucket_tasks]

# Connected components requires ALL edges
cc_stage = ConnectedComponentsStage(output_path="/output/cc/")
cc_tasks = cc_stage.process_batch(edge_tasks)
