

Implementation: recommenders-team/recommenders load_spark_df

From Leeroopedia


Knowledge Sources

  • Domains: Data Engineering, Distributed Computing
  • Last Updated: 2026-02-10 00:00 GMT

Overview

Concrete tool for downloading, parsing, and loading MovieLens datasets into PySpark DataFrames with automatic schema handling, multi-character delimiter support, and optional item metadata enrichment.

Description

The load_spark_df function encapsulates the entire pipeline of acquiring a MovieLens dataset and converting it into a distributed Spark DataFrame. It supports five dataset sizes ("100k", "1m", "10m", "20m", "mock100") and transparently handles the differences in file formats across sizes. For datasets with multi-character delimiters (1M, 10M), it falls back to RDD-based text parsing since PySpark's CSV reader only supports single-character separators. Item metadata (titles, genres, release year) is optionally joined from a separate file loaded via pandas on the driver node. The function caches and materializes the result before returning, ensuring safe cleanup of temporary download files.
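To make the format differences concrete: each MovieLens release ships with a different field separator (tab for 100K, "::" for 1M/10M, comma for 20M), and the two-character "::" is what forces the RDD text fallback. Below is a minimal, Spark-free sketch of that per-line parse, assuming the standard user::item::rating::timestamp layout; the helper and mapping are illustrative, not part of recommenders:

```python
# Separator used by each MovieLens release. "::" is multi-character,
# which a single-character CSV reader cannot handle, hence the RDD fallback.
SEPARATORS = {"100k": "\t", "1m": "::", "10m": "::", "20m": ","}

def parse_rating_line(line, size="1m"):
    """Hypothetical sketch of the per-line parse done in the RDD fallback:
    split on the size's separator and cast to (int, int, float, int)."""
    user, item, rating, timestamp = line.strip().split(SEPARATORS[size])
    return int(user), int(item), float(rating), int(timestamp)

print(parse_rating_line("1::1193::5::978300760", size="1m"))
# (1, 1193, 5.0, 978300760)
```

In the real function, a function like this would be applied via `rdd.map(...)` before converting the RDD to a DataFrame with the resolved schema.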

Usage

Call this function immediately after obtaining a SparkSession, then pass the returned DataFrame to splitting functions and on into model training. The header parameter controls column naming, while schema allows explicit type specification; if both are given, schema takes precedence. On Databricks, pass the dbutils object to enable DBFS file operations.

Code Reference

Source Location

  • Repository: recommenders
  • File: recommenders/datasets/movielens.py (Lines 356-494)

Signature

def load_spark_df(
    spark,
    size="100k",
    header=None,
    schema=None,
    local_cache_path=None,
    dbutils=None,
    title_col=None,
    genres_col=None,
    year_col=None,
) -> pyspark.sql.DataFrame

Import

from recommenders.datasets.movielens import load_spark_df

I/O Contract

Inputs

  • spark (pyspark.sql.SparkSession; required): Active Spark session obtained from start_or_get_spark or equivalent
  • size (str; default "100k"): Dataset size; one of "100k", "1m", "10m", "20m", "mock100"
  • header (list or tuple; default None): Column names for the rating data; ignored if schema is provided
  • schema (pyspark.sql.types.StructType; default None): Explicit PySpark schema with column names and types
  • local_cache_path (str; default None): Directory or zip path for caching; uses a temp directory if None
  • dbutils (object; default None): Databricks dbutils object for DBFS file operations; required on Databricks
  • title_col (str; default None): Column name for movie title; if None, titles are not loaded
  • genres_col (str; default None): Column name for genres (pipe-separated string); if None, genres are not loaded
  • year_col (str; default None): Column name for release year; if None, year is not loaded
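The header/schema precedence described above can be sketched as a small, Spark-free resolution function; the helper below is hypothetical (it stands in for logic inside load_spark_df, with `schema_names` substituting for StructType field names):

```python
def resolve_column_names(header=None, schema_names=None):
    """Sketch of the documented precedence: an explicit schema's column
    names win; otherwise the header tuple; otherwise the defaults."""
    defaults = ["userID", "itemID", "rating", "timestamp"]
    if schema_names is not None:
        return list(schema_names)
    if header is not None:
        return list(header)
    return defaults

print(resolve_column_names(header=("UserId", "ItemId", "Rating", "Timestamp")))
# ['UserId', 'ItemId', 'Rating', 'Timestamp']
```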

Outputs

  • df (pyspark.sql.DataFrame): Cached Spark DataFrame containing rating data with columns determined by header/schema, optionally enriched with title, genres, and year columns
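For orientation on the enrichment columns: in the raw MovieLens item files, genres arrive as a single pipe-separated string and the release year is embedded in the title (e.g. "Toy Story (1995)"). A quick, Spark-free sketch of what those raw values look like and how you might post-process them downstream; the helper names are hypothetical, not part of recommenders:

```python
import re

def split_genres(genres):
    """Turn a pipe-separated genres string into a list of genre labels."""
    return genres.split("|") if genres else []

def year_from_title(title):
    """Pull a 4-digit release year out of a MovieLens-style title, if present."""
    match = re.search(r"\((\d{4})\)", title)
    return int(match.group(1)) if match else None

print(split_genres("Animation|Children's|Comedy"))
# ['Animation', "Children's", 'Comedy']
print(year_from_title("Toy Story (1995)"))
# 1995
```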

Usage Examples

Basic Usage with Default Schema

from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.datasets.movielens import load_spark_df

spark = start_or_get_spark(app_name="ALS_Example", memory="16g")

# Load MovieLens 100K with default column names
df = load_spark_df(spark, size="100k")
df.show(5)
# +------+------+------+---------+
# |userID|itemID|rating|timestamp|
# +------+------+------+---------+
# |   196|   242|   3.0|881250949|
# ...

Custom Header and Item Metadata

# Load MovieLens 1M with custom column names and movie titles
df = load_spark_df(
    spark,
    size="1m",
    header=("UserId", "ItemId", "Rating", "Timestamp"),
    title_col="Title",
    genres_col="Genres",
    year_col="Year",
)
df.printSchema()
# root
#  |-- UserId: integer
#  |-- ItemId: integer
#  |-- Rating: float
#  |-- Timestamp: long
#  |-- Title: string
#  |-- Genres: string
#  |-- Year: integer

With Explicit Schema

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, LongType

schema = StructType([
    StructField("userID", IntegerType()),
    StructField("itemID", IntegerType()),
    StructField("rating", FloatType()),
    StructField("timestamp", LongType()),
])
df = load_spark_df(spark, size="1m", schema=schema)

Related Pages

Implements Principle

Requires Environment
