
Implementation:Datahub project Datahub DatahubSparkListener Init

From Leeroopedia


Metadata

Field Value
implementation_name DatahubSparkListener Init
type API Doc
status Active
last_updated 2026-02-10
source_file metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
lines L60-406
repository datahub-project/datahub
domains Data_Lineage, Apache_Spark, Metadata_Management

Overview

The DatahubSparkListener class extends Spark's SparkListener to capture job lifecycle events and convert them into DataHub metadata. It delegates event capture to an internal OpenLineageSparkListener and metadata emission to a DatahubEventEmitter.

Description

This class is the main entry point for DataHub's Spark lineage integration. It is instantiated by Spark when registered via spark.extraListeners and receives all lifecycle callbacks from Spark's LiveListenerBus. The class coordinates between the OpenLineage Spark listener (for plan parsing), SparkConfigParser (for configuration), and DatahubEventEmitter (for metadata emission).
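
The delegation flow described above can be modeled with a self-contained sketch. The stub types below stand in for the real Spark and OpenLineage classes; none of these stub names exist in the actual codebase.

```java
import java.util.ArrayList;
import java.util.List;

public class DelegationSketch {
    // Stub for the internal OpenLineageSparkListener delegate.
    static class StubOpenLineageListener {
        final List<String> received = new ArrayList<>();
        void onJobStart(String event) { received.add(event); }
    }

    // Stub for DatahubSparkListener: each lifecycle callback is forwarded
    // to the wrapped OpenLineage listener, which handles plan parsing;
    // emission then happens through the DataHub emitter (not modeled here).
    static class StubDatahubListener {
        private final StubOpenLineageListener delegate = new StubOpenLineageListener();

        void onJobStart(String event) {
            // (the real class first calls initializeContextFactoryIfNotInitialized())
            delegate.onJobStart(event);
        }
    }

    public static void main(String[] args) {
        StubDatahubListener l = new StubDatahubListener();
        l.onJobStart("job-1");
        l.onJobStart("job-2");
        System.out.println(l.delegate.received); // every event reaches the delegate
    }
}
```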

Source Code Reference

File: metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java

Class Declaration and Key Fields (L60-76)

public class DatahubSparkListener extends SparkListener {
  private static final Logger log = LoggerFactory.getLogger(DatahubSparkListener.class);
  private final Map<String, Instant> batchLastUpdated = new HashMap<String, Instant>();
  private final OpenLineageSparkListener listener;
  private DatahubEventEmitter emitter;
  private Config datahubConf = ConfigFactory.empty();
  private SparkAppContext appContext;
  private static ContextFactory contextFactory;
  private static CircuitBreaker circuitBreaker = new NoOpCircuitBreaker();
  private static final String sparkVersion = package$.MODULE$.SPARK_VERSION();
  private final SparkConf conf;
  private static MeterRegistry meterRegistry;
  private boolean isDisabled;
  // ... methods omitted; key callbacks are excerpted below
}

Constructor (L78-86)

public DatahubSparkListener(SparkConf conf) throws URISyntaxException {
    this.conf = ((SparkConf) Objects.requireNonNull(conf)).clone();
    listener = new OpenLineageSparkListener(conf);
    log.info(
        "Initializing DatahubSparkListener. Version: {} with Spark version: {}",
        VersionUtil.getVersion(),
        sparkVersion);
}

onApplicationStart (L101-110)

public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
    long startTime = System.currentTimeMillis();
    log.info("Application start called");
    this.appContext = getSparkAppContext(applicationStart);
    initializeContextFactoryIfNotInitialized();
    listener.onApplicationStart(applicationStart);
    long elapsedTime = System.currentTimeMillis() - startTime;
    log.info("onApplicationStart completed successfully in {} ms", elapsedTime);
}

onApplicationEnd (L289-306)

public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
    long startTime = System.currentTimeMillis();
    initializeContextFactoryIfNotInitialized();
    log.debug("Application end called");
    listener.onApplicationEnd(applicationEnd);
    if (datahubConf.hasPath(STREAMING_JOB) && (datahubConf.getBoolean(STREAMING_JOB))) {
        return;
    }
    if (emitter != null) {
        emitter.emitCoalesced();
    } else {
        log.warn("Emitter is not initialized, unable to emit coalesced events");
    }
    long elapsedTime = System.currentTimeMillis() - startTime;
    log.debug("onApplicationEnd completed successfully in {} ms", elapsedTime);
}

Context Factory Initialization (L366-405)

The lifecycle callbacks shown above call a no-argument variant of this method; it resolves the active SparkConf and application name, then delegates to this overload:

private void initializeContextFactoryIfNotInitialized(SparkConf sparkConf, String appName) {
    if (contextFactory != null || isDisabled) {
        return;
    }
    try {
        SparkLineageConf datahubConfig = loadDatahubConfig(appContext, null);
        SparkOpenLineageConfig config = ArgumentParser.parse(sparkConf);
        initializeMetrics(config);
        emitter = new DatahubEventEmitter(config, appName);
        emitter.setConfig(datahubConfig);
        contextFactory = new ContextFactory(emitter, meterRegistry, config);
        circuitBreaker = new CircuitBreakerFactory(config.getCircuitBreaker()).build();
        OpenLineageSparkListener.init(contextFactory);
    } catch (URISyntaxException e) {
        log.error("Unable to parse OpenLineage endpoint. Lineage events will not be collected", e);
    }
}

Emitter Initialization (L112-256)

The initializeEmitter method supports four emitter types (abbreviated here; each case constructs and returns the matching DatahubEmitterConfig):

public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
    String emitterType =
        sparkConf.hasPath(SparkConfigParser.EMITTER_TYPE)
            ? sparkConf.getString(SparkConfigParser.EMITTER_TYPE)
            : "rest";
    switch (emitterType) {
        case "rest":   // REST API emitter (default)
        case "kafka":  // Kafka emitter
        case "file":   // Local file emitter
        case "s3":     // S3 emitter
            // ... each branch builds and returns its emitter config
        default:
            return Optional.empty(); // unrecognized emitter type
    }
}
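
The type selection above reduces to a lookup that defaults to "rest". The sketch below models it with a plain map instead of the Typesafe Config object; the "emitter" key is an illustrative stand-in for whatever SparkConfigParser.EMITTER_TYPE resolves to.

```java
import java.util.Map;

public class EmitterTypeSketch {
    // Mirrors the defaulting logic: use the configured type if present, else "rest".
    static String resolveEmitterType(Map<String, String> conf) {
        return conf.getOrDefault("emitter", "rest");
    }

    public static void main(String[] args) {
        System.out.println(resolveEmitterType(Map.of()));                 // falls back to default
        System.out.println(resolveEmitterType(Map.of("emitter", "s3")));  // explicit override
    }
}
```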

I/O Contract

Aspect Details
Input SparkConf (constructor parameter), Spark lifecycle events (SparkListenerApplicationStart, SparkListenerJobStart, etc.)
Output DataHub Metadata Change Proposals (MCPs) emitted via configured emitter (REST, Kafka, File, or S3)
Configuration spark.extraListeners=datahub.spark.DatahubSparkListener
Side Effects Initializes ContextFactory, MeterRegistry, and CircuitBreaker on first event
Thread Safety loadDatahubConfig is synchronized; contextFactory and circuitBreaker are static
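
The thread-safety notes above (synchronized config loading, static one-time initialization on the first event) follow a common guarded-init pattern, sketched here with placeholder types rather than the real ContextFactory.

```java
public class LazyInitSketch {
    // Stands in for the static ContextFactory field; the real class guards
    // it with a null check on the event path.
    private static Object contextFactory;
    private static int initializations = 0;

    // Synchronized so concurrent listener-bus events initialize at most once.
    static synchronized void initializeIfNotInitialized() {
        if (contextFactory != null) {
            return; // already initialized by an earlier event
        }
        contextFactory = new Object();
        initializations++;
    }

    public static void main(String[] args) {
        initializeIfNotInitialized(); // first event triggers initialization
        initializeIfNotInitialized(); // later events are no-ops
        System.out.println(initializations);
    }
}
```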

Usage Examples

Registering the Listener

spark-submit \
  --packages io.acryl:acryl-spark-lineage_2.12:0.13.1 \
  --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
  --conf "spark.datahub.rest.server=http://localhost:8080" \
  --conf "spark.datahub.rest.token=YOUR_TOKEN" \
  my_spark_app.py

Programmatic Registration (Scala)

val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
  .config("spark.datahub.rest.server", "http://localhost:8080")
  .getOrCreate()

Key Callbacks

Method Line Description
DatahubSparkListener(SparkConf) L78 Constructor; clones SparkConf, creates internal OpenLineageSparkListener
onApplicationStart() L101 Captures app context, initializes context factory, forwards to OL listener
onApplicationEnd() L289 Forwards to OL listener, triggers coalesced emission (unless streaming)
onJobStart() L326 Initializes context factory if needed, forwards to OL listener
onJobEnd() L317 Forwards to OL listener
onTaskEnd() L308 Forwards to OL listener
onOtherEvent() L336 Forwards to OL listener (handles streaming progress events)
initializeEmitter(Config) L112 Creates emitter config for rest/kafka/file/s3
