Implementation:NVIDIA NeMo Curator ConnectedComponentsStage
| Attribute | Value |
|---|---|
| Domains | Data_Curation, Deduplication, Graph_Processing |
| Implements | NVIDIA_NeMo_Curator_Connected_Component_Analysis |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
ConnectedComponentsStage is the NeMo Curator processing stage that finds clusters of duplicate documents by computing weakly connected components in a document similarity graph using GPU-accelerated graph algorithms.
Description
ConnectedComponentsStage implements the ProcessingStage[FileGroupTask, FileGroupTask] interface and mixes in DeduplicationIO. Unlike most other stages in the pipeline, this stage uses process_batch() to read ALL edge files at once, since connected component analysis is a global graph operation that requires visibility into the complete edge set.
The stage loads all edge Parquet files, constructs a graph using pylibcugraph/RAFT, computes weakly connected components, and outputs Parquet files mapping each _curator_dedup_id to its _duplicate_group_id.
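To make the graph step concrete, here is a minimal CPU-only sketch of weakly connected components using union-find. It is not the stage's implementation, which this page attributes to pylibcugraph/RAFT on the GPU. The function name `label_components` and the sample document IDs are hypothetical; only the two output column names are taken from this page.

```python
def label_components(edges):
    """Map every node that appears in `edges` to an integer component label.

    Edge direction is ignored, which is what "weakly" connected means.
    Illustrative CPU sketch only; not the pylibcugraph-based stage code.
    """
    parent = {}

    def find(node):
        # Register unseen nodes as their own root.
        parent.setdefault(node, node)
        root = node
        while parent[root] != root:
            root = parent[root]
        # Path compression: point every node on the walk directly at the root.
        while parent[node] != root:
            parent[node], node = root, parent[node]
        return root

    for src, dst in edges:
        root_src, root_dst = find(src), find(dst)
        if root_src != root_dst:
            parent[root_dst] = root_src

    # Turn arbitrary root nodes into dense integer labels 0, 1, 2, ...
    root_to_label = {}
    labels = {}
    for node in parent:
        root = find(node)
        labels[node] = root_to_label.setdefault(root, len(root_to_label))
    return labels


# Hypothetical edges: (doc_a, doc_b, doc_c) form one chain, (doc_d, doc_e) another.
edges = [("doc_a", "doc_b"), ("doc_b", "doc_c"), ("doc_d", "doc_e")]
labels = label_components(edges)

# Shape the result like the stage's output columns.
rows = [
    {"_curator_dedup_id": doc_id, "_duplicate_group_id": group_id}
    for doc_id, group_id in labels.items()
]
for row in rows:
    print(row)
```

In this sketch doc_a, doc_b, and doc_c receive one label and doc_d and doc_e another, mirroring the one-row-per-document mapping the stage writes to Parquet.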
Usage
from nemo_curator.stages.deduplication.fuzzy.connected_components import ConnectedComponentsStage
cc_stage = ConnectedComponentsStage(
    output_path="/output/connected_components/",
)
# Note: uses process_batch() to consume ALL edge tasks at once
output_tasks = cc_stage.process_batch(edge_tasks)
Code Reference
Source Location
nemo_curator/stages/deduplication/fuzzy/connected_components.py, lines 37–202.
Signature
class ConnectedComponentsStage(ProcessingStage[FileGroupTask, FileGroupTask], DeduplicationIO):
    def __init__(
        self,
        output_path: str,
        source_field: str | None = None,
        destination_field: str | None = None,
        ...
    )
Import
from nemo_curator.stages.deduplication.fuzzy.connected_components import ConnectedComponentsStage
I/O Contract
| Kind | Type / Name | Description |
|---|---|---|
| Input | list[FileGroupTask] | All edge Parquet tasks (consumed via process_batch()); each task contains edge files with _curator_dedup_id_x and _curator_dedup_id_y columns |
| Output | list[FileGroupTask] | Parquet files with _curator_dedup_id and _duplicate_group_id columns |
| Output Column | _curator_dedup_id | The unique document identifier |
| Output Column | _duplicate_group_id | Integer label identifying the connected component (duplicate group) the document belongs to |
| Parameter | output_path | Directory path where connected component Parquet files are written |
| Parameter | source_field | Name of the source document ID column in edge files (default: auto-detected) |
| Parameter | destination_field | Name of the destination document ID column in edge files (default: auto-detected) |
NOTE: This stage uses process_batch() instead of process() so that it reads all edges at once. Connected component computation needs the entire graph to identify transitive relationships correctly: if document A matches B and B matches C, all three belong to one duplicate group even when the two edges arrive in different edge files.
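The effect of splitting the edge set can be shown with a small pure-Python sketch. The helper `find_groups` and the document IDs are hypothetical and stand in for the GPU computation; the point is only that grouping each batch separately misses a link that the combined edge set reveals.

```python
def find_groups(edges):
    """Return the connected components of an undirected edge list as frozensets."""
    adjacency = {}
    for src, dst in edges:
        adjacency.setdefault(src, set()).add(dst)
        adjacency.setdefault(dst, set()).add(src)

    seen = set()
    groups = set()
    for start in adjacency:
        if start in seen:
            continue
        # Depth-first walk collecting everything reachable from `start`.
        component = {start}
        stack = [start]
        while stack:
            node = stack.pop()
            for neighbor in adjacency[node]:
                if neighbor not in component:
                    component.add(neighbor)
                    stack.append(neighbor)
        seen |= component
        groups.add(frozenset(component))
    return groups


# Two hypothetical edge files that share the document doc_b.
batch_1 = [("doc_a", "doc_b")]
batch_2 = [("doc_b", "doc_c")]

# Per-batch analysis: doc_a and doc_c are never placed in the same group.
per_batch = [find_groups(batch) for batch in (batch_1, batch_2)]

# Global analysis over all edges: one group containing all three documents.
global_groups = find_groups(batch_1 + batch_2)

print(per_batch)
print(global_groups)
```

Per-batch processing yields two overlapping pairs, while the global pass yields a single three-document group, which is the behavior the note describes.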
Usage Examples
Example 1: Standard connected component analysis
from nemo_curator.stages.deduplication.fuzzy.connected_components import ConnectedComponentsStage
stage = ConnectedComponentsStage(
    output_path="/output/connected_components/",
)
# Must pass ALL edge tasks from the pipeline
cc_tasks = stage.process_batch(all_edge_tasks)
Example 2: With explicit field names
from nemo_curator.stages.deduplication.fuzzy.connected_components import ConnectedComponentsStage
stage = ConnectedComponentsStage(
    output_path="/output/connected_components/",
    source_field="_curator_dedup_id_x",
    destination_field="_curator_dedup_id_y",
)
Example 3: Integration in the full pipeline
from nemo_curator.stages.deduplication.fuzzy.buckets_to_edges import BucketsToEdgesStage
from nemo_curator.stages.deduplication.fuzzy.connected_components import ConnectedComponentsStage
# Previous stage produces edges
edges_stage = BucketsToEdgesStage(output_path="/output/edges/")
edge_tasks = [edges_stage.process(bucket_task) for bucket_task in bucket_tasks]
# Connected components requires ALL edges
cc_stage = ConnectedComponentsStage(output_path="/output/cc/")
cc_tasks = cc_stage.process_batch(edge_tasks)
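As a rough illustration of how the resulting mapping might be consumed, the sketch below groups rows by _duplicate_group_id and keeps one document per group. It works on in-memory dictionaries rather than the Parquet output. How the downstream stage actually chooses which member to keep is not described on this page; keeping the smallest ID, the function name `ids_to_remove`, and the sample rows are all assumptions made for illustration.

```python
from collections import defaultdict


def ids_to_remove(rows):
    """Return the document IDs to drop, keeping one member per duplicate group.

    Assumed policy (hypothetical): keep the smallest ID in each group.
    """
    members_by_group = defaultdict(list)
    for row in rows:
        members_by_group[row["_duplicate_group_id"]].append(row["_curator_dedup_id"])

    removals = []
    for members in members_by_group.values():
        keep = min(members)
        removals.extend(doc_id for doc_id in members if doc_id != keep)
    return sorted(removals)


# Hypothetical rows shaped like the stage's output columns.
cc_rows = [
    {"_curator_dedup_id": "doc_c", "_duplicate_group_id": 0},
    {"_curator_dedup_id": "doc_a", "_duplicate_group_id": 0},
    {"_curator_dedup_id": "doc_b", "_duplicate_group_id": 0},
    {"_curator_dedup_id": "doc_e", "_duplicate_group_id": 1},
    {"_curator_dedup_id": "doc_d", "_duplicate_group_id": 1},
]

to_remove = ids_to_remove(cc_rows)
print(to_remove)
```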
Related Pages
- Principle:NVIDIA_NeMo_Curator_Connected_Component_Analysis
- NVIDIA_NeMo_Curator_BucketsToEdgesStage — Upstream stage that produces the edge graph
- NVIDIA_NeMo_Curator_Fuzzy_IdentifyDuplicatesStage — Downstream stage that selects duplicates for removal
- NVIDIA_NeMo_Curator_FuzzyDeduplicationWorkflow — The parent workflow orchestrating all stages
- Environment:NVIDIA_NeMo_Curator_Python_Linux_Base
- Environment:NVIDIA_NeMo_Curator_RAPIDS_GPU_Stack
- Environment:NVIDIA_NeMo_Curator_Ray_Cluster