Implementation:NVIDIA NeMo Curator SemanticDeduplicationStage
| Knowledge Sources | |
|---|---|
| Domains | Deduplication, NLP, Data Curation |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
The TextSemanticDeduplicationWorkflow class provides a monolithic end-to-end workflow for text semantic deduplication, combining embedding generation, clustering-based semantic deduplication, and optional duplicate removal.
Description
TextSemanticDeduplicationWorkflow is a dataclass that orchestrates the complete lifecycle of semantic deduplication for text data. It encapsulates three major phases:
Phase 1 -- Embedding Generation: Builds a pipeline consisting of a reader stage (supporting JSONL or Parquet input), an EmbeddingCreatorStage for generating text embeddings using a configurable HuggingFace model (default: sentence-transformers/all-MiniLM-L6-v2), and a ParquetWriter that writes embeddings to a cache directory. The embedding stage supports configurable tokenization parameters including max sequence length, padding side, and pooling strategy (mean pooling or last token).
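The two pooling options can be illustrated in NumPy. This is a minimal sketch, not the EmbeddingCreatorStage code. The helper names mean_pooling and last_token_pooling are hypothetical, and the sketch assumes token embeddings of shape (batch, seq_len, dim) with a 0/1 attention mask. It also shows why the padding side matters for last-token pooling.

```python
import numpy as np


def mean_pooling(token_embeddings: np.ndarray, attention_mask: np.ndarray) -> np.ndarray:
    """Average token vectors per sequence, ignoring padding positions (mask == 0)."""
    mask = attention_mask[..., None].astype(token_embeddings.dtype)  # (batch, seq, 1)
    summed = (token_embeddings * mask).sum(axis=1)                   # (batch, dim)
    counts = np.clip(mask.sum(axis=1), 1e-9, None)                   # avoid divide-by-zero
    return summed / counts


def last_token_pooling(
    token_embeddings: np.ndarray,
    attention_mask: np.ndarray,
    padding_side: str = "right",
) -> np.ndarray:
    """Take the vector of the last non-padding token in each sequence."""
    if padding_side == "left":
        # With left padding, the last real token is always in the final position.
        return token_embeddings[:, -1, :]
    # With right padding, the last real token is at index (number of real tokens - 1).
    last_idx = attention_mask.sum(axis=1) - 1
    return token_embeddings[np.arange(token_embeddings.shape[0]), last_idx, :]
```

With right padding, the last real token sits at a different index in each row, so its position has to be read from the mask; with left padding it is always the final column.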
Phase 2 -- Semantic Deduplication: Delegates to SemanticDeduplicationWorkflow, which performs K-means clustering on the embeddings followed by pairwise similarity computation within each cluster. Configurable parameters include the number of clusters, the distance metric (cosine or L2), the ranking strategy that decides which documents to keep ("hard", "easy", or "random"), and an epsilon threshold for duplicate identification. K-means clustering can be tuned via max iterations, tolerance, initialization method, and oversampling factor.
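The within-cluster step can be sketched as follows. This is an illustrative assumption, not the SemanticDeduplicationWorkflow implementation: it covers only the cosine metric, treats "hard" as keeping documents farthest from the centroid and "easy" as keeping the closest ones (a SemDeDup-style ranking), and flags a document when its cosine distance to any higher-ranked document in the same cluster is at most eps. The function name find_semantic_duplicates is hypothetical.

```python
import numpy as np


def find_semantic_duplicates(embeddings, labels, centroids, eps=0.01, which_to_keep="hard", seed=42):
    """Return a boolean mask marking documents flagged as semantic duplicates."""
    # Normalize rows so that dot products are cosine similarities.
    emb = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    rng = np.random.default_rng(seed)
    is_dup = np.zeros(len(emb), dtype=bool)
    for c in np.unique(labels):
        idx = np.flatnonzero(labels == c)
        # Rank cluster members; earlier ranks are preferred when keeping.
        dist = np.linalg.norm(emb[idx] - centroids[c], axis=1)
        if which_to_keep == "hard":
            order = np.argsort(-dist)  # farthest from centroid first (assumption)
        elif which_to_keep == "easy":
            order = np.argsort(dist)   # closest to centroid first (assumption)
        else:
            order = rng.permutation(len(idx))  # "random"
        ranked = idx[order]
        sim = emb[ranked] @ emb[ranked].T
        # Keep only pairs (i, j) with i < j, so each doc is compared to higher-ranked docs.
        sim = np.triu(sim, k=1)
        max_sim_to_earlier = sim.max(axis=0)
        # Flag a doc if its cosine distance to some earlier doc is within eps.
        is_dup[ranked] = (1.0 - max_sim_to_earlier) <= eps
    return is_dup
```

Restricting comparisons to one cluster at a time is what keeps the pairwise step tractable: each cluster needs its own similarity matrix rather than one matrix over the whole corpus.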
Phase 3 -- Duplicate Removal (optional): When perform_removal=True, delegates to TextDuplicatesRemovalWorkflow, which reads the original dataset, cross-references it against the identified duplicate IDs, and writes out a deduplicated dataset.
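Conceptually, removal is an anti-join between the original records and the duplicate IDs. The sketch below assumes in-memory records and a hypothetical ID field named "id"; remove_duplicates is a hypothetical helper, and TextDuplicatesRemovalWorkflow itself reads and writes files through an executor.

```python
def remove_duplicates(records, duplicate_ids, id_field="id"):
    """Keep only records whose ID is not among the identified duplicates."""
    dup_set = set(duplicate_ids)  # set gives O(1) membership checks
    return [r for r in records if r[id_field] not in dup_set]
```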
The workflow supports flexible executor configuration:
- A single streaming_executor for all phases (defaults to XennaExecutor)
- A tuple of three streaming executors for independent control of the embedding, pairwise, and removal phases
- A separate batch_executor for K-means clustering (defaults to RayActorPoolExecutor)
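How the executor arguments map onto the phases can be sketched with a small resolver. Everything here is hypothetical (resolve_executors, PhaseExecutors, and the string stand-ins for real executor objects); it only encodes the rules listed above: one executor shared by all streaming phases, or a tuple ordered as embedding, pairwise, removal, plus a separate batch executor for K-means.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class PhaseExecutors:
    embedding: Any
    kmeans: Any
    pairwise: Any
    removal: Any


def resolve_executors(
    streaming_executor: Any = None,
    batch_executor: Any = None,
    # Stand-ins for constructing the real defaults (XennaExecutor / RayActorPoolExecutor).
    make_streaming: Callable[[], Any] = lambda: "XennaExecutor",
    make_batch: Callable[[], Any] = lambda: "RayActorPoolExecutor",
) -> PhaseExecutors:
    if streaming_executor is None:
        streaming_executor = make_streaming()
    if isinstance(streaming_executor, tuple):
        if len(streaming_executor) != 3:
            raise ValueError("expected (embedding, pairwise, removal) executors")
        embedding, pairwise, removal = streaming_executor
    else:
        # A single executor is shared by all three streaming phases.
        embedding = pairwise = removal = streaming_executor
    kmeans = batch_executor if batch_executor is not None else make_batch()
    return PhaseExecutors(embedding, kmeans, pairwise, removal)
```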
An optional ID generator can be used to assign unique document IDs across the pipeline, with state persistence to disk between phases. The workflow returns a WorkflowRunResult containing consolidated timing metrics, intermediate paths, and duplicate count metadata.
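The description does not say how the ID generator stores its state. One way to keep IDs stable across phases is to record the ID range each input file received, so that a later phase re-reading the same file gets the same IDs. The class below (HypotheticalIdGenerator) is a sketch of that idea using a JSON state file; it is not the NeMo Curator ID generator.

```python
import json
from pathlib import Path


class HypotheticalIdGenerator:
    """Assigns a contiguous integer ID range per input file and persists the mapping."""

    def __init__(self, next_id=0, ranges=None):
        self.next_id = next_id
        self.ranges = dict(ranges or {})  # file name -> [start, end)

    def ids_for(self, file_name: str, num_rows: int) -> range:
        if file_name not in self.ranges:
            # First time this file is seen: allocate a fresh range.
            self.ranges[file_name] = [self.next_id, self.next_id + num_rows]
            self.next_id += num_rows
        # Files seen before get their stored range back (num_rows is not re-checked).
        start, end = self.ranges[file_name]
        return range(start, end)

    def save(self, path) -> None:
        Path(path).write_text(json.dumps({"next_id": self.next_id, "ranges": self.ranges}))

    @classmethod
    def load(cls, path) -> "HypotheticalIdGenerator":
        state = json.loads(Path(path).read_text())
        return cls(state["next_id"], state["ranges"])
```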
Configuration validation ensures that perform_removal=True requires a non-None eps, and that the ID generator is used consistently with the expected ID field.
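Because the workflow is a dataclass, a natural place for the eps rule is __post_init__. The sketch below uses a hypothetical stand-in (DedupConfigSketch) and checks only the perform_removal/eps rule; the ID-generator consistency check is left out because its exact condition is not described here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DedupConfigSketch:
    """Hypothetical stand-in showing dataclass-style validation."""

    perform_removal: bool = True
    eps: Optional[float] = 0.01

    def __post_init__(self) -> None:
        # Removal needs duplicate IDs, and identifying duplicates needs an eps threshold.
        if self.perform_removal and self.eps is None:
            raise ValueError("perform_removal=True requires eps to be set")
```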
Usage
Use TextSemanticDeduplicationWorkflow as the top-level entry point for deduplicating text datasets based on semantic similarity. It is suitable for large-scale data curation where near-duplicate documents need to be identified and optionally removed before training. The workflow handles the full pipeline from raw text to deduplicated output with extensive configurability.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/stages/text/deduplication/semantic.py
- Lines: 1-562
Signature
```python
@dataclass
class TextSemanticDeduplicationWorkflow:
    # Input/Output configuration
    input_path: str | list[str]
    output_path: str
    cache_path: str | None = None
    perform_removal: bool = True

    # Embedding generation parameters
    text_field: str = "text"
    embedding_field: str = "embeddings"
    model_identifier: str = "sentence-transformers/all-MiniLM-L6-v2"
    embedding_max_seq_length: int = 512
    embedding_max_chars: int | None = None
    embedding_padding_side: Literal["left", "right"] = "right"
    embedding_pooling: Literal["mean_pooling", "last_token"] = "mean_pooling"
    embedding_model_inference_batch_size: int = 256
    hf_token: str | None = None

    # Semantic deduplication parameters
    n_clusters: int = 100
    id_field: str = CURATOR_DEDUP_ID_STR
    embedding_dim: int | None = None
    metadata_fields: list[str] | None = None
    distance_metric: Literal["cosine", "l2"] = "cosine"
    which_to_keep: Literal["hard", "easy", "random"] = "hard"
    eps: float | None = 0.01

    # K-means clustering parameters
    kmeans_max_iter: int = 300
    kmeans_tol: float = 1e-4
    kmeans_random_state: int = 42
    kmeans_init: str = "k-means||"
    kmeans_n_init: int | Literal["auto"] = 1
    kmeans_oversampling_factor: float = 2.0
    kmeans_max_samples_per_batch: int = 32768

    # Pairwise similarity parameters
    ranking_strategy: RankingStrategy | None = None
    pairwise_batch_size: int = 1024

    # I/O parameters
    input_filetype: Literal["jsonl", "parquet"] = "parquet"
    output_filetype: Literal["jsonl", "parquet"] = "parquet"

    def run(
        self,
        streaming_executor: BaseExecutor | tuple[BaseExecutor, BaseExecutor, BaseExecutor] | None = None,
        batch_executor: BaseExecutor | None = None,
    ) -> WorkflowRunResult: ...
```
Import
```python
from nemo_curator.stages.text.deduplication.semantic import TextSemanticDeduplicationWorkflow
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| input_path | str \| list[str] | Yes | Path(s) to input files containing text data |
| output_path | str | Yes | Directory to write deduplicated output or duplicate IDs |
| cache_path | str \| None | No | Directory for intermediate results (embeddings, K-means, pairwise); defaults to output_path |
| perform_removal | bool | No (default: True) | Whether to remove duplicates or only identify them |
| text_field | str | No (default: "text") | Name of the text field in the input data |
| model_identifier | str | No (default: "sentence-transformers/all-MiniLM-L6-v2") | HuggingFace model used for embedding generation |
| n_clusters | int | No (default: 100) | Number of K-means clusters |
| distance_metric | Literal["cosine", "l2"] | No (default: "cosine") | Distance metric for similarity computation |
| which_to_keep | Literal["hard", "easy", "random"] | No (default: "hard") | Strategy for ranking documents within clusters (decides which documents are kept) |
| eps | float \| None | No (default: 0.01) | Epsilon threshold for duplicate identification; None skips identification |
| input_filetype | Literal["jsonl", "parquet"] | No (default: "parquet") | Format of the input files |
Outputs
| Name | Type | Description |
|---|---|---|
| WorkflowRunResult | WorkflowRunResult | Result object containing pipeline tasks, timing metadata, duplicate counts, and output paths |
| deduplicated output | Parquet/JSONL files (per output_filetype) | Deduplicated dataset written to output_path/deduplicated/ (when perform_removal=True) |
| duplicates output | Parquet files | Identified duplicate IDs written to output_path/duplicates/ (when eps is set) |
| embeddings cache | Parquet files | Computed embeddings written to cache_path/embeddings/ |
Workflow Phases
| Phase | Component | Executor | Description |
|---|---|---|---|
| 1 | Embedding Generation | streaming_executor (or the embedding executor from the tuple) | Reads input, generates embeddings, writes them to the cache |
| 2a | K-means Clustering | batch_executor (default: RayActorPoolExecutor) | Clusters embeddings into n_clusters groups |
| 2b | Pairwise Similarity | streaming_executor (or the pairwise executor from the tuple) | Computes pairwise similarity within clusters and identifies duplicates |
| 3 | Duplicate Removal | streaming_executor (or the removal executor from the tuple) | Removes identified duplicates from the original dataset (optional) |
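Phase 2a can be pictured as standard Lloyd's k-means. The sketch below is an illustration only: it uses random initialization rather than the k-means|| initialization that kmeans_init defaults to, ignores kmeans_oversampling_factor and kmeans_max_samples_per_batch, and assumes the whole embedding matrix fits in memory. The function lloyd_kmeans is hypothetical; its max_iter, tol, and random_state arguments mirror the kmeans_* parameters by name only, and the library's exact semantics (for example, how tol is measured) may differ.

```python
import numpy as np


def nearest_centroid(x: np.ndarray, centroids: np.ndarray) -> np.ndarray:
    """Index of the closest centroid (squared L2 distance) for each row of x."""
    d2 = ((x[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
    return d2.argmin(axis=1)


def lloyd_kmeans(x, n_clusters, max_iter=300, tol=1e-4, random_state=42):
    """Plain Lloyd's k-means with random initialization (not k-means||)."""
    rng = np.random.default_rng(random_state)
    # Random initialization: pick n_clusters distinct points as starting centroids.
    centroids = x[rng.choice(len(x), size=n_clusters, replace=False)].astype(float)
    for _ in range(max_iter):
        labels = nearest_centroid(x, centroids)
        # Recompute each centroid as its members' mean; keep the old one if a cluster empties.
        new_centroids = np.array([
            x[labels == k].mean(axis=0) if np.any(labels == k) else centroids[k]
            for k in range(n_clusters)
        ])
        shift = np.linalg.norm(new_centroids - centroids)
        centroids = new_centroids
        if shift < tol:  # stop once centroids barely move
            break
    return nearest_centroid(x, centroids), centroids
```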
Directory Structure
```text
output_path/
    duplicates/        # Identified duplicate IDs (when eps is set)
    deduplicated/      # Deduplicated output (when perform_removal=True)
cache_path/
    embeddings/        # Cached embedding Parquet files
    semantic_dedup/    # K-means and pairwise intermediate results
```
Usage Examples
Basic Usage
```python
from nemo_curator.stages.text.deduplication.semantic import TextSemanticDeduplicationWorkflow

# Create and run the workflow with default executors
workflow = TextSemanticDeduplicationWorkflow(
    input_path="/data/input",
    output_path="/data/output",
    cache_path="/data/cache",
    text_field="text",
    n_clusters=100,
    eps=0.01,
    perform_removal=True,
)
result = workflow.run()

print(f"Total time: {result.metadata['total_time']:.2f}s")
print(f"Duplicates found: {result.metadata['num_duplicates']}")
```
Custom Executors
```python
from nemo_curator.stages.text.deduplication.semantic import TextSemanticDeduplicationWorkflow
from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.backends.experimental.ray_actor_pool import RayActorPoolExecutor

# Use a separate executor instance for each streaming phase so that each phase
# can be configured independently (pass executor-specific settings to each
# constructor as needed)
embedding_exec = XennaExecutor()
pairwise_exec = XennaExecutor()
removal_exec = XennaExecutor()

workflow = TextSemanticDeduplicationWorkflow(
    input_path="/data/input",
    output_path="/data/output",
    model_identifier="sentence-transformers/all-MiniLM-L6-v2",
    n_clusters=500,
    distance_metric="cosine",
    which_to_keep="hard",
    eps=0.05,
    kmeans_max_iter=500,
    pairwise_batch_size=2048,
)

result = workflow.run(
    # Tuple order: (embedding, pairwise, removal)
    streaming_executor=(embedding_exec, pairwise_exec, removal_exec),
    # Batch executor used for K-means clustering
    batch_executor=RayActorPoolExecutor(),
)
```
Identification Only (No Removal)
```python
from nemo_curator.stages.text.deduplication.semantic import TextSemanticDeduplicationWorkflow

# Only identify duplicates without removing them
workflow = TextSemanticDeduplicationWorkflow(
    input_path="/data/input",
    output_path="/data/output",
    perform_removal=False,
    eps=0.01,
)
result = workflow.run()

# Duplicate IDs are written to /data/output/duplicates/
```