
Implementation:Huggingface Datatrove MinhashDedupCluster


Overview

Concrete pipeline step that clusters duplicate document pairs into connected components using a union-find data structure and emits per-rank removal lists. This is the third of four stages in the Datatrove MinHash deduplication pipeline. It reads all .dups files, builds a global union-find over all matched document pairs, and writes .remove files listing the document IDs that should be filtered out.

Description

MinhashDedupCluster extends PipelineStep and performs the following:

  1. Read all duplicate files: Iterates over all .dups files from the bucket matching stage using read_tuples_from_file with format "4I" (four unsigned 32-bit integers per record).
  2. Union-find construction: For each record (f1, d1, f2, d2), the union function merges the two nodes (f1, d1) and (f2, d2). The implementation uses a Python dictionary-based union-find with path compression and union by size; the sentinel node (SENTINEL, SENTINEL), which marks index matches, is always kept as the root when merging (see the sketch after this list).
  3. Output generation: After processing all pairs, the union set is iterated in sorted order, and every node that is not the root of its component is written to {file:06d}.remove files as a packed 32-bit unsigned integer.
  4. Optional metadata: If save_cluster_id=True, cluster IDs are written to .clusters files. If save_cluster_size=True, cluster sizes are written to .sizes files.
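
A minimal, self-contained sketch of this clustering logic is shown below. It is not the Datatrove source: SENTINEL, the helper names, and the in-memory dictionaries are illustrative, but the structure (dictionary-based union-find, path compression, union by size, sentinel kept as root) follows the description above.

SENTINEL = 0xFFFFFFFF  # illustrative constant; marks index matches

parent = {}  # (file_id, doc_id) -> parent node
size = {}    # root node -> size of its component

def find(node):
    # Walk up to the root, creating missing entries on the way (dictionary-based),
    # then compress the path so every visited node points directly at the root.
    root = node
    while parent.setdefault(root, root) != root:
        root = parent[root]
    while parent[node] != root:
        parent[node], node = root, parent[node]
    return root

def union(a, b):
    ra, rb = find(a), find(b)
    if ra == rb:
        return
    # The sentinel node always stays the root of its component.
    if rb == (SENTINEL, SENTINEL):
        ra, rb = rb, ra
    elif ra != (SENTINEL, SENTINEL) and size.get(ra, 1) < size.get(rb, 1):
        ra, rb = rb, ra  # union by size: the larger component becomes the root
    parent[rb] = ra
    size[ra] = size.get(ra, 1) + size.get(rb, 1)

def removal_list(records):
    # records yields (f1, d1, f2, d2) tuples read from the .dups files.
    for f1, d1, f2, d2 in records:
        union((f1, d1), (f2, d2))
    # Every node that is not the root of its component is marked for removal.
    return [node for node in sorted(parent) if find(node) != node]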

This step must run with world_size=1 because the union-find requires a global view of all duplicate pairs to compute transitive closure correctly.

Note: A Rust alternative (fast_mh3) exists in src/datatrove/tools/fast_mh3/ that implements the same clustering logic with significantly better performance for large-scale datasets.

Usage

from datatrove.pipeline.dedup import MinhashDedupCluster, MinhashConfig

config = MinhashConfig(n_grams=5, num_buckets=14, hashes_per_bucket=8)

cluster_step = MinhashDedupCluster(
    input_folder="s3://my-bucket/minhash/dups",
    output_folder="s3://my-bucket/minhash/remove",
    config=config,
)

# With optional metadata
cluster_step = MinhashDedupCluster(
    input_folder="s3://my-bucket/minhash/dups",
    output_folder="s3://my-bucket/minhash/remove",
    config=config,
    save_cluster_id=True,
    save_cluster_size=True,
    ignore_index_matches=False,
)

Code Reference

Source Location

  • Repository: huggingface/datatrove
  • File: src/datatrove/pipeline/dedup/minhash.py (lines 500–597)

Signature

class MinhashDedupCluster(PipelineStep):
    def __init__(
        self,
        input_folder: DataFolderLike,
        output_folder: DataFolderLike,
        config: MinhashConfig = None,
        save_cluster_id: bool = False,
        save_cluster_size: bool = False,
        ignore_index_matches: bool = False,
        lines_to_buffer: int = 5,
    ):

Import

from datatrove.pipeline.dedup import MinhashDedupCluster

I/O Contract

Inputs

Input Contract
  • data (None): This step does not accept a DocumentsPipeline input.
  • .dups files (binary): Read from input_folder matching glob *.dups. Each record is a packed <4I tuple: (file_id1, doc_id1, file_id2, doc_id2). Sentinel values (0xFFFFFFFF, 0xFFFFFFFF) indicate index matches.
  • world_size (int): Must be exactly 1. An assertion enforces single-worker execution.
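
For inspection or debugging, the .dups records can be decoded outside the pipeline with the struct module. A small sketch, assuming illustrative file paths and a hypothetical helper (the <4I format and sentinel values are the ones documented above):

import struct

DUPS_RECORD = struct.Struct("<4I")           # (file_id1, doc_id1, file_id2, doc_id2)
SENTINEL = (0xFFFFFFFF, 0xFFFFFFFF)          # marks a match against an external index

def iter_dups(path):  # illustrative helper, not part of Datatrove
    with open(path, "rb") as f:
        while chunk := f.read(DUPS_RECORD.size):
            f1, d1, f2, d2 = DUPS_RECORD.unpack(chunk)
            yield (f1, d1), (f2, d2)

for left, right in iter_dups("data/minhash/dups/000000.dups"):
    if SENTINEL in (left, right):
        print("index match:", left if right == SENTINEL else right)
    else:
        print("duplicate pair:", left, right)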

Outputs

Output Contract
  • .remove files (binary): Written to output_folder/{file:06d}.remove. Each record is a packed <I (32-bit unsigned) document ID that should be removed during filtering.
  • .clusters files (binary, optional): Written when save_cluster_id=True. Each record is two packed <I values: (doc_id, cluster_id).
  • .sizes files (binary, optional): Written when save_cluster_size=True. Each record is two packed <I values: (doc_id, cluster_size).
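
The outputs use the same packing and can be read back the same way; a short sketch with illustrative paths and a hypothetical helper:

import struct

REMOVE_RECORD = struct.Struct("<I")    # one removed doc_id per record
PAIR_RECORD = struct.Struct("<2I")     # (doc_id, cluster_id) or (doc_id, cluster_size)

def iter_records(path, record):  # illustrative helper, not part of Datatrove
    with open(path, "rb") as f:
        while chunk := f.read(record.size):
            yield record.unpack(chunk)

removed_ids = [doc_id for (doc_id,) in iter_records("data/minhash/remove/000000.remove", REMOVE_RECORD)]
cluster_ids = dict(iter_records("data/minhash/remove/000000.clusters", PAIR_RECORD))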

Usage Examples

Example: Single-Worker Clustering

from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupCluster, MinhashConfig

config = MinhashConfig(n_grams=5, num_buckets=14, hashes_per_bucket=8)

executor = LocalPipelineExecutor(
    pipeline=[
        MinhashDedupCluster(
            input_folder="data/minhash/dups",
            output_folder="data/minhash/remove",
            config=config,
            save_cluster_id=True,
            save_cluster_size=True,
        ),
    ],
    tasks=1,  # MUST be 1 for clustering
)
executor.run()
  • Produces 000000.remove, 000001.remove, etc., one per original input rank.
  • With save_cluster_id=True, also produces 000000.clusters, 000001.clusters, etc.
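
The .remove files written here feed the fourth and final stage of the pipeline, which re-reads the original documents and drops every listed doc ID. A hedged sketch of that hand-off, assuming a JSONL corpus, illustrative paths, and the standard MinhashDedupFilter step:

from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupFilter
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.writers.jsonl import JsonlWriter

filter_executor = LocalPipelineExecutor(
    pipeline=[
        # Must re-read the same documents, in the same sharding, as the signature
        # stage so that the (file_id, doc_id) pairs in the .remove files line up.
        JsonlReader("data/input"),
        MinhashDedupFilter(input_folder="data/minhash/remove"),
        JsonlWriter("data/minhash/deduped"),
    ],
    tasks=4,  # same number of tasks as the signature stage, not 1
)
filter_executor.run()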
