# Implementation: Recommenders Spark Evaluation Classes
| Knowledge Sources | |
|---|---|
| Domains | Model Evaluation, Recommendation Systems, Distributed Computing |
| Last Updated | 2026-02-10 00:00 GMT |
## Overview
Concrete tool for computing rating accuracy and ranking quality metrics on Spark DataFrames using the SparkRatingEvaluation and SparkRankingEvaluation classes.
## Description
This implementation provides two evaluation classes that wrap PySpark's RegressionMetrics and RankingMetrics with a recommender-system-specific interface. Both classes accept true and predicted rating DataFrames, validate schemas, join the data on user-item pairs, and expose metric methods that compute results in a distributed fashion.
SparkRatingEvaluation (Lines 32-166) handles regression-style rating metrics. On initialization, it inner-joins true and predicted DataFrames on user and item columns, casts ratings to doubles, and creates a RegressionMetrics object from the (prediction, label) RDD. Methods .rmse(), .mae(), .rsquared(), and .exp_var() delegate to PySpark's built-in implementations, except for .exp_var() which uses a custom variance-based formula to work around a bug in Spark MLlib's native explained variance.
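The custom formula behind `.exp_var()` is the standard explained-variance definition, 1 - Var(y_true - y_pred) / Var(y_true). A minimal plain-Python sketch of that formula (the class computes the same quantity on Spark; `explained_variance` is a hypothetical helper name, not the library API):

```python
def explained_variance(y_true, y_pred):
    """Explained variance: 1 - Var(residuals) / Var(y_true).

    Plain-Python sketch of the formula SparkRatingEvaluation.exp_var()
    computes distributedly; not the library's actual code.
    """
    def var(xs):
        m = sum(xs) / len(xs)
        return sum((x - m) ** 2 for x in xs) / len(xs)

    residuals = [t - p for t, p in zip(y_true, y_pred)]
    return 1 - var(residuals) / var(y_true)

# Perfect predictions leave no residual variance to explain:
print(explained_variance([1.0, 2.0, 3.0], [1.0, 2.0, 3.0]))  # 1.0
```

Note that a constant offset in the predictions also yields 1.0, since the residuals then have zero variance; this is what distinguishes explained variance from R-squared.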
SparkRankingEvaluation (Lines 169-358) handles ranking metrics. On initialization, it filters predicted ratings to the top-K items per user (or by threshold/timestamp), collects predicted and true item lists per user, and creates a RankingMetrics object. Methods .precision_at_k(), .recall_at_k(), .ndcg_at_k(), .map(), and .map_at_k() delegate to PySpark's RankingMetrics.
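The top-K filtering step can be sketched in plain Python (the class performs the equivalent with Spark window functions; the function name and tuple layout here are illustrative, not the library API):

```python
from collections import defaultdict

def top_k_per_user(predictions, k=10):
    """Group (user, item, score) triples and keep each user's k highest-scored items.

    Illustrative stand-in for the Spark-side filtering SparkRankingEvaluation
    performs when relevancy_method="top_k".
    """
    by_user = defaultdict(list)
    for user, item, score in predictions:
        by_user[user].append((item, score))
    return {
        user: [item for item, _ in sorted(items, key=lambda x: -x[1])[:k]]
        for user, items in by_user.items()
    }

preds = [("u1", "a", 4.5), ("u1", "b", 3.0), ("u1", "c", 5.0), ("u2", "a", 2.0)]
print(top_k_per_user(preds, k=2))  # {'u1': ['c', 'a'], 'u2': ['a']}
```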
## Usage
Instantiate one or both classes after generating predictions from a trained model. Pass the ground truth test DataFrame and the predictions DataFrame. Call the metric methods to retrieve individual metric values. Both classes validate that required columns exist and that DataFrames are non-empty, raising descriptive errors on schema mismatches.
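The schema validation amounts to checking that the expected columns are present before joining. A hedged sketch of what such a check might look like (not the library's exact code; the real classes also verify the DataFrames are non-empty):

```python
def check_columns(df_columns, required, df_name):
    """Raise a descriptive error if any required column is absent.

    Sketch of the kind of validation both evaluator classes perform
    on rating_true and rating_pred at initialization.
    """
    missing = [c for c in required if c not in df_columns]
    if missing:
        raise ValueError(f"Missing columns in {df_name}: {missing}")

check_columns(["userID", "itemID", "rating"], ["userID", "itemID", "rating"], "rating_true")
try:
    check_columns(["userID", "score"], ["userID", "itemID", "prediction"], "rating_pred")
except ValueError as e:
    print(e)  # Missing columns in rating_pred: ['itemID', 'prediction']
```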
## Code Reference

### Source Location
- Repository: recommenders
- File: recommenders/evaluation/spark_evaluation.py (Lines 32-358)
### Signature

```python
# Rating Evaluation
class SparkRatingEvaluation:
    def __init__(
        self,
        rating_true,   # pyspark.sql.DataFrame
        rating_pred,   # pyspark.sql.DataFrame
        col_user="userID",
        col_item="itemID",
        col_rating="rating",
        col_prediction="prediction",
    ): ...
    def rmse(self) -> float: ...
    def mae(self) -> float: ...
    def rsquared(self) -> float: ...
    def exp_var(self) -> float: ...

# Ranking Evaluation
class SparkRankingEvaluation:
    def __init__(
        self,
        rating_true,   # pyspark.sql.DataFrame
        rating_pred,   # pyspark.sql.DataFrame
        k=10,
        relevancy_method="top_k",
        col_user="userID",
        col_item="itemID",
        col_rating="rating",
        col_prediction="prediction",
        threshold=10,
    ): ...
    def precision_at_k(self) -> float: ...
    def recall_at_k(self) -> float: ...
    def ndcg_at_k(self) -> float: ...
    def map(self) -> float: ...
    def map_at_k(self) -> float: ...
```
### Import

```python
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
```
## I/O Contract

### Inputs (SparkRatingEvaluation)
| Name | Type | Required | Description |
|---|---|---|---|
| rating_true | pyspark.sql.DataFrame | Yes | Ground truth DataFrame containing user, item, and rating columns |
| rating_pred | pyspark.sql.DataFrame | Yes | Predictions DataFrame containing user, item, and prediction columns |
| col_user | str | No (default: "userID") | Name of the user column |
| col_item | str | No (default: "itemID") | Name of the item column |
| col_rating | str | No (default: "rating") | Name of the true rating column in rating_true |
| col_prediction | str | No (default: "prediction") | Name of the predicted rating column in rating_pred |
### Inputs (SparkRankingEvaluation)
| Name | Type | Required | Description |
|---|---|---|---|
| rating_true | pyspark.sql.DataFrame | Yes | Ground truth DataFrame containing user, item, and rating columns |
| rating_pred | pyspark.sql.DataFrame | Yes | Predictions DataFrame containing user, item, and prediction columns |
| k | int | No (default: 10) | Number of top items to consider for ranking metrics |
| relevancy_method | str | No (default: "top_k") | Method for determining relevant items; one of "top_k", "by_time_stamp", "by_threshold" |
| col_user | str | No (default: "userID") | Name of the user column |
| col_item | str | No (default: "itemID") | Name of the item column |
| col_rating | str | No (default: "rating") | Name of the true rating column |
| col_prediction | str | No (default: "prediction") | Name of the predicted rating column |
| threshold | float | No (default: 10) | Rating threshold for relevancy when relevancy_method="by_threshold" |
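The "by_threshold" relevancy method keeps only items whose predicted rating meets the threshold, rather than taking a fixed K per user. A plain-Python sketch of that filter (illustrative names; the class applies the equivalent filter to the predictions DataFrame, and an inclusive comparison is assumed here):

```python
def by_threshold(predictions, threshold=10):
    """Keep only (user, item, score) triples whose score meets the threshold.

    Illustrative version of relevancy_method="by_threshold"; whether the
    library's comparison is inclusive is an assumption of this sketch.
    """
    return [(u, i, s) for u, i, s in predictions if s >= threshold]

preds = [("u1", "a", 12.0), ("u1", "b", 8.0), ("u2", "c", 10.0)]
print(by_threshold(preds, threshold=10))  # [('u1', 'a', 12.0), ('u2', 'c', 10.0)]
```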
### Outputs
| Method | Return Type | Description |
|---|---|---|
| .rmse() | float | Root Mean Squared Error (lower is better) |
| .mae() | float | Mean Absolute Error (lower is better) |
| .rsquared() | float | R-squared coefficient of determination (higher is better, max 1.0) |
| .exp_var() | float | Explained variance ratio (higher is better, range 0.0 to 1.0) |
| .precision_at_k() | float | Precision at K (higher is better, range 0.0 to 1.0) |
| .recall_at_k() | float | Recall at K (higher is better, range 0.0 to 1.0) |
| .ndcg_at_k() | float | Normalized Discounted Cumulative Gain at K (higher is better, range 0.0 to 1.0) |
| .map() | float | Mean Average Precision (higher is better, range 0.0 to 1.0) |
| .map_at_k() | float | Mean Average Precision at K (higher is better, range 0.0 to 1.0) |
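As a concrete illustration of the ranking metrics above, here are single-user definitions of precision@K and recall@K in plain Python (the Spark classes compute these per user and average across users; these helpers are illustrative, not the library API):

```python
def precision_at_k(predicted, actual, k):
    """Fraction of the top-k predicted items that appear in the ground truth."""
    hits = sum(1 for item in predicted[:k] if item in actual)
    return hits / k

def recall_at_k(predicted, actual, k):
    """Fraction of ground-truth items recovered among the top-k predictions."""
    hits = sum(1 for item in predicted[:k] if item in actual)
    return hits / len(actual)

predicted = ["a", "b", "c", "d"]   # ranked predictions for one user
actual = {"b", "d", "e"}           # that user's ground-truth items
print(precision_at_k(predicted, actual, k=4))  # 0.5 (2 of the 4 predictions hit)
print(recall_at_k(predicted, actual, k=4))     # ≈ 0.667 (2 of 3 ground-truth items found)
```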
## Usage Examples

### Rating Metrics

```python
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation

# Assume test and predictions DataFrames are already prepared
rating_eval = SparkRatingEvaluation(
    rating_true=test,
    rating_pred=predictions,
    col_user="userID",
    col_item="itemID",
    col_rating="rating",
    col_prediction="prediction",
)

print(f"RMSE: {rating_eval.rmse():.4f}")
print(f"MAE: {rating_eval.mae():.4f}")
print(f"R^2: {rating_eval.rsquared():.4f}")
print(f"Exp Var: {rating_eval.exp_var():.4f}")
# RMSE: 0.9672
# MAE: 0.7530
# R^2: 0.2641
# Exp Var: 0.2674
```
### Ranking Metrics

```python
from recommenders.evaluation.spark_evaluation import SparkRankingEvaluation

ranking_eval = SparkRankingEvaluation(
    rating_true=test,
    rating_pred=predictions,
    k=10,
    relevancy_method="top_k",
    col_user="userID",
    col_item="itemID",
    col_rating="rating",
    col_prediction="prediction",
)

print(f"Precision@10: {ranking_eval.precision_at_k():.4f}")
print(f"Recall@10: {ranking_eval.recall_at_k():.4f}")
print(f"NDCG@10: {ranking_eval.ndcg_at_k():.4f}")
print(f"MAP: {ranking_eval.map():.4f}")
print(f"MAP@10: {ranking_eval.map_at_k():.4f}")
# Precision@10: 0.0451
# Recall@10: 0.0178
# NDCG@10: 0.0463
# MAP: 0.0052
# MAP@10: 0.0037
```
### Full ALS Evaluation Workflow

```python
from pyspark.ml.recommendation import ALS

from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.datasets.movielens import load_spark_df
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import (
    SparkRatingEvaluation,
    SparkRankingEvaluation,
)

# Setup and data
spark = start_or_get_spark(app_name="ALS_Eval", memory="16g")
data = load_spark_df(spark, size="100k")
train, test = spark_random_split(data, ratio=0.75, seed=42)

# Train model and predict
als = ALS(
    rank=10,
    maxIter=15,
    regParam=0.05,
    userCol="userID",
    itemCol="itemID",
    ratingCol="rating",
    coldStartStrategy="drop",
)
model = als.fit(train)
predictions = model.transform(test)

# Evaluate
rating_eval = SparkRatingEvaluation(test, predictions)
ranking_eval = SparkRankingEvaluation(test, predictions, k=10)
print(f"RMSE: {rating_eval.rmse():.4f}")
print(f"MAE: {rating_eval.mae():.4f}")
print(f"Precision@10: {ranking_eval.precision_at_k():.4f}")
print(f"NDCG@10: {ranking_eval.ndcg_at_k():.4f}")
```