
Implementation:Huggingface Datatrove MinhashDedupBuckets

From Leeroopedia
Metadata
Last Updated 2026-02-14 00:00 GMT

Overview

MinhashDedupBuckets is a concrete pipeline step that performs a heap-based k-way merge across the sorted MinHash signature files within each LSH bucket to find candidate duplicate document pairs. It is the second of four stages in the Datatrove MinHash deduplication pipeline: it reads sorted .minhash.sig files, merges them with a priority queue, and emits a (file_id1, doc_id1, file_id2, doc_id2) tuple for every pair of records sharing an identical band signature.

Description

MinhashDedupBuckets extends PipelineStep and implements the bucket matching logic:

  1. Worker assignment: Each worker is assigned to a specific bucket (and optionally a hash range within that bucket). The world_size must be divisible by num_buckets.
  2. Priority queue initialization: All .minhash.sig files for the assigned bucket are opened and their first records are pushed into a min-heap. Each record is wrapped in a HashSig dataclass that supports comparison.
  3. Merge-scan: Records are popped in sorted order. When consecutive records share identical signatures, a duplicate pair is emitted to the output file.
  4. Index handling: If an index_folder is provided, index files are included in the merge. Matches against index entries use a sentinel value (SENTINEL, SENTINEL) for the index side of the pair. When only_dedup_in_index=True, matches between two non-index documents are ignored.
  5. Index creation: If create_index_name is set, new unique signatures (those not already in any index) are written as a .minhash.index file for future incremental dedup.

The stage includes assertions to verify sort order invariants throughout the merge process.
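The merge-scan described above can be sketched with Python's heapq. This is a simplified model over in-memory streams with a hypothetical find_duplicate_pairs helper; the real implementation streams packed records from files, and its HashSig dataclass carries extra fields such as hash ranges and index flags:

```python
import heapq

def find_duplicate_pairs(sorted_streams):
    """k-way merge of per-file sorted (signature, doc_id) streams;
    emit a pair whenever consecutive records share a band signature."""
    iters = [iter(s) for s in sorted_streams]
    heap = []
    # Seed the min-heap with the first record of each stream.
    for file_id, it in enumerate(iters):
        rec = next(it, None)
        if rec is not None:
            sig, doc_id = rec
            heapq.heappush(heap, (sig, file_id, doc_id))
    last = None  # most recently popped record
    pairs = []
    while heap:
        sig, file_id, doc_id = heapq.heappop(heap)
        if last is not None and last[0] == sig:
            # Same signature as the previous record: candidate duplicate pair.
            pairs.append((last[1], last[2], file_id, doc_id))
        last = (sig, file_id, doc_id)
        # Refill the heap from the stream the popped record came from.
        rec = next(iters[file_id], None)
        if rec is not None:
            heapq.heappush(heap, (rec[0], file_id, rec[1]))
    return pairs
```

In this sketch, three or more records sharing one signature yield chained pairs (A,B), (B,C), which is sufficient for a later union-find clustering stage to group them all.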

Usage

from datatrove.pipeline.dedup import MinhashDedupBuckets, MinhashConfig

config = MinhashConfig(n_grams=5, num_buckets=14, hashes_per_bucket=8)

buckets_step = MinhashDedupBuckets(
    input_folder="s3://my-bucket/minhash/sigs",
    output_folder="s3://my-bucket/minhash/dups",
    config=config,
)

# With index support for incremental dedup
buckets_step = MinhashDedupBuckets(
    input_folder="s3://my-bucket/minhash/sigs",
    output_folder="s3://my-bucket/minhash/dups",
    index_folder="s3://my-bucket/minhash/index",
    config=config,
    only_dedup_in_index=True,
    create_index_name="batch_002",
)

Code Reference

Source Location

  • Repository: huggingface/datatrove
  • File: src/datatrove/pipeline/dedup/minhash.py (lines 324–497)

Signature

class MinhashDedupBuckets(PipelineStep):
    def __init__(
        self,
        input_folder: DataFolderLike,
        output_folder: DataFolderLike,
        index_folder: DataFolderLike = None,
        config: MinhashConfig = None,
        only_dedup_in_index: bool = True,
        create_index_name: str = None,
        lines_to_buffer: int = 5,
    ):

Import

from datatrove.pipeline.dedup import MinhashDedupBuckets

I/O Contract

Inputs

  • data (None): this step does not accept a DocumentsPipeline input; an assertion enforces that data is None.
  • Sorted .minhash.sig files (binary): read from input_folder/bucket_{bi:03d}/. Each file contains records of hashes_per_bucket hash values plus a 32-bit doc index, sorted by signature.
  • rank (int): worker rank, used to determine which bucket and hash range to process.
  • world_size (int): total number of workers; must be divisible by num_buckets.
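For illustration, a signature record of the shape described above can be decoded with struct. This sketch assumes 64-bit hashes (the actual width depends on the configured hash precision) and uses a hypothetical read_sig_records helper:

```python
import struct

HASHES_PER_BUCKET = 8  # must match the signing stage's config

# Assumed record layout: HASHES_PER_BUCKET x uint64 band hashes,
# followed by a uint32 document index, all little-endian.
RECORD = struct.Struct(f"<{HASHES_PER_BUCKET}QI")

def read_sig_records(raw: bytes):
    """Yield (signature_tuple, doc_id) from a packed signature buffer."""
    for offset in range(0, len(raw), RECORD.size):
        *sig, doc_id = RECORD.unpack_from(raw, offset)
        yield tuple(sig), doc_id
```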

Outputs

  • .dups files (binary): written to output_folder/{bucket:05d}_{bucket_worker:02d}.dups. Each record is a packed <4I tuple (file_id1, doc_id1, file_id2, doc_id2). Index matches use the sentinel value (0xFFFFFFFF, 0xFFFFFFFF) for the index side of the pair.
  • .minhash.index files (binary, optional): written to index_folder/bucket_{bi:03d}/{name}_{worker:02d}.minhash.index when create_index_name is set. Contains unique band signatures not present in any existing index.
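The fixed-size <4I layout makes .dups files easy to inspect. The sketch below uses a hypothetical iter_dup_pairs helper to parse records and flag index matches by their sentinel:

```python
import struct

SENTINEL = 0xFFFFFFFF
DUP = struct.Struct("<4I")  # (file_id1, doc_id1, file_id2, doc_id2)

def iter_dup_pairs(raw: bytes):
    """Yield (f1, d1, f2, d2, is_index_match) from a .dups buffer."""
    for offset in range(0, len(raw), DUP.size):
        f1, d1, f2, d2 = DUP.unpack_from(raw, offset)
        # The index side of an index match carries the sentinel pair.
        is_index_match = (f1, d1) == (SENTINEL, SENTINEL)
        yield f1, d1, f2, d2, is_index_match
```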

Usage Examples

Example: Distributed Bucket Matching

from datatrove.executor import SlurmPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupBuckets, MinhashConfig

config = MinhashConfig(n_grams=5, num_buckets=14, hashes_per_bucket=8)

# 14 tasks = 1 worker per bucket (minimum)
executor = SlurmPipelineExecutor(
    pipeline=[
        MinhashDedupBuckets(
            input_folder="s3://my-bucket/minhash/sigs",
            output_folder="s3://my-bucket/minhash/dups",
            config=config,
        ),
    ],
    tasks=14,  # must be divisible by num_buckets (14)
    job_name="minhash-buckets",
)
executor.run()
  • With tasks=28, each bucket gets 2 workers that partition the hash range.
  • Output files: 00000_00.dups through 00013_01.dups.
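The rank-to-bucket assignment implied by the divisibility rule can be modeled as follows (an illustrative sketch with a hypothetical assign_worker helper, not the library's exact code):

```python
def assign_worker(rank: int, world_size: int, num_buckets: int):
    """Map a worker rank to (bucket, bucket_worker). world_size must be
    divisible by num_buckets; extra workers per bucket split its hash range."""
    assert world_size % num_buckets == 0, "world_size must be divisible by num_buckets"
    workers_per_bucket = world_size // num_buckets
    # Consecutive ranks serve the same bucket before moving to the next one.
    bucket, bucket_worker = divmod(rank, workers_per_bucket)
    return bucket, bucket_worker
```

With tasks=28 and num_buckets=14, ranks 0 and 1 both serve bucket 0 as bucket workers 0 and 1, each handling half of the bucket's hash range.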
