
Implementation:Recommenders team Recommenders Spark Evaluation Classes

From Leeroopedia


Knowledge Sources
Domains: Model Evaluation, Recommendation Systems, Distributed Computing
Last Updated: 2026-02-10 00:00 GMT

Overview

Concrete tool for computing rating accuracy and ranking quality metrics on Spark DataFrames using the SparkRatingEvaluation and SparkRankingEvaluation classes.

Description

This implementation provides two evaluation classes that wrap PySpark's RegressionMetrics and RankingMetrics with a recommender-system-specific interface. Both classes accept true and predicted rating DataFrames, validate schemas, join the data on user-item pairs, and expose metric methods that compute results in a distributed fashion.

SparkRatingEvaluation (Lines 32-166) handles regression-style rating metrics. On initialization, it inner-joins true and predicted DataFrames on user and item columns, casts ratings to doubles, and creates a RegressionMetrics object from the (prediction, label) RDD. Methods .rmse(), .mae(), .rsquared(), and .exp_var() delegate to PySpark's built-in implementations, except for .exp_var() which uses a custom variance-based formula to work around a bug in Spark MLlib's native explained variance.
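The variance-based workaround mentioned above can be pictured with a small pure-Python sketch (illustrative only; `exp_var_sketch` is a hypothetical name, and the class computes the same quantity on Spark DataFrames rather than Python lists):

```python
from statistics import pvariance

def exp_var_sketch(y_true, y_pred):
    # Explained variance as 1 - Var(residuals) / Var(y_true),
    # the kind of formula used instead of Spark MLlib's native
    # explained variance.
    residuals = [t - p for t, p in zip(y_true, y_pred)]
    return 1.0 - pvariance(residuals) / pvariance(y_true)

print(exp_var_sketch([3.0, 4.0, 5.0, 1.0], [3.0, 4.0, 5.0, 1.0]))  # perfect fit -> 1.0
```

A perfect prediction leaves zero residual variance, so the ratio vanishes and the score is 1.0; predicting a constant explains none of the variance and scores 0.0 or below.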

SparkRankingEvaluation (Lines 169-358) handles ranking metrics. On initialization, it filters predicted ratings to the top-K items per user (or by threshold/timestamp), collects predicted and true item lists per user, and creates a RankingMetrics object. Methods .precision_at_k(), .recall_at_k(), .ndcg_at_k(), .map(), and .map_at_k() delegate to PySpark's RankingMetrics.
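Conceptually, the per-user list metrics behave like this pure-Python sketch (a simplified illustration with hypothetical names; Spark's RankingMetrics operates on distributed RDDs and handles edge cases differently):

```python
def precision_at_k_sketch(pred_lists, true_lists, k):
    # Fraction of the top-k predicted items found in the user's
    # ground-truth set, averaged over users.
    per_user = [len(set(p[:k]) & set(t)) / k for p, t in zip(pred_lists, true_lists)]
    return sum(per_user) / len(per_user)

def recall_at_k_sketch(pred_lists, true_lists, k):
    # Fraction of each user's ground-truth items recovered in the top k.
    per_user = [len(set(p[:k]) & set(t)) / len(t) for p, t in zip(pred_lists, true_lists)]
    return sum(per_user) / len(per_user)

# One user: top-2 of [1, 2, 3] is [1, 2]; ground truth is {1, 3}.
print(precision_at_k_sketch([[1, 2, 3]], [[1, 3]], k=2))  # -> 0.5
print(recall_at_k_sketch([[1, 2, 3]], [[1, 3]], k=2))     # -> 0.5
```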

Usage

Instantiate one or both classes after generating predictions from a trained model. Pass the ground truth test DataFrame and the predictions DataFrame. Call the metric methods to retrieve individual metric values. Both classes validate that required columns exist and that DataFrames are non-empty, raising descriptive errors on schema mismatches.
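The style of validation described above can be sketched as follows (a hypothetical helper, not the library's actual check, which runs against DataFrame schemas):

```python
def check_columns(df_columns, required, df_name="DataFrame"):
    # Raise a descriptive error when required columns are absent,
    # mirroring the kind of schema validation both classes perform
    # before joining the data.
    missing = [c for c in required if c not in df_columns]
    if missing:
        raise ValueError(f"{df_name} is missing required columns: {missing}")

check_columns(["userID", "itemID", "rating"], ["userID", "itemID", "rating"])  # passes silently
```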

Code Reference

Source Location

  • Repository: recommenders
  • File: recommenders/evaluation/spark_evaluation.py (Lines 32-358)

Signature

# Rating Evaluation
class SparkRatingEvaluation:
    def __init__(
        self,
        rating_true,        # pyspark.sql.DataFrame
        rating_pred,        # pyspark.sql.DataFrame
        col_user="userID",
        col_item="itemID",
        col_rating="rating",
        col_prediction="prediction",
    ):
        ...

    def rmse(self) -> float: ...
    def mae(self) -> float: ...
    def rsquared(self) -> float: ...
    def exp_var(self) -> float: ...


# Ranking Evaluation
class SparkRankingEvaluation:
    def __init__(
        self,
        rating_true,        # pyspark.sql.DataFrame
        rating_pred,        # pyspark.sql.DataFrame
        k=10,
        relevancy_method="top_k",
        col_user="userID",
        col_item="itemID",
        col_rating="rating",
        col_prediction="prediction",
        threshold=10,
    ):
        ...

    def precision_at_k(self) -> float: ...
    def recall_at_k(self) -> float: ...
    def ndcg_at_k(self) -> float: ...
    def map(self) -> float: ...
    def map_at_k(self) -> float: ...

Import

from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation

I/O Contract

Inputs (SparkRatingEvaluation)

Name Type Required Description
rating_true pyspark.sql.DataFrame Yes Ground truth DataFrame containing user, item, and rating columns
rating_pred pyspark.sql.DataFrame Yes Predictions DataFrame containing user, item, and prediction columns
col_user str No (default: "userID") Name of the user column
col_item str No (default: "itemID") Name of the item column
col_rating str No (default: "rating") Name of the true rating column in rating_true
col_prediction str No (default: "prediction") Name of the predicted rating column in rating_pred

Inputs (SparkRankingEvaluation)

Name Type Required Description
rating_true pyspark.sql.DataFrame Yes Ground truth DataFrame containing user, item, and rating columns
rating_pred pyspark.sql.DataFrame Yes Predictions DataFrame containing user, item, and prediction columns
k int No (default: 10) Number of top items to consider for ranking metrics
relevancy_method str No (default: "top_k") Method for determining relevant items; one of "top_k", "by_time_stamp", "by_threshold"
col_user str No (default: "userID") Name of the user column
col_item str No (default: "itemID") Name of the item column
col_rating str No (default: "rating") Name of the true rating column
col_prediction str No (default: "prediction") Name of the predicted rating column
threshold float No (default: 10) Rating threshold for relevancy when relevancy_method="by_threshold"
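As a rough illustration of the by_threshold mode (a simplified sketch with a hypothetical name; the class applies this filtering on Spark DataFrames, and the exact semantics may differ):

```python
def relevant_by_threshold_sketch(scored_items, threshold):
    # Keep items whose predicted rating meets the threshold,
    # ordered by score descending (illustrative only).
    kept = [(item, s) for item, s in scored_items if s >= threshold]
    return [item for item, s in sorted(kept, key=lambda x: -x[1])]

print(relevant_by_threshold_sketch([("a", 4.5), ("b", 2.0), ("c", 3.5)], threshold=3.0))  # -> ['a', 'c']
```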

Outputs

Method Return Type Description
.rmse() float Root Mean Squared Error (lower is better)
.mae() float Mean Absolute Error (lower is better)
.rsquared() float R-squared coefficient of determination (higher is better, max 1.0)
.exp_var() float Explained variance (higher is better, max 1.0; can be negative for poor fits)
.precision_at_k() float Precision at K (higher is better, range 0.0 to 1.0)
.recall_at_k() float Recall at K (higher is better, range 0.0 to 1.0)
.ndcg_at_k() float Normalized Discounted Cumulative Gain at K (higher is better, range 0.0 to 1.0)
.map() float Mean Average Precision (higher is better, range 0.0 to 1.0)
.map_at_k() float Mean Average Precision at K (higher is better, range 0.0 to 1.0)
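For intuition about the ranking scores above, binary-relevance NDCG@K can be sketched in pure Python (an illustrative version with a hypothetical name; Spark's RankingMetrics.ndcgAt is the authoritative implementation):

```python
import math

def ndcg_at_k_sketch(pred_lists, true_lists, k):
    # DCG with binary relevance: a hit at rank i contributes 1 / log2(i + 2).
    def dcg(items, relevant):
        return sum(1.0 / math.log2(i + 2) for i, it in enumerate(items[:k]) if it in relevant)

    scores = []
    for pred, true in zip(pred_lists, true_lists):
        relevant = set(true)
        ideal = dcg(list(true), relevant)  # best achievable ordering
        scores.append(dcg(pred, relevant) / ideal if ideal else 0.0)
    return sum(scores) / len(scores)

# A single relevant item ranked first scores a perfect 1.0.
print(ndcg_at_k_sketch([[1, 2]], [[1]], k=2))  # -> 1.0
```

Pushing the relevant item down to rank 2 discounts its contribution by 1/log2(3), which is why NDCG rewards ranking relevant items early.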

Usage Examples

Rating Metrics

from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation

# Assume test and predictions DataFrames are already prepared
rating_eval = SparkRatingEvaluation(
    rating_true=test,
    rating_pred=predictions,
    col_user="userID",
    col_item="itemID",
    col_rating="rating",
    col_prediction="prediction",
)

print(f"RMSE:     {rating_eval.rmse():.4f}")
print(f"MAE:      {rating_eval.mae():.4f}")
print(f"R^2:      {rating_eval.rsquared():.4f}")
print(f"Exp Var:  {rating_eval.exp_var():.4f}")
# RMSE:     0.9672
# MAE:      0.7530
# R^2:      0.2641
# Exp Var:  0.2674

Ranking Metrics

from recommenders.evaluation.spark_evaluation import SparkRankingEvaluation

ranking_eval = SparkRankingEvaluation(
    rating_true=test,
    rating_pred=predictions,
    k=10,
    relevancy_method="top_k",
    col_user="userID",
    col_item="itemID",
    col_rating="rating",
    col_prediction="prediction",
)

print(f"Precision@10: {ranking_eval.precision_at_k():.4f}")
print(f"Recall@10:    {ranking_eval.recall_at_k():.4f}")
print(f"NDCG@10:      {ranking_eval.ndcg_at_k():.4f}")
print(f"MAP:          {ranking_eval.map():.4f}")
print(f"MAP@10:       {ranking_eval.map_at_k():.4f}")
# Precision@10: 0.0451
# Recall@10:    0.0178
# NDCG@10:      0.0463
# MAP:          0.0052
# MAP@10:       0.0037

Full ALS Evaluation Workflow

from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.datasets.movielens import load_spark_df
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import (
    SparkRatingEvaluation,
    SparkRankingEvaluation,
)
from pyspark.ml.recommendation import ALS

# Setup and data
spark = start_or_get_spark(app_name="ALS_Eval", memory="16g")
data = load_spark_df(spark, size="100k")
train, test = spark_random_split(data, ratio=0.75, seed=42)

# Train model and predict
als = ALS(rank=10, maxIter=15, regParam=0.05,
          userCol="userID", itemCol="itemID", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(train)
predictions = model.transform(test)

# Evaluate
rating_eval = SparkRatingEvaluation(test, predictions)
ranking_eval = SparkRankingEvaluation(test, predictions, k=10)

print(f"RMSE:         {rating_eval.rmse():.4f}")
print(f"MAE:          {rating_eval.mae():.4f}")
print(f"Precision@10: {ranking_eval.precision_at_k():.4f}")
print(f"NDCG@10:      {ranking_eval.ndcg_at_k():.4f}")

Related Pages

Implements Principle

Requires Environment
