
Principle:DataExpert io Data engineer handbook Scalar UDF Enrichment


Overview

Scalar UDF Enrichment describes the theory and practice of using User-Defined Functions (UDFs) to enrich streaming data within Apache Flink. Scalar UDFs allow row-level transformations that can invoke external services, compute derived values, or apply custom business logic to each record in a stream.

ScalarFunction for Row-Level Transformations

In Flink, a ScalarFunction maps zero or more scalar values to a single new scalar value. It is evaluated once per input row. The PyFlink API allows defining scalar UDFs as Python classes that extend ScalarFunction:

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class MyEnrichment(ScalarFunction):
    def eval(self, input_value):
        # transformation logic, e.g. normalize the input
        return input_value.upper()

my_enrichment = udf(MyEnrichment(), result_type=DataTypes.STRING())

Key characteristics of scalar UDFs:

  • One-to-one mapping -- each input row produces exactly one output value.
  • Deterministic or non-deterministic -- UDFs that call external APIs are inherently non-deterministic.
  • Registered in TableEnvironment -- once registered, the UDF can be referenced in SQL queries or Table API expressions.

External API Calls Within UDFs

A common enrichment pattern is to call an external service from within the UDF's eval method. For example, a geolocation lookup service can be called to enrich IP addresses with geographic data:

  • The UDF receives an IP address as input.
  • It issues an HTTP request to an external geolocation API.
  • It returns structured geodata (country, state, city) as a JSON string.

Considerations for external API calls in UDFs:

  • Latency -- external calls add per-record latency; consider caching or batching where possible.
  • Rate limiting -- external APIs may impose rate limits; the UDF should handle retries or throttling.
  • Fault tolerance -- network failures should be handled gracefully (e.g., returning a default value).
  • Serialization -- the UDF must be serializable across Flink's distributed runtime.
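The latency and fault-tolerance considerations can be combined in a single wrapper. In this sketch the cache size and the default value are illustrative choices, and `_remote_lookup` is a stand-in for the real HTTP call to the external service:

```python
import functools
import json

# Default returned when the external service is unavailable
DEFAULT_GEO = json.dumps({"country": "", "state": "", "city": ""})

def _remote_lookup(ip):
    # Stand-in for the real HTTP call to the geolocation service.
    raise NotImplementedError

@functools.lru_cache(maxsize=10_000)
def cached_lookup(ip):
    # Cache hits avoid a network round trip for repeated IPs.
    return _remote_lookup(ip)

def safe_lookup(ip):
    """Return enrichment data, or a default value if the external call fails."""
    try:
        return cached_lookup(ip)
    except Exception:
        # Network failure, timeout, or rate limiting: degrade gracefully
        # rather than failing the Flink job.
        return DEFAULT_GEO
```

A UDF's `eval` would call `safe_lookup`, so a flaky external service yields default-valued rows instead of task failures and restarts.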

When to Use

Use Scalar UDF Enrichment when:

  • Enriching streaming events with data from external services (e.g., geolocation lookup, feature store queries).
  • Applying custom row-level transformations that cannot be expressed in standard SQL.
  • The enrichment logic is self-contained and produces a single output value per input row.
