Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Huggingface Datasets Spark Builder

From Leeroopedia
Revision as of 13:00, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Huggingface_Datasets_Spark_Builder.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Source src/datasets/packaged_modules/spark/spark.py (lines 113-367)
Domain(s) Data_Loading, Distributed_Computing
Last Updated 2026-02-14

Overview

Description

Spark is a packaged dataset builder (subclass of DatasetBuilder) in the HuggingFace Datasets library that converts PySpark DataFrames into HuggingFace Datasets. It bridges the gap between distributed Spark-based data processing pipelines and the HuggingFace ecosystem, enabling users to materialize Spark DataFrames as Arrow-backed datasets or stream them as IterableDataset instances.

The builder operates in two modes:

  • Materialized mode (_prepare_split) -- Writes the Spark DataFrame to Arrow or Parquet shard files using Spark's mapInArrow API for distributed writing. It handles automatic repartitioning based on shard size limits, distributed file renaming, and supports both local and remote filesystems.
  • Streaming mode (_get_examples_iterable_for_split) -- Returns a SparkExamplesIterable that lazily iterates over the DataFrame partitions, supporting partition-level shuffling, sharding across workers, and stateful resumption.

The builder validates that the cache directory is accessible from all Spark workers on multi-node clusters by writing a probe file. It uses the DataFrame's semantic hash as the config name for caching purposes.

Usage

Use the Spark builder when you need to convert data processed in a PySpark pipeline into HuggingFace Datasets format. Common scenarios include:

  • Converting large-scale ETL pipeline outputs from Spark into datasets for model training.
  • Streaming Spark DataFrames directly into training loops without full materialization.
  • Exporting Spark data to Parquet or Arrow shards with automatic size-based partitioning.

Code Reference

Source Location

Repository: huggingface/datasets

File: src/datasets/packaged_modules/spark/spark.py (lines 113-367)

Signature

@dataclass
class SparkConfig(datasets.BuilderConfig):
    features: Optional[datasets.Features] = None


class Spark(datasets.DatasetBuilder):
    BUILDER_CONFIG_CLASS = SparkConfig

    def __init__(
        self,
        df: "pyspark.sql.DataFrame",
        cache_dir: str = None,
        working_dir: str = None,
        **config_kwargs,
    ):
        ...

Key Methods:

  • __init__(self, df, cache_dir=None, working_dir=None, **config_kwargs) -- Initializes the builder with a PySpark DataFrame. Obtains or creates a SparkSession and uses the DataFrame's semantic hash as the config name.
  • _info(self) -> DatasetInfo -- Returns dataset info using features from the config.
  • _split_generators(self, dl_manager) -> list[SplitGenerator] -- Returns a single TRAIN split generator (the DataFrame represents one logical split).
  • _prepare_split(self, split_generator, file_format="arrow", max_shard_size=None, num_proc=None, **kwargs) -- Validates cache directory accessibility, repartitions the DataFrame if needed, writes shards in parallel via mapInArrow, and renames output files into the final naming pattern.
  • _get_examples_iterable_for_split(self, split_generator) -> SparkExamplesIterable -- Returns a SparkExamplesIterable for lazy, streaming iteration over the DataFrame.

Supporting Classes:

  • SparkExamplesIterable -- A _BaseExamplesIterable subclass that iterates DataFrame partitions with support for shuffling, sharding, and stateful resumption.

Import

# Not imported directly; used via Dataset.from_spark or load_dataset
from datasets import Dataset

# From a Spark DataFrame
dataset = Dataset.from_spark(spark_df)

# Or via load_dataset
from datasets import load_dataset
ds = load_dataset("spark", df=spark_df)

I/O Contract

Inputs

Parameter Type Description
df pyspark.sql.DataFrame The PySpark DataFrame to convert. Required.
cache_dir Optional[str] (default: None) Directory for caching the materialized dataset. Must be NFS-accessible on multi-node clusters.
working_dir Optional[str] (default: None) Temporary working directory for shard writing. Files are moved to cache_dir on completion.
features Optional[datasets.Features] (default: None) Explicit feature schema via SparkConfig.
file_format str (default: "arrow") Output format: "arrow" or "parquet".
max_shard_size Optional[Union[str, int]] Maximum shard size (e.g., "500MB"). The DataFrame is automatically repartitioned if the estimated total size exceeds this limit.

Outputs

Output Type Description
Materialized dataset Dataset An Arrow-backed HuggingFace Dataset written to shard files.
Streamed dataset IterableDataset A lazily-evaluated iterable dataset backed by the Spark DataFrame.
Shard files Arrow or Parquet files Written to cache_dir with the naming pattern {name}-{split}-{shard_id:05d}-of-{total:05d}.{format}.

Usage Examples

Converting a Spark DataFrame to a Dataset

from pyspark.sql import SparkSession
from datasets import Dataset

spark = SparkSession.builder.getOrCreate()
spark_df = spark.read.parquet("s3://bucket/data/")

# Materialize as a HuggingFace Dataset
dataset = Dataset.from_spark(spark_df, cache_dir="/shared/nfs/cache")
print(dataset)
print(dataset[0])

Streaming a Spark DataFrame

from pyspark.sql import SparkSession
from datasets import IterableDataset

spark = SparkSession.builder.getOrCreate()
spark_df = spark.createDataFrame(
    [(1, "hello"), (2, "world")],
    ["id", "text"]
)

# Stream as an IterableDataset (no materialization)
iterable_ds = IterableDataset.from_spark(spark_df)
for example in iterable_ds:
    print(example)

Using load_dataset with Spark

from pyspark.sql import SparkSession
from datasets import load_dataset

spark = SparkSession.builder.getOrCreate()
spark_df = spark.read.json("data/*.json")

ds = load_dataset("spark", df=spark_df, cache_dir="/tmp/hf_cache")
print(ds["train"].features)

Related Pages

Principles

  • Spark Dataset Building -- Principle for converting PySpark DataFrames to HuggingFace Datasets with distributed writing and streaming support.

Environments

Related Implementations

  • DatasetBuilder -- The base class providing the dataset building, caching, and split management infrastructure.
  • SparkExamplesIterable -- The iterable implementation that enables streaming from Spark DataFrames with partition-level shuffling and stateful resumption.
  • ArrowWriter / ParquetWriter -- Writer classes used during shard materialization.

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment