
Implementation:Huggingface Datatrove MinhashDedupFilter

From Leeroopedia
Last Updated 2026-02-14 00:00 GMT

Overview

Concrete pipeline step that filters out duplicate documents from the document stream using pre-computed removal lists. This is the fourth and final stage of the Datatrove MinHash deduplication pipeline. It reads .remove files produced by the clustering stage and yields only documents whose sequential index does not appear in the removal list, keeping exactly one representative per duplicate cluster.

Description

MinhashDedupFilter extends PipelineStep and implements a streaming filter:

  1. Load removal list: Opens {rank:06d}.remove from input_folder and reads document IDs one at a time using sequential struct.unpack calls.
  2. Stream and filter: Iterates through the input DocumentsPipeline. For each document, compares its sequential index against the next ID in the removal list. Matching documents are dropped (counted as StatHints.dropped); non-matching documents are yielded (counted as StatHints.forwarded).
  3. Exclusion writing: If an exclusion_writer is provided, dropped documents are written to it instead of being silently discarded.
  4. Metadata loading: If load_cluster_ids=True, cluster IDs are loaded from .clusters files and attached to doc.metadata["minhash_cluster_id"]. Documents not in any cluster receive a cluster ID of -1. Similarly, load_cluster_sizes=True loads cluster sizes into doc.metadata["minhash_cluster_size"] (defaulting to 1 for unclustered documents).
  5. Missing file handling: If no .remove file exists for the current rank, all documents are passed through with a warning.
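The streaming logic of steps 1–2 can be sketched as follows. This is a simplified illustration, not the library's actual code: `filter_documents` and its arguments are hypothetical names, and the in-memory `BytesIO` stands in for a real `.remove` file.

```python
import struct
from io import BytesIO

def filter_documents(docs, remove_file):
    """Yield only documents whose sequential index is absent from the
    removal list in remove_file (sorted, packed little-endian uint32s).
    Simplified sketch of the MinhashDedupFilter streaming loop."""
    def next_removal():
        data = remove_file.read(struct.calcsize("<I"))
        return struct.unpack("<I", data)[0] if data else None

    to_remove = next_removal()
    for idx, doc in enumerate(docs):
        if idx == to_remove:       # index is in the removal list: drop it
            to_remove = next_removal()
            continue               # (an exclusion_writer would receive it here)
        yield doc                  # keep: not marked as a duplicate

# Example: the removal list marks sequential indices 1 and 3
remove = BytesIO(struct.pack("<II", 1, 3))
kept = list(filter_documents(["a", "b", "c", "d", "e"], remove))
print(kept)  # ['a', 'c', 'e']
```

Because both the document stream and the removal list are consumed in increasing index order, a single forward pass with one pending removal ID suffices; no set of all removed IDs needs to be held in memory.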

Usage

from datatrove.pipeline.dedup import MinhashDedupFilter

filter_step = MinhashDedupFilter(
    input_folder="s3://my-bucket/minhash/remove",
)

# With exclusion writing and metadata
from datatrove.pipeline.writers import JsonlWriter

filter_step = MinhashDedupFilter(
    input_folder="s3://my-bucket/minhash/remove",
    exclusion_writer=JsonlWriter("s3://my-bucket/minhash/excluded"),
    load_cluster_ids=True,
    load_cluster_sizes=True,
)

Code Reference

Source Location

  • Repository: huggingface/datatrove
  • File: src/datatrove/pipeline/dedup/minhash.py (lines 599--688)

Signature

class MinhashDedupFilter(PipelineStep):
    def __init__(
        self,
        input_folder: DataFolderLike,
        exclusion_writer: DiskWriter = None,
        load_cluster_ids: bool = False,
        load_cluster_sizes: bool = False,
        lines_to_buffer: int = 5,
    ):

Import

from datatrove.pipeline.dedup import MinhashDedupFilter

I/O Contract

Inputs

Input Contract

  • data (DocumentsPipeline): Iterator of Document objects in the same order as the original signature computation. Sequential document indices must match the IDs used during clustering.
  • .remove files (binary): Read from input_folder/{rank:06d}.remove. Each record is a packed <I (32-bit little-endian unsigned) document ID to remove.
  • .clusters files (binary, optional): Read from input_folder/{rank:06d}.clusters when load_cluster_ids=True. Each record contains (doc_id, cluster_id).
  • .sizes files (binary, optional): Read from input_folder/{rank:06d}.sizes when load_cluster_sizes=True. Each record contains (doc_id, cluster_size).
  • rank (int): Worker rank, used to determine which .remove file to load.
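The `.remove` record layout described above can be sketched with the standard-library struct module (assuming the `<I` little-endian 32-bit unsigned format stated in the contract; variable names are illustrative):

```python
import struct

# Encode a removal list the way the clustering stage writes it:
# one packed little-endian uint32 per document ID to remove.
ids_to_remove = [4, 17, 42]
payload = b"".join(struct.pack("<I", i) for i in ids_to_remove)

# Decode it back record by record, as the filter stage does.
decoded = [rec[0] for rec in struct.iter_unpack("<I", payload)]
print(decoded)  # [4, 17, 42]
```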

Outputs

Output Contract

  • Filtered DocumentsPipeline (Generator[Document]): Yields documents not found in the removal list. Each cluster retains exactly one document (the cluster root). Documents may have minhash_cluster_id and minhash_cluster_size metadata attached.
  • Excluded documents (optional, written via DiskWriter): When exclusion_writer is provided, removed documents are written to the exclusion output for inspection and analysis.

Usage Examples

Example: Complete 4-Stage Pipeline

from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.writers import JsonlWriter
from datatrove.pipeline.dedup import (
    MinhashDedupSignature,
    MinhashDedupBuckets,
    MinhashDedupCluster,
    MinhashDedupFilter,
    MinhashConfig,
)

config = MinhashConfig(n_grams=5, num_buckets=14, hashes_per_bucket=8)

# Stage 1: Compute signatures (parallel)
LocalPipelineExecutor(
    pipeline=[
        JsonlReader("data/input/"),
        MinhashDedupSignature(output_folder="data/sigs", config=config),
    ],
    tasks=8,
).run()

# Stage 2: Find duplicate pairs (parallel, 1+ worker per bucket)
LocalPipelineExecutor(
    pipeline=[
        MinhashDedupBuckets(
            input_folder="data/sigs",
            output_folder="data/dups",
            config=config,
        ),
    ],
    tasks=14,
).run()

# Stage 3: Cluster duplicates (single worker)
LocalPipelineExecutor(
    pipeline=[
        MinhashDedupCluster(
            input_folder="data/dups",
            output_folder="data/remove",
            config=config,
        ),
    ],
    tasks=1,
).run()

# Stage 4: Filter documents (parallel)
LocalPipelineExecutor(
    pipeline=[
        JsonlReader("data/input/"),
        MinhashDedupFilter(input_folder="data/remove"),
        JsonlWriter("data/output/"),
    ],
    tasks=8,
).run()
  • Stage 4 re-reads the original input and filters based on the .remove files.
  • Documents are yielded in their original order, minus the duplicates.
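A quick sanity check after stage 4 is to compare document counts before and after filtering. The sketch below assumes plain one-object-per-line .jsonl files (JsonlWriter may compress output by default, in which case adjust the glob and open accordingly); `count_jsonl_docs` and the paths are illustrative, not part of the library.

```python
import glob
import os

def count_jsonl_docs(folder):
    """Count one-JSON-object-per-line documents across a folder of .jsonl files."""
    total = 0
    for path in glob.glob(os.path.join(folder, "*.jsonl")):
        with open(path) as f:
            total += sum(1 for line in f if line.strip())
    return total

# before = count_jsonl_docs("data/input")
# after = count_jsonl_docs("data/output")
# print(f"removed {before - after} duplicate documents")
```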
