Implementation: make_spark_mapper (Snorkel)
| Knowledge Sources | |
|---|---|
| Domains | Data_Processing, Distributed_Computing |
| Last Updated | 2026-02-14 20:38 GMT |
Overview
Concrete tool for adapting Snorkel Mapper objects to work with PySpark's immutable Row objects.
Description
The make_spark_mapper function converts a standard Snorkel Mapper to be compatible with PySpark DataFrames. PySpark's Row objects are immutable, so the default Mapper field-update logic (which mutates data points in place) does not work. This function replaces the Mapper's internal _update_fields method with a version that reconstructs Row objects from scratch, merging original fields with newly mapped fields. This is a lightweight compatibility layer that enables Snorkel's mapper infrastructure to operate in distributed Spark environments.
Usage
Import this function when you need to use Snorkel Mappers (including preprocessors) with PySpark DataFrames. Call make_spark_mapper on any Mapper instance before passing it to a Spark-based applier or pipeline.
Code Reference
Source Location
- Repository: Snorkel
- File: snorkel/map/spark.py
- Lines: 1-26
Signature
def make_spark_mapper(mapper: Mapper) -> Mapper:
"""Convert ``Mapper`` to be compatible with PySpark.
Parameters
----------
mapper
Mapper to make compatible with PySpark
"""
Import
from snorkel.map.spark import make_spark_mapper
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| mapper | Mapper | Yes | A Snorkel Mapper instance to adapt for PySpark compatibility |
Outputs
| Name | Type | Description |
|---|---|---|
| return | Mapper | The same Mapper instance with its _update_fields method replaced to handle immutable PySpark Row objects |
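The contract above, that the same instance comes back with only its `_update_fields` method swapped, can be sketched with hypothetical stand-ins (plain dicts in place of Row objects; these are not the real Snorkel classes):

```python
# Hypothetical stand-in for a field-updating Mapper (not the real Snorkel class)
class Mapper:
    def _update_fields(self, x, mapped_fields):
        x.update(mapped_fields)  # in-place update; fails on immutable rows
        return x

def make_spark_mapper(mapper):
    # Swap in a non-mutating update that rebuilds the record instead
    def _update_fields(x, mapped_fields):
        return {**x, **mapped_fields}
    mapper._update_fields = _update_fields
    return mapper

adapted = make_spark_mapper(Mapper())
merged = adapted._update_fields({"text": "hi"}, {"text_length": 2})
# merged contains both the original and the mapped fields;
# the input dict is left untouched
```

Returning the same instance (rather than a copy) keeps the adaptation cheap and lets callers wrap mappers in place before handing them to a Spark pipeline.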
Usage Examples
Adapting a Mapper for Spark
from snorkel.map.core import lambda_mapper
from snorkel.map.spark import make_spark_mapper
# Define a custom mapper with the lambda_mapper decorator
@lambda_mapper()
def add_length_field(x):
    x.text_length = len(x.text)
    return x
# Convert mapper for PySpark compatibility
spark_mapper = make_spark_mapper(add_length_field)
# Now spark_mapper can be used with PySpark DataFrames
# where Row objects are immutable
Using with SparkLFApplier
from snorkel.map.spark import make_spark_mapper
from snorkel.preprocess import preprocessor
# Create a preprocessor (a Mapper subclass) with the preprocessor decorator
@preprocessor()
def get_text_length(x):
    x.text_length = len(x.text)
    return x
# Adapt for Spark before using in a Spark-based pipeline, e.g. via the
# pre argument of a labeling function applied with SparkLFApplier
spark_preprocessor = make_spark_mapper(get_text_length)