
Implementation:Datahub project Datahub DatahubSparkListener Lifecycle

From Leeroopedia


Attribute Value
Page Type Implementation (API Doc)
Workflow Spark_Lineage_Capture
Pair 3 of 6
Principle Principle:Datahub_project_Datahub_Spark_Event_Interception
Repository https://github.com/datahub-project/datahub
Source Location metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java:L57-406
Last Updated 2026-02-09 17:00 GMT

Overview

Description

DatahubSparkListener is the primary entry point for the DataHub Spark lineage agent. It extends Spark's native SparkListener class and acts as a bridge between the Spark execution runtime and DataHub's metadata emission pipeline. The listener intercepts Spark application and job lifecycle events, delegates to the OpenLineage Spark library for execution plan analysis, and coordinates with the DatahubEventEmitter for metadata delivery.

Internally, the listener maintains an OpenLineageSparkListener for plan-to-event conversion and a DatahubEventEmitter for event-to-MCP conversion and emission. Context initialization is deferred (lazy) until the first event arrives, at which point the Spark environment is fully available.
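The deferred-initialization behavior described above can be sketched in isolation. This is a minimal, hypothetical stand-in (the String `context` field stands in for the OpenLineage ContextFactory; none of these names come from the real listener), showing only the pattern: nothing is built at construction time, and the first event callback triggers one-time setup.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the deferred (lazy) initialization pattern:
// the context is only built when the first event arrives, at which
// point the runtime environment is fully available.
class LazyContextListener {
  private final AtomicBoolean initialized = new AtomicBoolean(false);
  private String context; // stand-in for the real context factory

  private void initializeContextIfNeeded() {
    // compareAndSet guarantees the setup block runs exactly once
    if (initialized.compareAndSet(false, true)) {
      context = "context-built-from-live-environment";
    }
  }

  String onEvent(String event) {
    initializeContextIfNeeded(); // first event triggers initialization
    return context + ":" + event;
  }
}
```

The same guard appears on every callback, so it does not matter which Spark event happens to arrive first.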

Usage

The listener is registered via Spark configuration and is automatically instantiated by the Spark runtime:

spark.extraListeners=datahub.spark.DatahubSparkListener

Once registered, no further user interaction is required. The listener receives callbacks from the Spark scheduler and processes them automatically.
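Equivalently, the same setting can live in `spark-defaults.conf` rather than being passed per-job (the server URL below is illustrative, matching the example later in this page):

```
# spark-defaults.conf (alternative to passing --conf flags per job)
spark.extraListeners        datahub.spark.DatahubSparkListener
spark.datahub.rest.server   http://localhost:8080
```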

Code Reference

Source Location

File metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
Lines L57-406
Module acryl-spark-lineage

Signature

Class declaration:

public class DatahubSparkListener extends SparkListener

Constructor:

public DatahubSparkListener(SparkConf conf) throws URISyntaxException

Lifecycle callback methods:

public void onApplicationStart(SparkListenerApplicationStart applicationStart)
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd)
public void onJobStart(SparkListenerJobStart jobStart)
public void onJobEnd(SparkListenerJobEnd jobEnd)
public void onTaskEnd(SparkListenerTaskEnd taskEnd)
public void onOtherEvent(SparkListenerEvent event)

Configuration and emitter initialization:

public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf)
private synchronized SparkLineageConf loadDatahubConfig(
    SparkAppContext appContext, Properties properties)
private void initializeContextFactoryIfNotInitialized()

Import

import datahub.spark.DatahubSparkListener;

I/O Contract

Direction Type Description
Input SparkConf Spark configuration object passed by the Spark runtime during listener construction. Contains all spark.datahub.* properties.
Input SparkListenerApplicationStart Application start event containing app name, app ID, attempt ID, Spark user, and start time.
Input SparkListenerApplicationEnd Application end event signaling that the Spark application is shutting down.
Input SparkListenerJobStart Job start event containing job ID, stage info, and properties.
Input SparkListenerJobEnd Job end event containing job ID and job result (success/failure).
Input SparkListenerTaskEnd Task end event containing task metrics and execution info.
Output Side Effect Delegates events to OpenLineageSparkListener for plan analysis. On application end (or periodically), triggers DatahubEventEmitter.emitCoalesced() to send accumulated lineage MCPs to DataHub.

Lifecycle flow:

1. Spark Runtime constructs DatahubSparkListener(SparkConf)
2. onApplicationStart() -> captures SparkAppContext, initializes ContextFactory
   -> delegates to OpenLineageSparkListener.onApplicationStart()
3. onJobStart() -> ensures ContextFactory initialized
   -> delegates to OpenLineageSparkListener.onJobStart()
4. onJobEnd() -> delegates to OpenLineageSparkListener.onJobEnd()
5. onTaskEnd() -> delegates to OpenLineageSparkListener.onTaskEnd()
6. onOtherEvent() -> delegates to OpenLineageSparkListener.onOtherEvent()
   (handles SQL execution events, streaming micro-batch events)
7. onApplicationEnd() -> delegates to OpenLineageSparkListener.onApplicationEnd()
   -> calls DatahubEventEmitter.emitCoalesced()
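The delegate-then-emit shape of the flow above can be sketched with hypothetical stand-in types (a `Delegate` interface in place of OpenLineageSparkListener, and a local list in place of DatahubEventEmitter; events are plain strings here). The only structural points it illustrates are that every callback forwards to the delegate, and that application end additionally flushes accumulated output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the lifecycle delegation pattern: every
// callback forwards to a delegate; application end also triggers
// a coalesced emission (standing in for emitCoalesced()).
class LifecycleSketch {
  interface Delegate { void handle(String event); }

  static class Recorder implements Delegate {
    final List<String> seen = new ArrayList<>();
    public void handle(String event) { seen.add(event); }
  }

  static class Listener {
    private final Delegate delegate;
    private final List<String> emitted = new ArrayList<>();
    Listener(Delegate d) { delegate = d; }

    void onJobStart()       { delegate.handle("jobStart"); }
    void onJobEnd()         { delegate.handle("jobEnd"); }
    void onApplicationEnd() {
      delegate.handle("applicationEnd");
      emitCoalesced(); // flush accumulated lineage on shutdown
    }

    private void emitCoalesced() { emitted.add("coalesced-lineage"); }
    List<String> emitted()       { return emitted; }
  }
}
```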

Usage Examples

Example 1: Standard registration via spark-submit

spark-submit \
  --packages io.acryl:acryl-spark-lineage_2.12:0.2.18 \
  --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
  --conf "spark.datahub.rest.server=http://localhost:8080" \
  my_spark_job.py

The Spark runtime will:

  1. Instantiate DatahubSparkListener with the current SparkConf
  2. Call onApplicationStart() when the application begins
  3. Call onJobStart() / onJobEnd() for each Spark job
  4. Call onApplicationEnd() when the application finishes, triggering lineage emission

Example 2: Emitter type selection

The listener's initializeEmitter method selects the transport based on the spark.datahub.emitter configuration property:

// REST emitter (default)
// spark.datahub.emitter=rest
// -> creates RestEmitter with GMS URL and token

// Kafka emitter
// spark.datahub.emitter=kafka
// -> creates KafkaEmitter with bootstrap server and schema registry

// File emitter
// spark.datahub.emitter=file
// -> creates FileEmitter with output file path

// S3 emitter
// spark.datahub.emitter=s3
// -> creates S3Emitter with bucket, region, and prefix
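The selection logic can be sketched as a switch over the emitter type. This is a hypothetical helper, not the real method: the actual initializeEmitter returns Optional&lt;DatahubEmitterConfig&gt; built from the full Spark configuration, while here the return value is just the emitter name.

```java
import java.util.Optional;

// Hypothetical sketch of transport selection keyed on the
// spark.datahub.emitter value; an unrecognized value yields
// an empty Optional, i.e. no emitter is created.
class EmitterSelector {
  static Optional<String> selectEmitter(String emitterType) {
    if (emitterType == null) {
      emitterType = "rest"; // REST is the default transport
    }
    switch (emitterType) {
      case "rest":  return Optional.of("RestEmitter");
      case "kafka": return Optional.of("KafkaEmitter");
      case "file":  return Optional.of("FileEmitter");
      case "s3":    return Optional.of("S3Emitter");
      default:      return Optional.empty();
    }
  }
}
```

Returning an empty Optional for unknown values means a misconfigured emitter type disables emission rather than raising at listener construction time.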
