Principle:DataExpert io Data engineer handbook Scalar UDF Enrichment
Overview
Scalar UDF Enrichment describes the theory and practice of using User-Defined Functions (UDFs) to enrich streaming data within Apache Flink. Scalar UDFs allow row-level transformations that can invoke external services, compute derived values, or apply custom business logic to each record in a stream.
ScalarFunction for Row-Level Transformations
In Flink, a ScalarFunction maps zero or more scalar values to a single new scalar value and is evaluated once per input row. In PyFlink, a scalar UDF can be written as a Python class that extends ScalarFunction and implements an eval method; an instance is then wrapped with udf(), which declares the result type:
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class MyEnrichment(ScalarFunction):
    def eval(self, input_value):
        # Placeholder transformation (trim and upper-case); replace with the real enrichment logic.
        return input_value.strip().upper() if input_value is not None else None

my_enrichment = udf(MyEnrichment(), result_type=DataTypes.STRING())
Key characteristics of scalar UDFs:
- One-to-one mapping -- each input row produces exactly one output value.
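- Deterministic or non-deterministic -- Flink treats a UDF as deterministic by default. UDFs that call external APIs are inherently non-deterministic and should declare this by overriding is_deterministic() to return False, so the planner does not pre-evaluate calls with constant arguments during constant expression reduction.
- Registered in TableEnvironment -- once registered (for example with create_temporary_function), the UDF can be referenced by name in SQL queries or Table API expressions.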
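A minimal sketch of defining, registering, and calling a scalar UDF, assuming a hypothetical `events` table with a STRING column `country`. `NormalizeCountry`, `normalize_country`, and `register` are illustrative names, not part of any library. The `try/except` around the import only supplies a stand-in base class so that `eval()` can be unit-tested without PyFlink installed; a real job would import `ScalarFunction` directly.

```python
try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    # Stand-in base class so eval() can be unit-tested without PyFlink installed.
    class ScalarFunction:
        pass


class NormalizeCountry(ScalarFunction):
    """Hypothetical UDF: one input value in, exactly one output value out."""

    def eval(self, country):
        # SQL NULLs arrive as None; pass them through unchanged.
        if country is None:
            return None
        return country.strip().upper()


def register(t_env):
    """Register and use the UDF on an existing PyFlink TableEnvironment.

    Assumes a table named "events" with a STRING column "country" already exists.
    """
    from pyflink.table import DataTypes
    from pyflink.table.expressions import call, col
    from pyflink.table.udf import udf

    normalize_country = udf(NormalizeCountry(), result_type=DataTypes.STRING())
    t_env.create_temporary_function("normalize_country", normalize_country)

    # Once registered, the function is addressable by name from SQL ...
    by_sql = t_env.sql_query("SELECT normalize_country(country) AS country FROM events")
    # ... and from Table API expressions via call().
    by_table_api = t_env.from_path("events").select(
        call("normalize_country", col("country")).alias("country")
    )
    return by_sql, by_table_api
```

Calling `NormalizeCountry().eval("  us ")` directly returns `"US"`, which is a cheap way to test the row-level logic before wiring it into a job.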
External API Calls Within UDFs
A common enrichment pattern is to call an external service from within the UDF's eval method. For example, a geolocation lookup service can be called to enrich IP addresses with geographic data:
- The UDF receives an IP address as input.
- It issues an HTTP request to an external geolocation API.
- It returns structured geodata (country, state, city) as a JSON string.
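The three steps above can be sketched as a UDF that calls a geolocation HTTP endpoint. This is a minimal sketch under stated assumptions: the endpoint URL passed to the constructor is hypothetical, the service is assumed to accept the address as an `ip` query parameter and to answer with a JSON object containing `country`, `state`, and `city` keys, and only the standard library (`urllib`) is used for HTTP. `open()` is the PyFlink hook that runs on the worker before the first `eval()`; the stand-in base class is again only there for testing without PyFlink.

```python
import json
import urllib.parse
import urllib.request

try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    # Stand-in exposing the same hooks, for unit tests without PyFlink.
    class ScalarFunction:
        def open(self, function_context):
            pass

        def close(self):
            pass

        def is_deterministic(self):
            return True


class GeoLookup(ScalarFunction):
    """Enrich an IP address with {country, state, city}, returned as a JSON string."""

    def __init__(self, endpoint, timeout_s=2.0):
        # Only plain, picklable configuration here: the instance is serialized
        # and shipped to the Python workers.
        self.endpoint = endpoint
        self.timeout_s = timeout_s
        self._opener = None

    def open(self, function_context):
        # Runs on the worker before the first eval(): build per-worker resources here.
        self._opener = urllib.request.build_opener()

    def is_deterministic(self):
        # The result depends on an external service, not only on the input.
        return False

    def eval(self, ip):
        if ip is None:
            return None
        url = self.endpoint + "?" + urllib.parse.urlencode({"ip": ip})
        try:
            with self._opener.open(url, timeout=self.timeout_s) as resp:
                body = json.loads(resp.read().decode("utf-8"))
        except (OSError, ValueError):
            # Network error, HTTP error, timeout, or malformed JSON.
            body = {}
        if not isinstance(body, dict):
            body = {}
        # Keep only the assumed fields; missing ones become JSON null.
        return json.dumps({
            "country": body.get("country"),
            "state": body.get("state"),
            "city": body.get("city"),
        })
```

It can be wrapped like any other scalar UDF, e.g. `udf(GeoLookup("http://geo.example.internal/lookup"), result_type=DataTypes.STRING())`, where the URL is a placeholder. On any failure the function emits a record whose fields are all null instead of raising, which keeps the job running but means downstream consumers must tolerate missing geodata.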
Considerations for external API calls in UDFs:
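- Latency -- every record waits on a network round trip, which bounds throughput; cache repeated keys or batch requests where the API allows it.
- Rate limiting -- external APIs may impose rate limits; the UDF should retry with backoff or throttle its request rate.
- Fault tolerance -- network failures should be handled gracefully (e.g., by returning a default value), because an exception raised in eval fails the task and triggers a job restart.
- Serialization -- the UDF object is serialized (pickled, in PyFlink) and shipped to the workers, so keep only picklable configuration in the constructor and create connections or HTTP clients in open().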
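The considerations above can be combined in one UDF: an in-memory LRU cache with a time-to-live to cut latency, retries with exponential backoff for rate limits and transient failures, and a default value once retries are exhausted. A minimal sketch, assuming a hypothetical `fetch` callable that maps an IP address to a geodata JSON string and raises `OSError` on failure (for example, an HTTP call to a geolocation service). The cache size, TTL, and backoff values are illustrative, not recommendations.

```python
import json
import time
from collections import OrderedDict

try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    # Stand-in exposing the same hooks, for unit tests without PyFlink.
    class ScalarFunction:
        def open(self, function_context):
            pass

        def close(self):
            pass

        def is_deterministic(self):
            return True


DEFAULT_GEO = json.dumps({"country": None, "state": None, "city": None})


class CachedGeoLookup(ScalarFunction):
    def __init__(self, fetch, max_entries=10_000, ttl_s=300.0, max_attempts=3, backoff_s=0.1):
        self.fetch = fetch  # hypothetical: ip -> JSON string, raises OSError on failure
        self.max_entries = max_entries
        self.ttl_s = ttl_s
        self.max_attempts = max_attempts
        self.backoff_s = backoff_s
        self._cache = None

    def open(self, function_context):
        # One cache per worker, created on the worker rather than pickled with the UDF.
        self._cache = OrderedDict()  # ip -> (expires_at, value)

    def is_deterministic(self):
        return False

    def eval(self, ip):
        if ip is None:
            return None
        now = time.monotonic()
        hit = self._cache.get(ip)
        if hit is not None and hit[0] > now:
            self._cache.move_to_end(ip)  # mark as most recently used
            return hit[1]
        value = self._fetch_with_retry(ip)
        if value is None:
            # Retries exhausted: return the default but do not cache it,
            # so the key is looked up again once the service recovers.
            return DEFAULT_GEO
        self._cache[ip] = (now + self.ttl_s, value)
        self._cache.move_to_end(ip)
        if len(self._cache) > self.max_entries:
            self._cache.popitem(last=False)  # evict the least recently used entry
        return value

    def _fetch_with_retry(self, ip):
        for attempt in range(self.max_attempts):
            try:
                return self.fetch(ip)
            except OSError:
                if attempt + 1 < self.max_attempts:
                    # Exponential backoff: backoff_s, 2 * backoff_s, 4 * backoff_s, ...
                    time.sleep(self.backoff_s * (2 ** attempt))
        return None
```

The cache lives in each worker and is lost on restart, so it is not shared across parallel subtasks and hit rates depend on how keys are distributed. Sleeping inside `eval()` blocks that worker, so long backoffs trade throughput for staying under the API's rate limit.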
When to Use
Use Scalar UDF Enrichment when:
- Enriching streaming events with data from external services (e.g., geolocation lookup, feature store queries).
- Applying custom row-level transformations that cannot be expressed in standard SQL.
- The enrichment logic is self-contained and produces a single output value per input row.