Implementation:Huggingface Datatrove MinhashDedupBuckets
| Knowledge Sources | |
|---|---|
| Domains | |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete pipeline step that performs a heap-based k-way merge across sorted MinHash signature files within each LSH bucket to find candidate duplicate document pairs. This is the second of four stages in the Datatrove MinHash deduplication pipeline. It reads sorted `.minhash.sig` files, merges them using a priority queue, and emits `(file_id1, doc_id1, file_id2, doc_id2)` tuples for all pairs sharing identical band signatures.
Description
`MinhashDedupBuckets` extends `PipelineStep` and implements the bucket matching logic:
- Worker assignment: Each worker is assigned to a specific bucket (and optionally a hash range within that bucket). The `world_size` must be divisible by `num_buckets`.
- Priority queue initialization: All `.minhash.sig` files for the assigned bucket are opened and their first records are pushed into a min-heap. Each record is wrapped in a `HashSig` dataclass that supports comparison.
- Merge-scan: Records are popped in sorted order. When consecutive records share identical signatures, a duplicate pair is emitted to the output file.
- Index handling: If an `index_folder` is provided, index files are included in the merge. Matches against index entries use the sentinel value `(SENTINEL, SENTINEL)` for the index side of the pair. When `only_dedup_in_index=True`, matches between two non-index documents are ignored.
- Index creation: If `create_index_name` is set, new unique signatures (those not already in any index) are written to a `.minhash.index` file for future incremental dedup.
The stage includes assertions to verify sort order invariants throughout the merge process.
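To make the merge-scan concrete, here is a minimal, self-contained sketch of the idea. It is a simplified stand-in, not the Datatrove source: the `HashSig` wrapper and `merge_buckets` helper below are illustrative names, and the real records come from the binary `.minhash.sig` readers rather than in-memory lists.

```python
# Minimal sketch of the heap-based merge-scan (illustrative, not the library code).
# Each input is an iterator of (signature, doc_id) records already sorted by
# signature; all inputs are merged through a min-heap and a candidate pair is
# emitted whenever two consecutively popped records share the same signature.
import heapq
from dataclasses import dataclass


@dataclass(order=True)
class HashSig:
    # order=True compares field by field: identical signatures sort together,
    # ties are broken by file_id and doc_id for deterministic output
    sig: tuple
    file_id: int
    doc_id: int


def merge_buckets(sorted_files):
    readers = [iter(f) for f in sorted_files]
    heap = []
    for file_id, reader in enumerate(readers):
        first = next(reader, None)
        if first is not None:
            heapq.heappush(heap, HashSig(first[0], file_id, first[1]))

    last = None
    while heap:
        current = heapq.heappop(heap)
        if last is not None and last.sig == current.sig:
            # consecutive records with identical band signatures -> duplicate pair
            yield (last.file_id, last.doc_id, current.file_id, current.doc_id)
        last = current
        nxt = next(readers[current.file_id], None)
        if nxt is not None:
            heapq.heappush(heap, HashSig(nxt[0], current.file_id, nxt[1]))


# Toy data: two "files", each sorted by signature.
pairs = list(merge_buckets([
    [((1, 2), 0), ((5, 5), 1)],
    [((1, 2), 7), ((9, 9), 3)],
]))
print(pairs)  # [(0, 0, 1, 7)] -> doc 0 of file 0 matches doc 7 of file 1
```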
Usage
```python
from datatrove.pipeline.dedup import MinhashDedupBuckets, MinhashConfig

config = MinhashConfig(n_grams=5, num_buckets=14, hashes_per_bucket=8)

buckets_step = MinhashDedupBuckets(
    input_folder="s3://my-bucket/minhash/sigs",
    output_folder="s3://my-bucket/minhash/dups",
    config=config,
)

# With index support for incremental dedup
buckets_step = MinhashDedupBuckets(
    input_folder="s3://my-bucket/minhash/sigs",
    output_folder="s3://my-bucket/minhash/dups",
    index_folder="s3://my-bucket/minhash/index",
    config=config,
    only_dedup_in_index=True,
    create_index_name="batch_002",
)
```
Code Reference
Source Location
- Repository: `huggingface/datatrove`
- File: `src/datatrove/pipeline/dedup/minhash.py` (lines 324–497)
Signature
```python
class MinhashDedupBuckets(PipelineStep):
    def __init__(
        self,
        input_folder: DataFolderLike,
        output_folder: DataFolderLike,
        index_folder: DataFolderLike = None,
        config: MinhashConfig = None,
        only_dedup_in_index: bool = True,
        create_index_name: str = None,
        lines_to_buffer: int = 5,
    ):
```
Import
```python
from datatrove.pipeline.dedup import MinhashDedupBuckets
```
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| `data` | `None` | This step does not accept a `DocumentsPipeline` input. An assertion enforces that `data` is `None`. |
| Sorted `.minhash.sig` files | Binary files | Read from `input_folder/bucket_{bi:03d}/`. Each file contains records of `hashes_per_bucket` hash values plus a 32-bit doc index, sorted by signature. |
| `rank` | `int` | Worker rank, used to determine which bucket and hash range to process. |
| `world_size` | `int` | Total number of workers. Must be divisible by `num_buckets`. |
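As a rough illustration of that record layout, the sketch below reads one signature file sequentially. The struct format string is an assumption (the actual hash width depends on the configured hash precision), so treat `read_sigs` as a hypothetical helper rather than the library's reader.

```python
# Hypothetical reader for one bucket's .minhash.sig file. Assumes each record
# is hashes_per_bucket unsigned 64-bit hashes ("Q") followed by a 32-bit doc
# index ("I"); the real width depends on the MinhashConfig hash precision.
import struct


def read_sigs(path, hashes_per_bucket=8, hash_format="Q"):
    record = struct.Struct(f"<{hashes_per_bucket}{hash_format}I")
    with open(path, "rb") as f:
        while len(chunk := f.read(record.size)) == record.size:
            *sig, doc_id = record.unpack(chunk)
            yield tuple(sig), doc_id
```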
Outputs
| Name | Type | Description |
|---|---|---|
| Binary `.dups` files | Binary files | Written to `output_folder/{bucket:05d}_{bucket_worker:02d}.dups`. Each record is a packed `<4I` tuple: `(file_id1, doc_id1, file_id2, doc_id2)`. Index matches use the sentinel value `(0xFFFFFFFF, 0xFFFFFFFF)` for the index side. |
| Binary `.minhash.index` files | Binary files (optional) | Written to `index_folder/bucket_{bi:03d}/{name}_{worker:02d}.minhash.index` when `create_index_name` is set. Contains unique band signatures not present in any existing index. |
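Downstream consumers (the clustering stage, or ad-hoc inspection) can decode the packed records directly. A small sketch, assuming the index side is the first `(file_id, doc_id)` pair of the tuple (check the writer for the exact convention):

```python
# Sketch: decode a .dups file. Each record is a little-endian <4I tuple
# (file_id1, doc_id1, file_id2, doc_id2); the pair (0xFFFFFFFF, 0xFFFFFFFF)
# marks a match against an index entry (assumed here to be the first pair).
import struct

SENTINEL = 0xFFFFFFFF


def read_dups(path):
    with open(path, "rb") as f:
        for f1, d1, f2, d2 in struct.iter_unpack("<4I", f.read()):
            if f1 == SENTINEL and d1 == SENTINEL:
                yield ("index", None, f2, d2)  # duplicate of an index entry
            else:
                yield (f1, d1, f2, d2)
```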
Usage Examples
Example: Distributed Bucket Matching
```python
from datatrove.executor import SlurmPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupBuckets, MinhashConfig

config = MinhashConfig(n_grams=5, num_buckets=14, hashes_per_bucket=8)

# 14 tasks = 1 worker per bucket (minimum)
executor = SlurmPipelineExecutor(
    pipeline=[
        MinhashDedupBuckets(
            input_folder="s3://my-bucket/minhash/sigs",
            output_folder="s3://my-bucket/minhash/dups",
            config=config,
        ),
    ],
    tasks=14,           # must be divisible by num_buckets (14)
    time="01:00:00",    # cluster-specific; adjust for your workload
    partition="cpu",    # cluster-specific partition name
    job_name="minhash-buckets",
)
executor.run()
```
- With `tasks=28`, each bucket gets 2 workers that partition the hash range (see the rank-mapping sketch below).
- Output files: `00000_00.dups` through `00013_01.dups`.
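One plausible way to picture how ranks map to output files, assuming a bucket-major `divmod` assignment consistent with the naming above (an illustration, not necessarily the library's exact code):

```python
# Assumed rank -> (bucket, bucket_worker) mapping, consistent with output
# file names 00000_00.dups ... 00013_01.dups (illustrative assumption).
num_buckets, world_size = 14, 28
workers_per_bucket = world_size // num_buckets  # world_size must divide evenly

for rank in range(world_size):
    bucket, bucket_worker = divmod(rank, workers_per_bucket)
    print(f"rank {rank:2d} -> {bucket:05d}_{bucket_worker:02d}.dups")
```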