Implementation:Datahub_project_Datahub_DatahubSparkListener_Lifecycle
| Attribute | Value |
|---|---|
| Page Type | Implementation (API Doc) |
| Workflow | Spark_Lineage_Capture |
| Pair | 3 of 6 |
| Principle | Principle:Datahub_project_Datahub_Spark_Event_Interception |
| Repository | https://github.com/datahub-project/datahub |
| Source Location | metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java:L57-406 |
| Last Updated | 2026-02-09 17:00 GMT |
Overview
Description
DatahubSparkListener is the primary entry point for the DataHub Spark lineage agent. It extends Spark's native SparkListener class and acts as a bridge between the Spark execution runtime and DataHub's metadata emission pipeline. The listener intercepts Spark application and job lifecycle events, delegates to the OpenLineage Spark library for execution plan analysis, and coordinates with the DatahubEventEmitter for metadata delivery.
Internally, the listener maintains an OpenLineageSparkListener for plan-to-event conversion and a DatahubEventEmitter for event-to-MCP conversion and emission. Context initialization is deferred (lazy) until the first event arrives, at which point the Spark environment is fully available.
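The deferred-initialization guard described above can be sketched as follows. This is a simplified, illustrative sketch, not the actual listener internals: the method name mirrors the private `initializeContextFactoryIfNotInitialized` from the signature list, but the body and the `isInitialized()` helper are hypothetical stand-ins for the real ContextFactory wiring.

```java
// Simplified sketch of the lazy context-initialization pattern. The real
// implementation builds an OpenLineage ContextFactory from the live Spark
// environment; here a boolean flag stands in for that state.
public class LazyInitSketch {

    private boolean contextInitialized = false;

    // Synchronized so concurrent callbacks initialize the context only once,
    // on the first event, when the Spark environment is fully available.
    private synchronized void initializeContextFactoryIfNotInitialized() {
        if (contextInitialized) {
            return;
        }
        // ... construct ContextFactory / emitter wiring here ...
        contextInitialized = true;
    }

    // Every lifecycle callback first ensures the context exists.
    public void onEvent() {
        initializeContextFactoryIfNotInitialized();
        // ... then delegates to the OpenLineage listener ...
    }

    public boolean isInitialized() {
        return contextInitialized;
    }

    public static void main(String[] args) {
        LazyInitSketch listener = new LazyInitSketch();
        listener.onEvent(); // first event triggers initialization
        listener.onEvent(); // later events are no-ops for initialization
        System.out.println(listener.isInitialized()); // prints "true"
    }
}
```

Deferring initialization this way avoids touching the Spark environment in the constructor, where parts of it may not yet be available.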
Usage
The listener is registered via Spark configuration and is automatically instantiated by the Spark runtime:
spark.extraListeners=datahub.spark.DatahubSparkListener
Once registered, no further user interaction is required. The listener receives callbacks from the Spark scheduler and processes them automatically.
Code Reference
Source Location
| File | metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java |
| Lines | L57-406 |
| Module | acryl-spark-lineage |
Signature
Class declaration:
public class DatahubSparkListener extends SparkListener
Constructor:
public DatahubSparkListener(SparkConf conf) throws URISyntaxException
Lifecycle callback methods:
public void onApplicationStart(SparkListenerApplicationStart applicationStart)
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd)
public void onJobStart(SparkListenerJobStart jobStart)
public void onJobEnd(SparkListenerJobEnd jobEnd)
public void onTaskEnd(SparkListenerTaskEnd taskEnd)
public void onOtherEvent(SparkListenerEvent event)
Configuration and emitter initialization:
public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf)
private synchronized SparkLineageConf loadDatahubConfig(
SparkAppContext appContext, Properties properties)
private void initializeContextFactoryIfNotInitialized()
Import
import datahub.spark.DatahubSparkListener;
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | SparkConf | Spark configuration object passed by the Spark runtime during listener construction. Contains all spark.datahub.* properties. |
| Input | SparkListenerApplicationStart | Application start event containing app name, app ID, attempt ID, Spark user, and start time. |
| Input | SparkListenerApplicationEnd | Application end event signaling that the Spark application is shutting down. |
| Input | SparkListenerJobStart | Job start event containing job ID, stage info, and properties. |
| Input | SparkListenerJobEnd | Job end event containing job ID and job result (success/failure). |
| Input | SparkListenerTaskEnd | Task end event containing task metrics and execution info. |
| Output | Side Effect | Delegates events to OpenLineageSparkListener for plan analysis. On application end (or periodically), triggers DatahubEventEmitter.emitCoalesced() to send accumulated lineage MCPs to DataHub. |
Lifecycle flow:
1. Spark Runtime constructs DatahubSparkListener(SparkConf)
2. onApplicationStart() -> captures SparkAppContext, initializes ContextFactory
-> delegates to OpenLineageSparkListener.onApplicationStart()
3. onJobStart() -> ensures ContextFactory initialized
-> delegates to OpenLineageSparkListener.onJobStart()
4. onJobEnd() -> delegates to OpenLineageSparkListener.onJobEnd()
5. onTaskEnd() -> delegates to OpenLineageSparkListener.onTaskEnd()
6. onOtherEvent() -> delegates to OpenLineageSparkListener.onOtherEvent()
(handles SQL execution events, streaming micro-batch events)
7. onApplicationEnd() -> delegates to OpenLineageSparkListener.onApplicationEnd()
-> calls DatahubEventEmitter.emitCoalesced()
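The delegation structure in the flow above can be sketched as a listener that forwards every callback to an inner listener and, on application end, also flushes the emitter. This is a minimal illustration under assumptions: `InnerListener` and `Emitter` are hypothetical stand-ins for OpenLineageSparkListener and DatahubEventEmitter; only `emitCoalesced()` is a real method name from the flow.

```java
// Minimal sketch of the delegate-then-flush pattern. Only the wiring is
// shown; the stand-in classes do not reflect the real OpenLineage APIs.
import java.util.ArrayList;
import java.util.List;

public class DelegationSketch {

    // Records which events were forwarded, standing in for plan analysis.
    static class InnerListener {
        final List<String> delegated = new ArrayList<>();
        void handle(String event) { delegated.add(event); }
    }

    // Stand-in for DatahubEventEmitter; emitCoalesced() sends accumulated MCPs.
    static class Emitter {
        boolean flushed = false;
        void emitCoalesced() { flushed = true; }
    }

    final InnerListener inner = new InnerListener();
    final Emitter emitter = new Emitter();

    public void onJobStart() { inner.handle("jobStart"); }

    public void onJobEnd() { inner.handle("jobEnd"); }

    // Application end both delegates and triggers the coalesced emission.
    public void onApplicationEnd() {
        inner.handle("applicationEnd");
        emitter.emitCoalesced();
    }

    public static void main(String[] args) {
        DelegationSketch listener = new DelegationSketch();
        listener.onJobStart();
        listener.onJobEnd();
        listener.onApplicationEnd();
        System.out.println(listener.inner.delegated); // [jobStart, jobEnd, applicationEnd]
        System.out.println(listener.emitter.flushed); // true
    }
}
```

Flushing only at application end (or periodically) lets per-job lineage fragments be coalesced into fewer, richer emissions.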
Usage Examples
Example 1: Standard registration via spark-submit
spark-submit \
--packages io.acryl:acryl-spark-lineage_2.12:0.2.18 \
--conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
--conf "spark.datahub.rest.server=http://localhost:8080" \
my_spark_job.py
The Spark runtime will:
- Instantiate DatahubSparkListener with the current SparkConf
- Call onApplicationStart() when the application begins
- Call onJobStart() / onJobEnd() for each Spark job
- Call onApplicationEnd() when the application finishes, triggering lineage emission
Example 2: Emitter type selection
The listener's initializeEmitter method selects the transport based on the spark.datahub.emitter config:
// REST emitter (default)
// spark.datahub.emitter=rest
// -> creates RestEmitter with GMS URL and token
// Kafka emitter
// spark.datahub.emitter=kafka
// -> creates KafkaEmitter with bootstrap server and schema registry
// File emitter
// spark.datahub.emitter=file
// -> creates FileEmitter with output file path
// S3 emitter
// spark.datahub.emitter=s3
// -> creates S3Emitter with bucket, region, and prefix
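The selection logic above can be sketched as a simple dispatch on the configured emitter type. This is a hedged sketch: the real initializeEmitter returns Optional<DatahubEmitterConfig> built from RestEmitter/KafkaEmitter/FileEmitter/S3Emitter, while here plain strings stand in for those emitter classes.

```java
// Illustrative dispatch on the spark.datahub.emitter value. Strings stand in
// for the real emitter config objects; an unknown value yields no emitter.
import java.util.Optional;

public class EmitterSelectionSketch {

    static Optional<String> selectEmitter(String emitterType) {
        switch (emitterType) {
            case "rest":  return Optional.of("RestEmitter");  // GMS URL and token
            case "kafka": return Optional.of("KafkaEmitter"); // bootstrap server, schema registry
            case "file":  return Optional.of("FileEmitter");  // output file path
            case "s3":    return Optional.of("S3Emitter");    // bucket, region, prefix
            default:      return Optional.empty();            // unrecognized type
        }
    }

    public static void main(String[] args) {
        System.out.println(selectEmitter("rest").orElse("none"));  // RestEmitter
        System.out.println(selectEmitter("bogus").orElse("none")); // none
    }
}
```

Returning Optional rather than throwing lets the listener degrade gracefully when no usable emitter is configured.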
Related Pages
- Principle:Datahub_project_Datahub_Spark_Event_Interception
- Implementation:Datahub_project_Datahub_SparkConfigParser_Configuration
- Implementation:Datahub_project_Datahub_DatahubEventEmitter_Emit
- Implementation:Datahub_project_Datahub_OpenLineageToDataHub_ConvertRunEvent
- Environment:Datahub_project_Datahub_Java_Build