Implementation:DataExpert io Data engineer handbook GetLocation UDF

From Leeroopedia


Overview

The GetLocation UDF is a PyFlink ScalarFunction that enriches streaming events with geolocation data looked up from each event's IP address. It calls the external ip2location.io API and returns a JSON string with country, state, and city fields, or an empty JSON object when the API does not respond with HTTP 200.

Type: API Doc

Source: start_job.py:L58-79

Class Definition

class GetLocation(ScalarFunction):
    def eval(self, ip_address) -> str

Detailed Description

The GetLocation class extends Flink's ScalarFunction and implements a single eval method that performs geolocation enrichment.

Evaluation logic:

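  1. Receives an ip_address string as input.
  2. Issues an HTTP GET request to the ip2location.io API, passing the IP address and an API key read from the IP2LOCATION_API_KEY environment variable.
  3. Returns an empty JSON object ({}) if the response status code is not 200.
  4. Otherwise, parses the JSON response from the API.
  5. Extracts the country_name, region_name (state), and city_name fields, defaulting each to an empty string when it is missing.
  6. Returns a JSON string with the keys country, state, and city.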

import json
import os

import requests
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class GetLocation(ScalarFunction):
    def eval(self, ip_address):
        # Pass the query as params so requests URL-encodes the values.
        response = requests.get(
            "https://api.ip2location.io/",
            params={"key": os.environ["IP2LOCATION_API_KEY"], "ip": ip_address},
        )
        # Any non-200 response yields an empty JSON object.
        if response.status_code != 200:
            return json.dumps({})
        data = response.json()
        # Missing fields default to empty strings.
        return json.dumps({
            "country": data.get("country_name", ""),
            "state": data.get("region_name", ""),
            "city": data.get("city_name", "")
        })
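
The mapping from API response to returned string can be exercised without a Flink cluster or network access. The following is a minimal sketch, not part of start_job.py: build_geodata is a hypothetical helper that mirrors the branch logic of eval, and the payloads are made-up examples shaped like the fields named above.

```python
import json

def build_geodata(status_code, payload):
    """Hypothetical helper mirroring eval's mapping from response to JSON string."""
    # Any non-200 status produces an empty JSON object.
    if status_code != 200:
        return json.dumps({})
    # Pick the three fields, defaulting to "" when one is missing.
    return json.dumps({
        "country": payload.get("country_name", ""),
        "state": payload.get("region_name", ""),
        "city": payload.get("city_name", ""),
    })

# Made-up payloads for illustration only.
ok = build_geodata(200, {
    "country_name": "United States of America",
    "region_name": "California",
    "city_name": "Mountain View",
})
partial = build_geodata(200, {"country_name": "Japan"})
failed = build_geodata(429, {})
```

Keeping this mapping in a plain function is one way to unit test the UDF's behavior separately from the HTTP call.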

UDF Registration

The UDF is registered with the Flink Table API using the udf helper:

get_location = udf(GetLocation(), result_type=DataTypes.STRING())

Once registered, get_location can be used in Table API expressions such as:

from pyflink.table.expressions import col

t_env.from_path("events").select(
    col("ip"),
    col("event_timestamp"),
    col("referrer"),
    col("host"),
    col("url"),
    get_location(col("ip"))
)

Inputs / Outputs

Inputs:

  • ip_address -- a VARCHAR containing the client IP address

Outputs:

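  • A JSON string with geodata fields (each is an empty string when the API response omits it):
    • country -- the country name
    • state -- the region/state name
    • city -- the city name
  • An empty JSON object ({}) when the API responds with a status code other than 200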
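
Because the UDF returns a string rather than a structured row, downstream code has to parse it and tolerate the empty-object case. A minimal sketch of such a consumer follows; parse_location is a hypothetical helper name, not something defined in the source.

```python
import json

def parse_location(geodata_json):
    """Hypothetical consumer: return (country, state, city) from the UDF output."""
    # Treat a missing or empty value the same as the "{}" failure result.
    data = json.loads(geodata_json) if geodata_json else {}
    return (
        data.get("country", ""),
        data.get("state", ""),
        data.get("city", ""),
    )

full = parse_location('{"country": "Japan", "state": "Tokyo", "city": "Tokyo"}')
empty = parse_location("{}")
```

With this shape, a failed lookup and a lookup with no known location both surface as empty strings, so callers that need to tell them apart would have to check for the empty object before parsing.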
