Principle: Recommenders Data Loading: MovieLens on Spark
| Knowledge Sources | |
|---|---|
| Domains | Data Engineering, Distributed Computing, Recommendation Systems |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Loading benchmark datasets into distributed Spark DataFrames for scalable recommendation processing, handling schema inference, multi-character delimiters, and Databricks DBFS integration.
Description
Recommendation system development requires loading user-item interaction data into a format suitable for distributed model training and evaluation. The MovieLens dataset family (100K, 1M, 10M, 20M) is the standard benchmark for collaborative filtering research. Loading these datasets into Spark involves several non-trivial challenges:
- Schema Management: Different dataset sizes use different file formats and column layouts. The 100K dataset uses tab-separated values, while the 1M and 10M datasets use `::` as a multi-character delimiter. A flexible schema mechanism must map raw columns to standardized names (`userID`, `itemID`, `rating`, `timestamp`).
- Multi-Character Delimiter Handling: PySpark's built-in CSV reader does not support multi-character delimiters like `::`. For these datasets, the data must be loaded via `SparkContext.textFile()` as raw text, split manually with a `map()` transformation on the RDD, and then converted into a DataFrame with an explicit schema.
- Item Metadata Enrichment: Movie titles, genres, and release-year information reside in a separate file. Since this metadata file is small, it is loaded as a pandas DataFrame on the driver node and then converted to a Spark DataFrame for a distributed join with the ratings data.
- Databricks DBFS Integration: When running on Databricks, data files must be copied from the local filesystem to DBFS (Databricks File System) before PySpark can read them. This requires the `dbutils` object for file operations.
- Caching: The resulting DataFrame is cached and materialized (via `df.count()`) before returning, ensuring that the temporary download files can be safely cleaned up without invalidating the lazy computation graph.
Usage
Use this data loading pattern at the beginning of any Spark-based recommendation workflow. It is the prerequisite for data splitting, ALS model training, and distributed evaluation. The same function supports all MovieLens dataset sizes, making it easy to prototype on 100K and scale to 20M.
Theoretical Basis
The MovieLens dataset represents a partially observed user-item interaction matrix R of shape (m x n):
R[u, i] = rating that user u gave to item i (if observed)
= ? (if not yet observed)
Dataset sizes and their characteristics:
| Size | Users | Items | Ratings | Density |
|---|---|---|---|---|
| 100K | 943 | 1,682 | 100,000 | ~6.3% |
| 1M | 6,040 | 3,706 | 1,000,209 | ~4.5% |
| 10M | 69,878 | 10,677 | 10,000,054 | ~1.3% |
| 20M | 138,493 | 26,744 | 20,000,263 | ~0.5% |
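The density figures follow directly from dividing observed ratings by the size of the full matrix, ratings / (users x items); a quick check:

```python
# Sparsity check: density = observed ratings / (users * items).
datasets = {
    "100K": (943, 1_682, 100_000),
    "1M": (6_040, 3_706, 1_000_209),
    "10M": (69_878, 10_677, 10_000_054),
    "20M": (138_493, 26_744, 20_000_263),
}
density = {
    size: n_ratings / (n_users * n_items)
    for size, (n_users, n_items, n_ratings) in datasets.items()
}
```

Note how density falls by an order of magnitude from 100K to 20M even as the rating count grows 200x: larger catalogs make the matrix sparser, which is exactly why distributed factorization methods like ALS are used downstream.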
The loading pipeline in pseudocode:
1. Resolve dataset format (separator, has_header) from size parameter
2. Download and extract zip archive if not cached locally
3. IF multi-character separator (e.g. "::"):
raw_rdd = spark.sparkContext.textFile(path)
data_rdd = raw_rdd.map(line -> line.split(separator))
df = spark.createDataFrame(data_rdd, schema)
ELSE:
df = spark.read.csv(path, schema=schema, sep=separator)
4. IF title_col or genres_col or year_col requested:
item_pdf = pd.read_csv(item_path) # small file, load on driver
item_df = spark.createDataFrame(item_pdf)
df = df.join(item_df, on="itemID", how="left")
5. df.cache()
6. df.count() # materialize before temp files are cleaned up
7. RETURN df
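Step 1 of the pipeline amounts to a lookup table keyed by the size parameter. The sketch below follows the MovieLens file conventions described above, but the names (`MOVIELENS_FORMATS`, `resolve_format`) are hypothetical, not the library's actual code.

```python
# Sketch of step 1: map a dataset size to the format of its ratings file.
# The 100K set is tab-separated, 1M/10M use the "::" delimiter, and the
# 20M set ships as a standard CSV with a header row.
MOVIELENS_FORMATS = {
    "100k": {"separator": "\t", "has_header": False},
    "1m": {"separator": "::", "has_header": False},
    "10m": {"separator": "::", "has_header": False},
    "20m": {"separator": ",", "has_header": True},
}

def resolve_format(size: str) -> dict:
    try:
        return MOVIELENS_FORMATS[size.lower()]
    except KeyError:
        raise ValueError(f"Unsupported MovieLens size: {size}")

fmt = resolve_format("1M")
# A separator longer than one character selects the textFile + map branch.
needs_manual_split = len(fmt["separator"]) > 1
```

Keeping the format table in one place means the rest of the pipeline only branches on `needs_manual_split`, so adding a new dataset size is a one-line change.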
The caching step (5-6) is essential because Spark uses lazy evaluation. Without materializing the DataFrame before the context manager cleans up temporary files, subsequent actions on the DataFrame would fail with file-not-found errors.