Implementation: Recommenders load_spark_df
| Knowledge Sources | |
|---|---|
| Domains | Data Engineering, Distributed Computing |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Concrete tool for downloading, parsing, and loading MovieLens datasets into PySpark DataFrames with automatic schema handling, multi-character delimiter support, and optional item metadata enrichment.
Description
The load_spark_df function encapsulates the entire pipeline of acquiring a MovieLens dataset and converting it into a distributed Spark DataFrame. It supports five dataset sizes ("100k", "1m", "10m", "20m", "mock100") and transparently handles the differences in file formats across sizes. For datasets with multi-character delimiters (1M, 10M), it falls back to RDD-based text parsing since PySpark's CSV reader only supports single-character separators. Item metadata (titles, genres, release year) is optionally joined from a separate file loaded via pandas on the driver node. The function caches and materializes the result before returning, ensuring safe cleanup of temporary download files.
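As noted above, PySpark's CSV reader only accepts single-character separators, so the 1M/10M files (which use `::`) are parsed line by line from an RDD instead. A minimal pure-Python sketch of that per-line split, assuming the MovieLens `UserID::MovieID::Rating::Timestamp` layout; the helper name here is illustrative, not the library's internal API:

```python
def parse_ratings_line(line, sep="::"):
    """Split one MovieLens rating line on a multi-character delimiter
    and cast the fields to (int user, int item, float rating, int timestamp)."""
    user, item, rating, ts = line.strip().split(sep)
    return int(user), int(item), float(rating), int(ts)

# A MovieLens 1M-style line: UserID::MovieID::Rating::Timestamp
row = parse_ratings_line("1::1193::5::978300760")
print(row)  # (1, 1193, 5.0, 978300760)
```

In the actual pipeline the equivalent function would be mapped over `spark.sparkContext.textFile(...)` so the parsing stays distributed.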
Usage
Call this function immediately after obtaining a SparkSession. Pass the returned DataFrame to splitting functions and model training. The header parameter controls column naming, while schema allows explicit type specification. For Databricks environments, pass the dbutils parameter to enable DBFS file operations.
Code Reference
Source Location
- Repository: recommenders
- File: recommenders/datasets/movielens.py (Lines 356-494)
Signature
def load_spark_df(
spark,
size="100k",
header=None,
schema=None,
local_cache_path=None,
dbutils=None,
title_col=None,
genres_col=None,
year_col=None,
) -> pyspark.sql.DataFrame
Import
from recommenders.datasets.movielens import load_spark_df
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| spark | pyspark.sql.SparkSession | Yes | Active Spark session obtained from start_or_get_spark or equivalent |
| size | str | No (default: "100k") | Dataset size; one of "100k", "1m", "10m", "20m", "mock100" |
| header | list or tuple | No (default: None) | Column names for the rating data; ignored if schema is provided |
| schema | pyspark.sql.types.StructType | No (default: None) | Explicit PySpark schema with column names and types |
| local_cache_path | str | No (default: None) | Directory or zip path for caching; uses a temp directory if None |
| dbutils | object | No (default: None) | Databricks dbutils object for DBFS file operations; required on Databricks |
| title_col | str | No (default: None) | Column name for movie title; if None, titles are not loaded |
| genres_col | str | No (default: None) | Column name for genres (pipe-separated string); if None, genres are not loaded |
| year_col | str | No (default: None) | Column name for release year; if None, year is not loaded |
Outputs
| Name | Type | Description |
|---|---|---|
| df | pyspark.sql.DataFrame | Cached Spark DataFrame containing rating data with columns determined by header/schema, optionally enriched with title, genres, and year columns |
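When year_col is requested, the release year is recovered from the movie title, which in MovieLens metadata follows the "Title (YYYY)" convention. A hedged sketch of that extraction; the regex and helper name are assumptions for illustration, not the library's exact code:

```python
import re

def extract_year(title):
    """Pull a 4-digit release year out of a MovieLens-style title,
    e.g. "Toy Story (1995)" -> "1995"; returns None if no year is present."""
    match = re.search(r"\((\d{4})\)", title)
    return match.group(1) if match else None

print(extract_year("Toy Story (1995)"))  # 1995
print(extract_year("Untitled"))          # None
```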
Usage Examples
Basic Usage with Default Schema
from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.datasets.movielens import load_spark_df
spark = start_or_get_spark(app_name="ALS_Example", memory="16g")
# Load MovieLens 100K with default column names
df = load_spark_df(spark, size="100k")
df.show(5)
# +------+------+------+---------+
# |userID|itemID|rating|timestamp|
# +------+------+------+---------+
# | 196| 242| 3.0|881250949|
# ...
Custom Header and Item Metadata
# Load MovieLens 1M with custom column names and movie titles
df = load_spark_df(
spark,
size="1m",
header=("UserId", "ItemId", "Rating", "Timestamp"),
title_col="Title",
genres_col="Genres",
year_col="Year",
)
df.printSchema()
# root
# |-- UserId: integer
# |-- ItemId: integer
# |-- Rating: float
# |-- Timestamp: long
# |-- Title: string
# |-- Genres: string
# |-- Year: integer
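The Genres column above arrives as one pipe-separated string per row. A small sketch of turning it into a list for downstream use, shown in plain Python for clarity (in Spark the same transformation could be done with `pyspark.sql.functions.split`):

```python
def split_genres(genres, sep="|"):
    """Split a MovieLens genres string such as "Animation|Children's|Comedy"
    into a list of genre names; a missing or empty value yields an empty list."""
    return genres.split(sep) if genres else []

print(split_genres("Animation|Children's|Comedy"))
# ['Animation', "Children's", 'Comedy']
```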
With Explicit Schema
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, LongType
schema = StructType([
StructField("userID", IntegerType()),
StructField("itemID", IntegerType()),
StructField("rating", FloatType()),
StructField("timestamp", LongType()),
])
df = load_spark_df(spark, size="1m", schema=schema)