Implementation:NVIDIA NeMo Curator SemanticDeduplicationStage

From Leeroopedia
Revision as of 13:22, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/NVIDIA_NeMo_Curator_SemanticDeduplicationStage.md)
Knowledge Sources
Domains Deduplication, NLP, Data Curation
Last Updated 2026-02-14 00:00 GMT

Overview

The TextSemanticDeduplicationWorkflow class provides a monolithic end-to-end workflow for text semantic deduplication, combining embedding generation, clustering-based semantic deduplication, and optional duplicate removal.

Description

TextSemanticDeduplicationWorkflow is a dataclass that orchestrates the complete lifecycle of semantic deduplication for text data. It encapsulates three major phases:

Phase 1 -- Embedding Generation: Builds a pipeline consisting of a reader stage (supporting JSONL or Parquet input), an EmbeddingCreatorStage for generating text embeddings using a configurable HuggingFace model (default: sentence-transformers/all-MiniLM-L6-v2), and a ParquetWriter that writes embeddings to a cache directory. The embedding stage supports configurable tokenization parameters including max sequence length, padding side, and pooling strategy (mean pooling or last token).
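The two pooling strategies named above can be illustrated with a minimal pure-Python sketch. The function names and the list-based representation are assumptions made for illustration; they are not the EmbeddingCreatorStage implementation, which operates on model tensors.

```python
from typing import List

Vector = List[float]


def mean_pooling(token_embeddings: List[Vector], attention_mask: List[int]) -> Vector:
    """Average the embeddings of real (non-padding) tokens."""
    dim = len(token_embeddings[0])
    total = [0.0] * dim
    count = 0
    for vec, keep in zip(token_embeddings, attention_mask):
        if keep:
            count += 1
            for i, value in enumerate(vec):
                total[i] += value
    # Guard against division by zero for an all-padding sequence.
    return [t / max(count, 1) for t in total]


def last_token_pooling(token_embeddings: List[Vector], attention_mask: List[int]) -> Vector:
    """Return the embedding of the last unmasked token.

    Looking up the last unmasked position works for left or right padding.
    Raises ValueError if every position is masked.
    """
    last_index = max(i for i, keep in enumerate(attention_mask) if keep)
    return list(token_embeddings[last_index])


# Three token positions, the last one is padding.
tokens = [[1.0, 2.0], [3.0, 4.0], [0.0, 0.0]]
mask = [1, 1, 0]
pooled_mean = mean_pooling(tokens, mask)
pooled_last = last_token_pooling(tokens, mask)
```

Mean pooling summarizes the whole sequence, while last-token pooling relies on the model concentrating the sequence meaning in its final real token, which is why the padding side matters for models that use it.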

Phase 2 -- Semantic Deduplication: Delegates to SemanticDeduplicationWorkflow, which runs K-means clustering on the embeddings and then computes pairwise similarity within each cluster. Configurable parameters include the number of clusters (n_clusters), the distance metric (cosine or L2), the ranking strategy that decides which documents to keep ("hard", "easy", or "random"), and an epsilon threshold (eps) for duplicate identification. K-means itself can be tuned through the maximum number of iterations, convergence tolerance, initialization method, and oversampling factor (the kmeans_* fields in the signature).
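As a rough illustration of the within-cluster step, the sketch below flags a document as a duplicate when its cosine similarity to any higher-ranked document in the same cluster exceeds 1 - eps. This threshold rule follows the common SemDeDup formulation and is an assumption here, not a confirmed detail of SemanticDeduplicationWorkflow; the function names are hypothetical, and the inputs are assumed to be already sorted by the chosen ranking strategy.

```python
import math
from typing import List, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def find_duplicates_in_cluster(
    ids: Sequence[str],
    embeddings: Sequence[Sequence[float]],
    eps: float,
) -> List[str]:
    """Return IDs whose max similarity to an earlier-ranked item exceeds 1 - eps.

    Assumes ids/embeddings are pre-sorted so that items to keep come first.
    """
    duplicates = []
    for j in range(1, len(ids)):
        max_sim = max(
            cosine_similarity(embeddings[i], embeddings[j]) for i in range(j)
        )
        if max_sim > 1.0 - eps:
            duplicates.append(ids[j])
    return duplicates


# "b" is almost identical to "a"; "c" is orthogonal to both.
cluster_ids = ["a", "b", "c"]
cluster_embeddings = [[1.0, 0.0], [1.0, 0.01], [0.0, 1.0]]
flagged = find_duplicates_in_cluster(cluster_ids, cluster_embeddings, eps=0.01)
```

Restricting the comparison to one cluster at a time keeps the quadratic pairwise cost bounded by cluster size rather than dataset size, which is the reason clustering precedes the similarity step.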

Phase 3 -- Duplicate Removal (optional): When perform_removal=True, delegates to TextDuplicatesRemovalWorkflow which reads the original dataset, cross-references the identified duplicate IDs, and outputs a deduplicated dataset.
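Conceptually, the removal phase is an anti-join between the original records and the set of duplicate IDs. The sketch below shows that idea on plain dictionaries; the helper name and the "id" field are hypothetical stand-ins (the workflow's default ID field is CURATOR_DEDUP_ID_STR), and the real TextDuplicatesRemovalWorkflow operates on files rather than in-memory lists.

```python
from typing import Any, Dict, Iterable, Iterator, Set


def remove_duplicates(
    records: Iterable[Dict[str, Any]],
    duplicate_ids: Set[Any],
    id_field: str = "id",  # hypothetical field name for this sketch
) -> Iterator[Dict[str, Any]]:
    """Yield only the records whose ID is not in the duplicate set."""
    for record in records:
        if record[id_field] not in duplicate_ids:
            yield record


docs = [
    {"id": 1, "text": "the cat sat"},
    {"id": 2, "text": "the cat sat."},
    {"id": 3, "text": "an unrelated sentence"},
]
kept = list(remove_duplicates(docs, duplicate_ids={2}))
```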

The workflow supports flexible executor configuration:

  • A single streaming_executor for all phases (defaults to XennaExecutor)
  • A tuple of three streaming executors for independent control of embedding, pairwise, and removal phases
  • A separate batch_executor for K-means clustering (defaults to RayActorPoolExecutor)
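One way to picture the streaming executor options is as a normalization step that always yields three executors, one per streaming phase. The helper below is a hypothetical sketch of that logic, not code from the library; default_factory stands in for whatever constructs the default executor.

```python
from typing import Any, Callable, Optional, Tuple, Union

Executor = Any  # placeholder type for this sketch


def resolve_streaming_executors(
    streaming_executor: Optional[Union[Executor, Tuple[Executor, Executor, Executor]]],
    default_factory: Callable[[], Executor],
) -> Tuple[Executor, Executor, Executor]:
    """Return (embedding, pairwise, removal) executors."""
    if streaming_executor is None:
        # No executor given: build one default and share it across phases.
        shared = default_factory()
        return shared, shared, shared
    if isinstance(streaming_executor, tuple):
        if len(streaming_executor) != 3:
            raise ValueError("expected exactly three streaming executors")
        return streaming_executor
    # A single executor is reused for every streaming phase.
    return streaming_executor, streaming_executor, streaming_executor


single = object()
embedding_exec, pairwise_exec, removal_exec = resolve_streaming_executors(single, object)
```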

An optional ID generator can be used to assign unique document IDs across the pipeline, with state persistence to disk between phases. The workflow returns a WorkflowRunResult containing consolidated timing metrics, intermediate paths, and duplicate count metadata.

Configuration validation ensures that perform_removal=True requires a non-None eps, and that the ID generator is used consistently with the expected ID field.
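The first validation rule can be sketched with a dataclass __post_init__ hook. The class below is a hypothetical stand-in that models only the perform_removal/eps constraint; the ID generator check is not modeled because the page does not spell out its exact condition.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DedupConfigSketch:
    """Hypothetical stand-in for the workflow's configuration fields."""

    perform_removal: bool = True
    eps: Optional[float] = 0.01

    def __post_init__(self) -> None:
        # Removal needs identified duplicates, which requires an eps threshold.
        if self.perform_removal and self.eps is None:
            raise ValueError("perform_removal=True requires eps to be set")


# Identification can be skipped only when removal is disabled as well.
ok_config = DedupConfigSketch(perform_removal=False, eps=None)
```

Validating at construction time surfaces a bad configuration before any embeddings are computed, which matters when Phase 1 alone can take hours on a large dataset.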

Usage

Use TextSemanticDeduplicationWorkflow as the top-level entry point for deduplicating text datasets by semantic similarity. It suits large-scale data curation where near-duplicate documents must be identified, and optionally removed, before training. The workflow covers the full pipeline from raw text to deduplicated output.

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/stages/text/deduplication/semantic.py
  • Lines: 1-562

Signature

@dataclass
class TextSemanticDeduplicationWorkflow:
    # Input/Output configuration
    input_path: str | list[str]
    output_path: str
    cache_path: str | None = None
    perform_removal: bool = True

    # Embedding generation parameters
    text_field: str = "text"
    embedding_field: str = "embeddings"
    model_identifier: str = "sentence-transformers/all-MiniLM-L6-v2"
    embedding_max_seq_length: int = 512
    embedding_max_chars: int | None = None
    embedding_padding_side: Literal["left", "right"] = "right"
    embedding_pooling: Literal["mean_pooling", "last_token"] = "mean_pooling"
    embedding_model_inference_batch_size: int = 256
    hf_token: str | None = None

    # Semantic deduplication parameters
    n_clusters: int = 100
    id_field: str = CURATOR_DEDUP_ID_STR
    embedding_dim: int | None = None
    metadata_fields: list[str] | None = None
    distance_metric: Literal["cosine", "l2"] = "cosine"
    which_to_keep: Literal["hard", "easy", "random"] = "hard"
    eps: float | None = 0.01

    # K-means clustering parameters
    kmeans_max_iter: int = 300
    kmeans_tol: float = 1e-4
    kmeans_random_state: int = 42
    kmeans_init: str = "k-means||"
    kmeans_n_init: int | Literal["auto"] = 1
    kmeans_oversampling_factor: float = 2.0
    kmeans_max_samples_per_batch: int = 32768

    # Pairwise similarity parameters
    ranking_strategy: RankingStrategy | None = None
    pairwise_batch_size: int = 1024

    # I/O parameters
    input_filetype: Literal["jsonl", "parquet"] = "parquet"
    output_filetype: Literal["jsonl", "parquet"] = "parquet"

    def run(
        self,
        streaming_executor: BaseExecutor | tuple[BaseExecutor, BaseExecutor, BaseExecutor] | None = None,
        batch_executor: BaseExecutor | None = None,
    ) -> WorkflowRunResult: ...

Import

from nemo_curator.stages.text.deduplication.semantic import TextSemanticDeduplicationWorkflow

I/O Contract

Inputs

Name              Type                               Required                                                Description
input_path        str | list[str]                    Yes                                                     Path(s) to input files containing text data
output_path       str                                Yes                                                     Directory to write deduplicated output or duplicate IDs
cache_path        str | None                         No                                                      Directory for intermediate results (embeddings, kmeans, pairwise); defaults to output_path
perform_removal   bool                               No (default: True)                                      Whether to perform duplicate removal or just identification
text_field        str                                No (default: "text")                                    Name of the text field in input data
model_identifier  str                                No (default: "sentence-transformers/all-MiniLM-L6-v2")  HuggingFace model for embedding generation
n_clusters        int                                No (default: 100)                                       Number of K-means clusters
distance_metric   Literal["cosine", "l2"]            No (default: "cosine")                                  Distance metric for similarity computation
which_to_keep     Literal["hard", "easy", "random"]  No (default: "hard")                                    Strategy for ranking documents within clusters
eps               float | None                       No (default: 0.01)                                      Epsilon threshold for duplicate identification; None skips identification
input_filetype    Literal["jsonl", "parquet"]        No (default: "parquet")                                 Format of input files

Outputs

Name                 Type                 Description
WorkflowRunResult    WorkflowRunResult    Result object containing pipeline tasks, timing metadata, duplicate counts, and output paths
deduplicated output  Parquet/JSONL files  Deduplicated dataset written to output_path/deduplicated/ (when perform_removal=True)
duplicates output    Parquet files        Identified duplicate IDs written to output_path/duplicates/ (when eps is set)
embeddings cache     Parquet files        Computed embeddings written to cache_path/embeddings/

Workflow Phases

Phase  Component             Executor                                    Description
1      Embedding Generation  streaming_executor (or embedding_executor)  Reads input, generates embeddings, writes to cache
2a     K-means Clustering    batch_executor (RayActorPoolExecutor)       Clusters embeddings into n_clusters groups
2b     Pairwise Similarity   streaming_executor (or pairwise_executor)   Computes pairwise similarity within clusters, identifies duplicates
3      Duplicate Removal     streaming_executor (or removal_executor)    Removes identified duplicates from original dataset (optional)

Directory Structure

output_path/
  duplicates/          # Identified duplicate IDs (when eps is set)
  deduplicated/        # Deduplicated output (when perform_removal=True)

cache_path/
  embeddings/          # Cached embedding Parquet files
  semantic_dedup/      # K-means and pairwise intermediate results
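The layout above can be expressed as a small path helper, which is handy for locating intermediate results after a run. The function name is hypothetical; the fallback of cache_path to output_path follows the Inputs table, and the subdirectory names are taken from the tree shown here.

```python
from pathlib import Path
from typing import Dict, Optional


def expected_layout(output_path: str, cache_path: Optional[str] = None) -> Dict[str, Path]:
    """Map each artifact to the directory where this page says it is written."""
    out = Path(output_path)
    # cache_path falls back to output_path when it is not provided.
    cache = Path(cache_path) if cache_path is not None else out
    return {
        "duplicates": out / "duplicates",
        "deduplicated": out / "deduplicated",
        "embeddings": cache / "embeddings",
        "semantic_dedup": cache / "semantic_dedup",
    }


layout = expected_layout("/data/output", "/data/cache")
```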

Usage Examples

Basic Usage

from nemo_curator.stages.text.deduplication.semantic import TextSemanticDeduplicationWorkflow

# Create and run the workflow with default executors
workflow = TextSemanticDeduplicationWorkflow(
    input_path="/data/input",
    output_path="/data/output",
    cache_path="/data/cache",
    text_field="text",
    n_clusters=100,
    eps=0.01,
    perform_removal=True,
)

result = workflow.run()
print(f"Total time: {result.metadata['total_time']:.2f}s")
print(f"Duplicates found: {result.metadata['num_duplicates']}")

Custom Executors

from nemo_curator.stages.text.deduplication.semantic import TextSemanticDeduplicationWorkflow
from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.backends.experimental.ray_actor_pool import RayActorPoolExecutor

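# Use separate executor instances for the embedding, pairwise, and removal phases.
# Constructor options differ between NeMo Curator versions, so the executors are
# created with defaults here; add per-phase settings that your version supports.
embedding_exec = XennaExecutor()
pairwise_exec = XennaExecutor()
removal_exec = XennaExecutor()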

workflow = TextSemanticDeduplicationWorkflow(
    input_path="/data/input",
    output_path="/data/output",
    model_identifier="sentence-transformers/all-MiniLM-L6-v2",
    n_clusters=500,
    distance_metric="cosine",
    which_to_keep="hard",
    eps=0.05,
    kmeans_max_iter=500,
    pairwise_batch_size=2048,
)

result = workflow.run(
    streaming_executor=(embedding_exec, pairwise_exec, removal_exec),
    batch_executor=RayActorPoolExecutor(),
)

Identification Only (No Removal)

from nemo_curator.stages.text.deduplication.semantic import TextSemanticDeduplicationWorkflow

# Only identify duplicates without removing them
workflow = TextSemanticDeduplicationWorkflow(
    input_path="/data/input",
    output_path="/data/output",
    perform_removal=False,
    eps=0.01,
)

result = workflow.run()
# Duplicate IDs are written to /data/output/duplicates/
