Implementation: Huggingface Datatrove MinhashDedupCluster
| Knowledge Sources | |
|---|---|
| Domains | |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete pipeline step that clusters duplicate document pairs into connected components using a union-find data structure and emits per-rank removal lists. This is the third of four stages in the Datatrove MinHash deduplication pipeline. It reads all .dups files, builds a global union-find over all matched document pairs, and writes .remove files listing the document IDs that should be filtered out.
Description
MinhashDedupCluster extends PipelineStep and performs the following:
- Read all duplicate files: Iterates over all `.dups` files from the bucket matching stage using `read_tuples_from_file` with format `"4I"` (four unsigned 32-bit integers per record).
- Union-find construction: For each pair `(f1, d1, f2, d2)`, the `union` function merges the two nodes `(f1, d1)` and `(f2, d2)`. The implementation uses a Python dictionary-based union-find with path compression and union by size. The sentinel `(SENTINEL, SENTINEL)` is always prioritized as root.
- Output generation: After processing all pairs, the union set is iterated in sorted order. Nodes whose parent is not themselves are written to `{file:06d}.remove` files as packed 32-bit unsigned integers.
- Optional metadata: If `save_cluster_id=True`, cluster IDs are written to `.clusters` files. If `save_cluster_size=True`, cluster sizes are written to `.sizes` files.
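The clustering logic described above can be sketched as follows. This is an illustrative toy implementation, not the datatrove source: a dictionary-based union-find with path compression and union by size, where the sentinel pair is always kept as the root of its component. The `SENTINEL` value and helper names are assumptions for this sketch.

```python
# Illustrative sketch (NOT the datatrove source): dict-based union-find with
# path compression and union by size, sentinel pair prioritized as root.
SENTINEL = 0xFFFFFFFF  # assumption: matches the sentinel used for index matches


def make_union_find():
    parent = {}  # node -> parent node
    size = {}    # root -> component size

    def find(x):
        if x not in parent:
            parent[x] = x
            size[x] = 1
        # walk up to the root
        root = x
        while parent[root] != root:
            root = parent[root]
        # path compression: repoint every traversed node at the root
        while parent[x] != root:
            parent[x], x = root, parent[x]
        return root

    def union(a, b):
        ra, rb = find(a), find(b)
        if ra == rb:
            return ra
        # the sentinel pair always wins as root; otherwise union by size
        if rb == (SENTINEL, SENTINEL) or (
            ra != (SENTINEL, SENTINEL) and size[ra] < size[rb]
        ):
            ra, rb = rb, ra
        parent[rb] = ra
        size[ra] += size[rb]
        return ra

    return parent, find, union


# Example: pairs (f1, d1, f2, d2) as they would come from .dups records
parent, find, union = make_union_find()
for f1, d1, f2, d2 in [(0, 1, 0, 2), (0, 2, 1, 5)]:
    union((f1, d1), (f2, d2))
# (0, 1), (0, 2), (1, 5) now share one root; non-root nodes are the
# candidates written to .remove files
```

The transitive closure is why a global view is required: `(0, 1)` and `(1, 5)` never appear together in any pair, yet they end up in the same component.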
This step must run with `world_size=1` because the union-find requires a global view of all duplicate pairs to compute the transitive closure correctly.
Note: A Rust alternative (`fast_mh3`) exists in `src/datatrove/tools/fast_mh3/` that implements the same clustering logic with significantly better performance for large-scale datasets.
Usage
```python
from datatrove.pipeline.dedup import MinhashDedupCluster, MinhashConfig

config = MinhashConfig(n_grams=5, num_buckets=14, hashes_per_bucket=8)

cluster_step = MinhashDedupCluster(
    input_folder="s3://my-bucket/minhash/dups",
    output_folder="s3://my-bucket/minhash/remove",
    config=config,
)

# With optional metadata
cluster_step = MinhashDedupCluster(
    input_folder="s3://my-bucket/minhash/dups",
    output_folder="s3://my-bucket/minhash/remove",
    config=config,
    save_cluster_id=True,
    save_cluster_size=True,
    ignore_index_matches=False,
)
```
Code Reference
Source Location
- Repository: `huggingface/datatrove`
- File: `src/datatrove/pipeline/dedup/minhash.py` (lines 500–597)
Signature
```python
class MinhashDedupCluster(PipelineStep):
    def __init__(
        self,
        input_folder: DataFolderLike,
        output_folder: DataFolderLike,
        config: MinhashConfig = None,
        save_cluster_id: bool = False,
        save_cluster_size: bool = False,
        ignore_index_matches: bool = False,
        lines_to_buffer: int = 5,
    ):
        ...
```
Import
```python
from datatrove.pipeline.dedup import MinhashDedupCluster
```
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| `data` | None | This step does not accept a `DocumentsPipeline` input. |
| Binary `.dups` files | Binary files | Read from `input_folder` matching glob `*.dups`. Each record is a packed `<4I` tuple: `(file_id1, doc_id1, file_id2, doc_id2)`. Sentinel values `(0xFFFFFFFF, 0xFFFFFFFF)` indicate index matches. |
| `world_size` | int | Must be exactly 1. An assertion enforces single-worker execution. |
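To make the input format concrete, here is a minimal sketch of decoding packed `<4I` duplicate-pair records with Python's standard `struct` module. The helper name and sample values are fabricated for illustration; the record layout comes from the table above.

```python
# Sketch: decode packed "<4I" duplicate-pair records as described in the
# inputs table. Helper name and sample values are made up for illustration.
import struct

RECORD = struct.Struct("<4I")  # (file_id1, doc_id1, file_id2, doc_id2)
SENTINEL = 0xFFFFFFFF


def read_dup_pairs(raw: bytes):
    """Yield ((f1, d1, f2, d2), is_index_match) for each 16-byte record."""
    for offset in range(0, len(raw), RECORD.size):
        f1, d1, f2, d2 = RECORD.unpack_from(raw, offset)
        is_index_match = (f1, d1) == (SENTINEL, SENTINEL)
        yield (f1, d1, f2, d2), is_index_match


# Round-trip a single fabricated record
raw = RECORD.pack(0, 42, 3, 7)
[(pair, index_match)] = list(read_dup_pairs(raw))
# pair == (0, 42, 3, 7), index_match == False
```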
Outputs
| Name | Type | Description |
|---|---|---|
| Binary `.remove` files | Binary files | Written to `output_folder/{file:06d}.remove`. Each record is a packed `<I` (32-bit unsigned) document ID that should be removed during filtering. |
| Binary `.clusters` files | Binary files (optional) | Written when `save_cluster_id=True`. Each record is two packed `<I` values: `(doc_id, cluster_id)`. |
| Binary `.sizes` files | Binary files (optional) | Written when `save_cluster_size=True`. Each record is two packed `<I` values: `(doc_id, cluster_size)`. |
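A downstream consumer can decode these per-rank outputs the same way. The sketch below is illustrative (helper names and sample bytes are fabricated); the `<I` record layouts come from the tables above.

```python
# Sketch: decode the per-rank output files described above. Helper names and
# sample bytes are fabricated; the "<I" layouts come from the output tables.
import struct

UINT32 = struct.Struct("<I")
PAIR = struct.Struct("<2I")


def read_remove_ids(raw: bytes):
    """Document IDs to drop for one rank (one '<I' value per record)."""
    return [UINT32.unpack_from(raw, off)[0]
            for off in range(0, len(raw), UINT32.size)]


def read_id_pairs(raw: bytes):
    """(doc_id, cluster_id) or (doc_id, cluster_size) pairs ('<2I' records)."""
    return [PAIR.unpack_from(raw, off)
            for off in range(0, len(raw), PAIR.size)]


raw_remove = b"".join(UINT32.pack(i) for i in (3, 8, 21))
# read_remove_ids(raw_remove) -> [3, 8, 21]
```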
Usage Examples
Example: Single-Worker Clustering
```python
from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupCluster, MinhashConfig

config = MinhashConfig(n_grams=5, num_buckets=14, hashes_per_bucket=8)

executor = LocalPipelineExecutor(
    pipeline=[
        MinhashDedupCluster(
            input_folder="data/minhash/dups",
            output_folder="data/minhash/remove",
            config=config,
            save_cluster_id=True,
            save_cluster_size=True,
        ),
    ],
    tasks=1,  # MUST be 1 for clustering
)
executor.run()
```
- Produces `000000.remove`, `000001.remove`, etc. -- one per original input rank.
- With `save_cluster_id=True`, also produces `000000.clusters`, `000001.clusters`, etc.