Implementation:Datahub project Datahub DatahubSparkListener Init
Metadata
| Field | Value |
|---|---|
| implementation_name | DatahubSparkListener Init |
| type | API Doc |
| status | Active |
| last_updated | 2026-02-10 |
| source_file | metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java |
| lines | L60-406 |
| repository | datahub-project/datahub |
| domains | Data_Lineage, Apache_Spark, Metadata_Management |
Overview
The DatahubSparkListener class implements the Spark listener interface for capturing job lifecycle events and converting them to DataHub metadata. It extends SparkListener and delegates to an internal OpenLineageSparkListener for event capture, then uses DatahubEventEmitter for metadata emission.
Description
This class is the main entry point for DataHub's Spark lineage integration. It is instantiated by Spark when registered via spark.extraListeners and receives all lifecycle callbacks from Spark's LiveListenerBus. The class coordinates between the OpenLineage Spark listener (for plan parsing), SparkConfigParser (for configuration), and DatahubEventEmitter (for metadata emission).
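As a hedged illustration of that registration flow (not part of the source file): when a class name is listed in spark.extraListeners, Spark resolves it and prefers a constructor taking a SparkConf, which DatahubSparkListener provides, falling back to a no-arg constructor. The sketch below approximates that instantiation step; the class name and configuration values are the ones used throughout this document.
import org.apache.spark.SparkConf;
import org.apache.spark.scheduler.SparkListenerInterface;

public class ListenerRegistrationSketch {
  public static void main(String[] args) throws Exception {
    // Configuration as an application would supply it (server URL is a placeholder).
    SparkConf conf = new SparkConf()
        .setAppName("lineage-demo")
        .set("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .set("spark.datahub.rest.server", "http://localhost:8080");

    // Approximation of what Spark does internally: load the class named in
    // spark.extraListeners and invoke its SparkConf constructor reflectively.
    Class<?> cls = Class.forName(conf.get("spark.extraListeners"));
    SparkListenerInterface listener =
        (SparkListenerInterface) cls.getConstructor(SparkConf.class).newInstance(conf);
    System.out.println("Instantiated listener: " + listener.getClass().getName());
  }
}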
Source Code Reference
File: metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
Class Declaration and Key Fields (L60-76)
public class DatahubSparkListener extends SparkListener {
  private static final Logger log = LoggerFactory.getLogger(DatahubSparkListener.class);
  private final Map<String, Instant> batchLastUpdated = new HashMap<String, Instant>();
  private final OpenLineageSparkListener listener;
  private DatahubEventEmitter emitter;
  private Config datahubConf = ConfigFactory.empty();
  private SparkAppContext appContext;
  private static ContextFactory contextFactory;
  private static CircuitBreaker circuitBreaker = new NoOpCircuitBreaker();
  private static final String sparkVersion = package$.MODULE$.SPARK_VERSION();
  private final SparkConf conf;
  private static MeterRegistry meterRegistry;
  private boolean isDisabled;

  // ... constructor, lifecycle callbacks, and initialization methods follow (see excerpts below)
}
Constructor (L78-86)
public DatahubSparkListener(SparkConf conf) throws URISyntaxException {
  this.conf = ((SparkConf) Objects.requireNonNull(conf)).clone();
  listener = new OpenLineageSparkListener(conf);
  log.info(
      "Initializing DatahubSparkListener. Version: {} with Spark version: {}",
      VersionUtil.getVersion(),
      sparkVersion);
}
onApplicationStart (L101-110)
public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
  long startTime = System.currentTimeMillis();
  log.info("Application start called");
  this.appContext = getSparkAppContext(applicationStart);
  initializeContextFactoryIfNotInitialized();
  listener.onApplicationStart(applicationStart);
  long elapsedTime = System.currentTimeMillis() - startTime;
  log.info("onApplicationStart completed successfully in {} ms", elapsedTime);
}
onApplicationEnd (L289-306)
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
  long startTime = System.currentTimeMillis();
  initializeContextFactoryIfNotInitialized();
  log.debug("Application end called");
  listener.onApplicationEnd(applicationEnd);
  if (datahubConf.hasPath(STREAMING_JOB) && (datahubConf.getBoolean(STREAMING_JOB))) {
    return;
  }
  if (emitter != null) {
    emitter.emitCoalesced();
  } else {
    log.warn("Emitter is not initialized, unable to emit coalesced events");
  }
  long elapsedTime = System.currentTimeMillis() - startTime;
  log.debug("onApplicationEnd completed successfully in {} ms", elapsedTime);
}
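The streaming guard above can be restated as a small standalone sketch using the Typesafe Config API the class already relies on. The literal key name streaming_job is an assumption for illustration; the real key comes from the SparkConfigParser.STREAMING_JOB constant.
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class StreamingGuardSketch {
  public static void main(String[] args) {
    // "streaming_job" stands in for the SparkConfigParser.STREAMING_JOB constant.
    Config datahubConf = ConfigFactory.parseString("streaming_job = true");
    boolean isStreamingJob =
        datahubConf.hasPath("streaming_job") && datahubConf.getBoolean("streaming_job");
    if (isStreamingJob) {
      // Streaming jobs return before the coalesced emission; their lineage is
      // handled by other callbacks (e.g. onOtherEvent for streaming progress).
      System.out.println("Streaming job: skipping coalesced emission");
      return;
    }
    System.out.println("Batch job: emitting coalesced lineage at application end");
  }
}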
Context Factory Initialization (L366-405)
private void initializeContextFactoryIfNotInitialized(SparkConf sparkConf, String appName) {
  if (contextFactory != null || isDisabled) {
    return;
  }
  try {
    SparkLineageConf datahubConfig = loadDatahubConfig(appContext, null);
    SparkOpenLineageConfig config = ArgumentParser.parse(sparkConf);
    initializeMetrics(config);
    emitter = new DatahubEventEmitter(config, appName);
    emitter.setConfig(datahubConfig);
    contextFactory = new ContextFactory(emitter, meterRegistry, config);
    circuitBreaker = new CircuitBreakerFactory(config.getCircuitBreaker()).build();
    OpenLineageSparkListener.init(contextFactory);
  } catch (URISyntaxException e) {
    log.error("Unable to parse OpenLineage endpoint. Lineage events will not be collected", e);
  }
}
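The method follows an initialize-once guard around static state. The generic sketch below (illustrative, not the project's code) shows the shape of that pattern: because the guard field is static, later events, and later SparkContexts in the same JVM, reuse the first ContextFactory that was built.
// Generic initialize-once guard (illustrative only).
final class OnceGuardSketch {
  private static Object contextFactory; // stands in for the static ContextFactory
  private boolean disabled;             // per-listener kill switch, like isDisabled

  void initializeIfNeeded() {
    if (contextFactory != null || disabled) {
      return; // fast path: already wired, or the integration is turned off
    }
    contextFactory = buildFactory(); // wire emitter, metrics, circuit breaker once
  }

  private static Object buildFactory() {
    return new Object(); // placeholder for DatahubEventEmitter + ContextFactory wiring
  }
}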
Emitter Initialization (L112-256)
The initializeEmitter method builds an emitter configuration for one of four emitter types. The excerpt below is abbreviated; the per-type configuration bodies are elided:
public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
  String emitterType =
      sparkConf.hasPath(SparkConfigParser.EMITTER_TYPE)
          ? sparkConf.getString(SparkConfigParser.EMITTER_TYPE)
          : "rest";
  switch (emitterType) {
    case "rest":  // REST API emitter (default)
      // ...
    case "kafka": // Kafka emitter
      // ...
    case "file":  // Local file emitter
      // ...
    case "s3":    // S3 emitter
      // ...
  }
  // ...
}
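A hedged configuration sketch for selecting an emitter type follows. The spark.datahub.emitter key and the per-emitter key shown here are assumptions based on the integration's documented conventions; SparkConfigParser defines the authoritative names, so verify them before relying on this.
import org.apache.spark.SparkConf;

public class EmitterSelectionSketch {
  public static void main(String[] args) {
    // Key names below are illustrative assumptions; SparkConfigParser defines
    // the configuration keys actually read by initializeEmitter.
    SparkConf conf = new SparkConf()
        .setAppName("lineage-demo")
        .set("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .set("spark.datahub.emitter", "kafka") // "rest" (default), "kafka", "file", or "s3"
        .set("spark.datahub.kafka.bootstrap", "broker:9092"); // read only for the kafka emitter
    System.out.println("Emitter type: " + conf.get("spark.datahub.emitter", "rest"));
  }
}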
I/O Contract
| Aspect | Details |
|---|---|
| Input | SparkConf (constructor parameter); Spark lifecycle events (SparkListenerApplicationStart, SparkListenerJobStart, etc.) |
| Output | DataHub Metadata Change Proposals (MCPs) emitted via the configured emitter (REST, Kafka, File, or S3) |
| Configuration | spark.extraListeners=datahub.spark.DatahubSparkListener |
| Side Effects | Initializes ContextFactory, MeterRegistry, and CircuitBreaker on the first event |
| Thread Safety | loadDatahubConfig is synchronized; contextFactory and circuitBreaker are static |
Usage Examples
Registering the Listener
spark-submit \
--packages io.acryl:acryl-spark-lineage_2.12:0.13.1 \
--conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
--conf "spark.datahub.rest.server=http://localhost:8080" \
--conf "spark.datahub.rest.token=YOUR_TOKEN" \
my_spark_app.py
Programmatic Registration (Scala)
val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
  .config("spark.datahub.rest.server", "http://localhost:8080")
  .getOrCreate()
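Programmatic Registration (Java)
The Java equivalent of the Scala example above, using the standard SparkSession builder API (this snippet is illustrative and not taken from the source file):
import org.apache.spark.sql.SparkSession;

public class JavaRegistrationExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("MyApp")
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.rest.server", "http://localhost:8080")
        .getOrCreate();

    // ... run jobs here; lineage is captured by the registered listener ...

    spark.stop(); // triggers onApplicationEnd and, for batch jobs, coalesced emission
  }
}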
Key Callbacks
| Method | Line | Description |
|---|---|---|
| DatahubSparkListener(SparkConf) | L78 | Constructor; clones SparkConf, creates internal OpenLineageSparkListener |
| onApplicationStart() | L101 | Captures app context, initializes context factory, forwards to OL listener |
| onApplicationEnd() | L289 | Forwards to OL listener, triggers coalesced emission (unless streaming) |
| onJobStart() | L326 | Initializes context factory if needed, forwards to OL listener |
| onJobEnd() | L317 | Forwards to OL listener |
| onTaskEnd() | L308 | Forwards to OL listener |
| onOtherEvent() | L336 | Forwards to OL listener (handles streaming progress events) |
| initializeEmitter(Config) | L112 | Creates emitter config for rest/kafka/file/s3 |
Knowledge Sources
Related
- Implements: Datahub_project_Datahub_Spark_Listener_Registration
- Related to: Datahub_project_Datahub_Spark_Lineage_JAR_Dependency
- Related to: Datahub_project_Datahub_DatahubEventEmitter_Emit
- Environment: Environment:Datahub_project_Datahub_Spark_Lineage_Environment
- Heuristic: Heuristic:Datahub_project_Datahub_Spark_Databricks_Coalescing