Implementation: Recommenders ALSModel Transform
| Knowledge Sources | |
|---|---|
| Domains | Recommendation Systems, Matrix Factorization, Distributed Computing |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Concrete tool for generating rating predictions from a trained ALS model by computing dot products of user and item latent factor vectors across distributed Spark partitions.
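The prediction mechanics can be illustrated without Spark: each user and each item has a learned latent factor vector, and the estimated rating is their dot product. The sketch below uses hypothetical hand-written factors (real factors are learned by `als.fit()` and exposed via `model.userFactors` / `model.itemFactors`); the IDs and values are illustrative only.

```python
# Hypothetical rank-3 latent factors; in practice these are learned by ALS.
user_factors = {148: [0.9, 0.1, 0.5]}
item_factors = {496: [1.2, 0.3, 0.8]}

def predict(user_id, item_id):
    """Estimated rating = dot product of the user and item latent vectors."""
    u = user_factors[user_id]
    v = item_factors[item_id]
    return sum(a * b for a, b in zip(u, v))

print(round(predict(148, 496), 2))  # -> 1.51
```

Spark distributes exactly this lookup-and-dot-product over the partitions of the input DataFrame, which is why transform() scales to large user-item sets.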
Description
This is a wrapper document for PySpark's external ALSModel.transform() API, documenting how it is used within the Recommenders repository's ALS workflow. The transform() method is the standard Spark ML Transformer interface: given a DataFrame containing user and item columns, it appends a prediction column with the estimated rating for each user-item pair.
The method computes predictions by looking up the learned user and item latent factor vectors and computing their dot product. If coldStartStrategy="drop" is set on the ALS estimator, any user-item pairs involving users or items unseen during training are dropped from the output rather than producing NaN predictions. This keeps evaluation metrics well defined, since a single NaN would otherwise propagate into aggregate scores such as RMSE.
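The drop semantics can be mimicked in plain Python. This is a minimal sketch, not Spark code: the dictionaries, IDs, and values are invented for illustration, and the final filter plays the role of coldStartStrategy="drop".

```python
import math

# Hypothetical learned factors; user 2 never appeared in training data.
user_factors = {1: [0.5, 0.2]}
item_factors = {10: [0.4, 0.9]}

def predict(user, item):
    # Cold-start pair: no learned factors, so the prediction is NaN,
    # mirroring coldStartStrategy="nan".
    if user not in user_factors or item not in item_factors:
        return math.nan
    return sum(a * b for a, b in zip(user_factors[user], item_factors[item]))

pairs = [(1, 10), (2, 10)]
preds = [(u, i, predict(u, i)) for u, i in pairs]

# coldStartStrategy="drop" removes the NaN rows instead of emitting them:
dropped = [row for row in preds if not math.isnan(row[2])]
print(dropped)  # only the (1, 10) pair survives
```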
Usage
Call model.transform(test_df) after training an ALS model with model = als.fit(train_df). The resulting DataFrame contains all original columns plus a prediction column. Pass this prediction DataFrame along with the ground truth to the evaluation classes (SparkRatingEvaluation or SparkRankingEvaluation) to compute metrics.
Code Reference
Source Location
- Repository: External PySpark API
- Package:
pyspark.ml.recommendation
Signature
# Generate predictions for test user-item pairs
predictions = model.transform(test_df) # Returns pyspark.sql.DataFrame
# Generate top-N recommendations for all users
top_n = model.recommendForAllUsers(numItems=10) # Returns pyspark.sql.DataFrame
Import
from pyspark.ml.recommendation import ALS
# ALSModel is returned by ALS.fit(), not imported directly
External Reference
- Official Documentation: pyspark.ml.recommendation.ALSModel
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| test_df | pyspark.sql.DataFrame | Yes | DataFrame containing at minimum the user and item columns matching those specified during ALS configuration (e.g., userID, itemID); may also contain the true rating column for later evaluation |
Outputs
| Name | Type | Description |
|---|---|---|
| predictions | pyspark.sql.DataFrame | Input DataFrame with an appended prediction column (float) containing the estimated rating for each user-item pair; rows with cold-start users/items are dropped if coldStartStrategy="drop" |
Usage Examples
Basic Prediction
from pyspark.ml.recommendation import ALS
# Train model (assumes train_df is already prepared)
als = ALS(
rank=10,
maxIter=15,
regParam=0.05,
userCol="userID",
itemCol="itemID",
ratingCol="rating",
coldStartStrategy="drop",
)
model = als.fit(train_df)
# Generate predictions on test set
predictions = model.transform(test_df)
predictions.show(5)
# +------+------+------+---------+----------+
# |userID|itemID|rating|timestamp|prediction|
# +------+------+------+---------+----------+
# | 148| 496| 4.0|882920947| 3.4215...|
# | 463| 496| 3.0|876768848| 3.1082...|
# ...
Top-N Recommendations
# Get top 10 item recommendations for every user
top_10 = model.recommendForAllUsers(10)
top_10.show(3, truncate=False)
# +------+------------------------------------------------------------+
# |userID|recommendations |
# +------+------------------------------------------------------------+
# |1 |[{454, 4.92}, {312, 4.87}, {98, 4.85}, ...] |
# |2 |[{121, 4.78}, {56, 4.71}, {789, 4.65}, ...] |
# ...
Full Workflow with Evaluation
from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.datasets.movielens import load_spark_df
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation
from pyspark.ml.recommendation import ALS
spark = start_or_get_spark(app_name="ALS_Example", memory="16g")
data = load_spark_df(spark, size="100k")
train, test = spark_random_split(data, ratio=0.75, seed=42)
als = ALS(rank=10, maxIter=15, regParam=0.05,
userCol="userID", itemCol="itemID", ratingCol="rating",
coldStartStrategy="drop")
model = als.fit(train)
predictions = model.transform(test)
evaluator = SparkRatingEvaluation(test, predictions,
col_rating="rating", col_prediction="prediction")
print(f"RMSE: {evaluator.rmse():.4f}")