
Implementation:Snorkel team Snorkel Make Spark Mapper

From Leeroopedia
Knowledge Sources
Domains Data_Processing, Distributed_Computing
Last Updated 2026-02-14 20:38 GMT

Overview

A concrete utility for adapting Snorkel Mapper objects to work with PySpark's immutable Row objects.

Description

The make_spark_mapper function converts a standard Snorkel Mapper to be compatible with PySpark DataFrames. PySpark's Row objects are immutable, so the default Mapper field-update logic (which mutates data points in place) does not work. This function replaces the Mapper's internal _update_fields method with a version that reconstructs Row objects from scratch, merging original fields with newly mapped fields. This is a lightweight compatibility layer that enables Snorkel's mapper infrastructure to operate in distributed Spark environments.
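The reconstruction idea can be illustrated without PySpark. The sketch below simulates the replaced field-update logic with a plain dictionary merge; `FrozenRow` and `update_fields` are illustrative stand-ins for `pyspark.sql.Row` and Snorkel's internal method, not the library's actual code.

```python
# Illustrative sketch of the field-update strategy make_spark_mapper
# installs: instead of mutating the data point, build a brand-new
# record that merges the original fields with the mapped fields.
# FrozenRow is a stand-in for pyspark.sql.Row, not a Snorkel class.
from types import MappingProxyType


class FrozenRow:
    """Minimal immutable record, mimicking PySpark's Row."""

    def __init__(self, **fields):
        object.__setattr__(self, "_fields", MappingProxyType(dict(fields)))

    def __setattr__(self, name, value):
        raise TypeError("FrozenRow is immutable")

    def asDict(self):
        return dict(self._fields)

    def __getattr__(self, name):
        try:
            return self._fields[name]
        except KeyError:
            raise AttributeError(name)


def update_fields(x, mapped_fields):
    """Rebuild the row from scratch, merging old and new fields."""
    all_fields = x.asDict()
    all_fields.update(mapped_fields)
    return FrozenRow(**all_fields)


row = FrozenRow(text="hello world")
new_row = update_fields(row, {"text_length": len(row.text)})
print(new_row.text, new_row.text_length)  # hello world 11
```

Because the original row is never mutated, this strategy is safe for Spark's immutable Row objects, at the cost of allocating a new record per mapped data point.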

Usage

Import this function when you need to use Snorkel Mappers (including preprocessors) with PySpark DataFrames. Call make_spark_mapper on any Mapper instance before passing it to a Spark-based applier or pipeline.

Code Reference

Source Location

Signature

def make_spark_mapper(mapper: Mapper) -> Mapper:
    """Convert ``Mapper`` to be compatible with PySpark.

    Parameters
    ----------
    mapper
        Mapper to make compatible with PySpark
    """

Import

from snorkel.map.spark import make_spark_mapper

I/O Contract

Inputs

Name Type Required Description
mapper Mapper Yes A Snorkel Mapper instance to adapt for PySpark compatibility

Outputs

Name Type Description
return Mapper The same Mapper instance with its _update_fields method replaced to handle immutable PySpark Row objects
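The output row notes that the same Mapper instance is returned with one method replaced. A minimal sketch of this adapt-in-place pattern follows; `DemoMapper` and `make_spark_compatible` are hypothetical names for illustration, not Snorkel's API.

```python
# Hypothetical stand-ins (DemoMapper, make_spark_compatible) illustrating
# the adapt-in-place pattern: swap one method on the instance and hand
# back the very same object. This is not Snorkel's actual code.

def spark_update_fields(x, mapped_fields):
    # PySpark-style updater: never mutate x, build a merged copy instead
    merged = dict(x)
    merged.update(mapped_fields)
    return merged

class DemoMapper:
    def _update_fields(self, x, mapped_fields):
        # Default behavior: mutate the data point in place
        x.update(mapped_fields)
        return x

def make_spark_compatible(mapper):
    # Replace the update method on this instance and return the same
    # object, mirroring make_spark_mapper's identity-preserving contract
    mapper._update_fields = spark_update_fields
    return mapper

m = DemoMapper()
m2 = make_spark_compatible(m)
print(m2 is m)  # True: same instance, method replaced
```

Because the adaptation happens in place, any references to the original Mapper (for example, inside a labeling function's `pre` list) see the Spark-compatible behavior after conversion.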

Usage Examples

Adapting a Mapper for Spark

from snorkel.map.core import Mapper
from snorkel.map.spark import make_spark_mapper

# Define a custom mapper by subclassing Mapper. The run method
# returns a dict of new field values rather than mutating x in place,
# so the _update_fields machinery (which make_spark_mapper replaces)
# handles merging the new fields into the data point.
class TextLengthMapper(Mapper):
    def __init__(self):
        super().__init__("text_length_mapper", dict(text="text"), dict(text_length="text_length"))

    def run(self, text):
        return dict(text_length=len(text))

add_length_field = TextLengthMapper()

# Convert mapper for PySpark compatibility
spark_mapper = make_spark_mapper(add_length_field)

# Now spark_mapper can be used with PySpark DataFrames
# where Row objects are immutable

Using with SparkLFApplier

from snorkel.labeling import labeling_function
from snorkel.labeling.apply.spark import SparkLFApplier
from snorkel.map.spark import make_spark_mapper
from snorkel.preprocess.core import Preprocessor

# Create a class-based preprocessor (Preprocessor is a Mapper subclass)
class TextLengthPreprocessor(Preprocessor):
    def __init__(self):
        super().__init__("text_length", dict(text="text"), dict(text_length="text_length"))

    def run(self, text):
        return dict(text_length=len(text))

# Adapt for Spark before using in a Spark-based pipeline
spark_preprocessor = make_spark_mapper(TextLengthPreprocessor())

# Use the adapted preprocessor in a labeling function
@labeling_function(pre=[spark_preprocessor])
def short_text(x):
    return 0 if x.text_length < 140 else -1

# Apply over an RDD of PySpark Row objects:
# applier = SparkLFApplier([short_text])
# L = applier.apply(rdd)
