Implementation:DataExpert io Data engineer handbook GetLocation UDF
Overview
GetLocation_UDF is a Flink ScalarFunction that enriches streaming events by performing geolocation lookups on IP addresses. It calls the ip2location.io external API and returns a JSON string containing country, state, and city information.
Type: API Doc
Source: start_job.py:L58-79
Class Definition
class GetLocation(ScalarFunction):
    def eval(self, ip_address) -> str
Detailed Description
The GetLocation class extends Flink's ScalarFunction and implements a single eval method that performs geolocation enrichment.
Evaluation logic:
- Receives an ip_address string as input.
- Issues an HTTP GET request to the ip2location.io API, passing the IP address and an API key.
- Returns an empty JSON object ({}) if the response status is not 200.
- Parses the JSON response from the API.
- Extracts the country_name, region_name (state), and city_name fields.
- Returns a JSON string containing the extracted geodata.
import json
import os

import requests
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf


class GetLocation(ScalarFunction):
    def eval(self, ip_address):
        # The API key is read from the IP2LOCATION_API_KEY environment variable.
        url = f"https://api.ip2location.io/?key={os.environ['IP2LOCATION_API_KEY']}&ip={ip_address}"
        response = requests.get(url)
        # Any non-200 status yields an empty JSON object instead of raising.
        if response.status_code != 200:
            return json.dumps({})
        data = response.json()
        # Keep only the three location fields, defaulting to empty strings.
        return json.dumps({
            "country": data.get("country_name", ""),
            "state": data.get("region_name", ""),
            "city": data.get("city_name", "")
        })
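The field-extraction step can be exercised without a network call or a Flink runtime. The following is a minimal sketch, not part of start_job.py: the helper name extract_geodata and the sample payload are hypothetical and exist only for illustration, and the helper is assumed to mirror the status check and field mapping shown in eval above.

```python
import json


def extract_geodata(status_code, payload):
    """Hypothetical helper mirroring eval: '{}' on failure, three fields otherwise."""
    if status_code != 200:
        return json.dumps({})
    return json.dumps({
        "country": payload.get("country_name", ""),
        "state": payload.get("region_name", ""),
        "city": payload.get("city_name", ""),
    })


# Made-up payload for illustration; a real API response may carry other fields.
sample = {
    "country_name": "United States of America",
    "region_name": "California",
    "city_name": "Mountain View",
    "latitude": 37.4,  # extra fields are ignored by the extraction
}

ok_result = extract_geodata(200, sample)
failed_result = extract_geodata(429, {})
print(ok_result)
print(failed_result)
```

Separating the mapping from the HTTP call this way is one possible design choice; it lets the three-field contract be checked in isolation.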
UDF Registration
The UDF is registered with the Flink Table API using the udf helper:
get_location = udf(GetLocation(), result_type=DataTypes.STRING())
Once registered, get_location can be used in Table API expressions such as:
from pyflink.table.expressions import col

t_env.from_path("events").select(
    col("ip"),
    col("event_timestamp"),
    col("referrer"),
    col("host"),
    col("url"),
    get_location(col("ip"))
)
Inputs / Outputs
Inputs:
- ip_address -- a VARCHAR containing the client IP address
Outputs:
- A JSON string with geodata fields:
  - country -- the country name
  - state -- the region/state name
  - city -- the city name
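Because the output is a JSON string rather than structured columns, a downstream consumer has to parse it and also handle the empty object returned when the lookup fails. A minimal sketch of that parsing, assuming a hypothetical helper parse_location (not part of the source) and made-up sample values:

```python
import json


def parse_location(geodata_json):
    """Hypothetical consumer-side helper: return (country, state, city).

    Missing keys, as in the '{}' failure case, become empty strings.
    """
    geodata = json.loads(geodata_json)
    return (
        geodata.get("country", ""),
        geodata.get("state", ""),
        geodata.get("city", ""),
    )


# A successful lookup result and the empty object produced on a failed lookup.
success = parse_location('{"country": "Canada", "state": "Ontario", "city": "Toronto"}')
failure = parse_location("{}")
print(success)
print(failure)
```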
Related Pages
- Principle:DataExpert_io_Data_engineer_handbook_Scalar_UDF_Enrichment
- Environment:DataExpert_io_Data_engineer_handbook_Flink_Kafka_Docker_Environment