
Implementation:Datahub project Datahub SparkLineageConf Builder

From Leeroopedia


Metadata

Field Value
implementation_name SparkLineageConf Builder
type API Doc
status Active
last_updated 2026-02-10
source_file metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkLineageConf.java
lines L13-56
repository datahub-project/datahub
domains Data_Lineage, Apache_Spark, Metadata_Management

Overview

SparkLineageConf is a Lombok-annotated configuration class that holds all lineage behavior settings for the DataHub Spark agent. It is constructed via the static toSparkLineageConf factory method, which translates a Typesafe Config object into a structured configuration used by the emitter and converter components.

Description

This class aggregates all configuration needed by the DataHub Spark lineage agent into a single immutable object. It combines three sources of configuration:

  1. Typesafe Config -- Parsed from spark.datahub.* properties
  2. SparkAppContext -- Runtime application metadata (name, ID, user, start time)
  3. DatahubEmitterConfig -- Connection-specific settings for the chosen emitter

The class uses Lombok's @Getter, @Builder, and @Setter annotations for construction and access (since every field is final, class-level @Setter generates no setters, so instances remain effectively immutable), with @Builder.Default annotations providing sensible default values.
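The builder-with-defaults pattern Lombok generates can be sketched in plain Java. This is a hand-written stand-in showing the @Builder.Default semantics, not the actual generated code; only two of the real fields are mirrored here.

```java
import java.util.LinkedList;
import java.util.List;

// Hand-written sketch of what Lombok's @Builder + @Builder.Default produce
// for SparkLineageConf. Field names mirror the real class; the generated
// code differs in detail.
class LineageConf {
  final boolean coalesceEnabled; // @Builder.Default true in the real class
  final List<String> tags;       // @Builder.Default empty list

  private LineageConf(Builder b) {
    this.coalesceEnabled = b.coalesceEnabled;
    this.tags = b.tags;
  }

  static Builder builder() {
    return new Builder();
  }

  static class Builder {
    // Defaults apply unless the corresponding setter is called, which is
    // exactly the behavior @Builder.Default provides.
    boolean coalesceEnabled = true;
    List<String> tags = new LinkedList<>();

    Builder coalesceEnabled(boolean v) { this.coalesceEnabled = v; return this; }
    Builder tags(List<String> v) { this.tags = v; return this; }
    LineageConf build() { return new LineageConf(this); }
  }
}

public class BuilderSketch {
  public static void main(String[] args) {
    System.out.println(LineageConf.builder().build().coalesceEnabled); // true
    System.out.println(LineageConf.builder().coalesceEnabled(false).build().coalesceEnabled); // false
  }
}
```

Because the defaults live on the builder fields, callers that never touch a setter get the documented default, which is why toSparkLineageConf can simply skip builder calls for absent optional inputs.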

Source Code Reference

File: metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkLineageConf.java

Class Declaration and Fields (L13-25)

@Getter
@Builder
@Setter
public class SparkLineageConf {
  final DatahubOpenlineageConfig openLineageConf;
  @Builder.Default final boolean coalesceEnabled = true;
  @Builder.Default final boolean emitCoalescePeriodically = false;
  @Builder.Default final boolean logMcps = true;
  final SparkAppContext sparkAppContext;
  final DatahubEmitterConfig datahubEmitterConfig;
  @Builder.Default final List<String> tags = new LinkedList<>();
  @Builder.Default final List<String> domains = new LinkedList<>();
}

Factory Method: toSparkLineageConf (L27-55)

public static SparkLineageConf toSparkLineageConf(
    Config sparkConfig,
    SparkAppContext sparkAppContext,
    DatahubEmitterConfig datahubEmitterConfig) {
  SparkLineageConfBuilder builder = SparkLineageConf.builder();
  DatahubOpenlineageConfig datahubOpenlineageConfig =
      SparkConfigParser.sparkConfigToDatahubOpenlineageConf(sparkConfig, sparkAppContext);
  builder.openLineageConf(datahubOpenlineageConfig);
  builder.coalesceEnabled(SparkConfigParser.isCoalesceEnabled(sparkConfig));
  builder.logMcps(SparkConfigParser.isLogMcps(sparkConfig));
  if (SparkConfigParser.getTags(sparkConfig) != null) {
    builder.tags(
        Arrays.asList(Objects.requireNonNull(SparkConfigParser.getTags(sparkConfig))));
  }
  if (SparkConfigParser.getDomains(sparkConfig) != null) {
    builder.domains(
        Arrays.asList(Objects.requireNonNull(SparkConfigParser.getDomains(sparkConfig))));
  }
  builder.emitCoalescePeriodically(
      SparkConfigParser.isEmitCoalescePeriodically(sparkConfig));
  if (sparkAppContext != null) {
    builder.sparkAppContext(sparkAppContext);
  }
  if (datahubEmitterConfig != null) {
    builder.datahubEmitterConfig = datahubEmitterConfig;
  }
  return builder.build();
}

I/O Contract

Input -- Config (parsed Typesafe Config), SparkAppContext (application metadata), DatahubEmitterConfig (emitter settings)
Output -- SparkLineageConf instance containing all lineage configuration
Method Signature -- static SparkLineageConf toSparkLineageConf(Config, SparkAppContext, DatahubEmitterConfig)
Null Handling -- sparkAppContext and datahubEmitterConfig are nullable; builder defaults are used if null

Field Details

openLineageConf -- DatahubOpenlineageConfig (no default). OpenLineage-specific configuration (fabric type, path specs, platform instance, schema metadata, column lineage, etc.)
coalesceEnabled -- boolean (default: true). Whether to coalesce multiple job events into one set of MCPs.
emitCoalescePeriodically -- boolean (default: false). Whether to emit coalesced MCPs periodically during execution (true by default on Databricks).
logMcps -- boolean (default: true). Whether to log serialized MCPs for debugging.
sparkAppContext -- SparkAppContext (no default). Application context (name, ID, user, start time, Databricks tags).
datahubEmitterConfig -- DatahubEmitterConfig (no default). Emitter-specific configuration (REST, Kafka, File, or S3).
tags -- List<String> (default: empty list). Tags to apply to the DataFlow entity.
domains -- List<String> (default: empty list). Domain URNs to apply to the DataFlow entity.
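The tags and domains defaults interact with the factory's null checks: an absent spark.datahub.tags property leaves the @Builder.Default empty list in place, while a present value replaces it. A simplified stand-in for that logic (the real SparkConfigParser.getTags signature is assumed here to return a String array or null):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TagsSketch {
  // Stand-in for SparkConfigParser.getTags: returns null when the
  // spark.datahub.tags property is absent, a String[] otherwise.
  static String[] getTags(String raw) {
    return (raw == null) ? null : raw.split(",");
  }

  // Mirrors the factory's guard: only replace the default empty list
  // when the parser actually produced tags.
  static List<String> resolveTags(String raw) {
    String[] tags = getTags(raw);
    return (tags != null) ? Arrays.asList(tags) : Collections.emptyList();
  }

  public static void main(String[] args) {
    System.out.println(resolveTags(null));        // []
    System.out.println(resolveTags("etl,daily")); // [etl, daily]
  }
}
```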

Configuration Composition

The SparkLineageConf composes configuration from multiple layers:

SparkLineageConf
  +-- openLineageConf: DatahubOpenlineageConfig
  |     +-- fabricType (PROD, DEV, etc.)
  |     +-- pathSpecs (platform -> path patterns)
  |     +-- platformInstance
  |     +-- commonDatasetPlatformInstance
  |     +-- hivePlatformAlias
  |     +-- includeSchemaMetadata
  |     +-- materializeDataset
  |     +-- captureColumnLevelLineage
  |     +-- usePatch
  |     +-- lowerCaseDatasetUrns
  |     +-- parentJobUrn
  |     +-- pipelineName
  +-- coalesceEnabled
  +-- emitCoalescePeriodically
  +-- logMcps
  +-- sparkAppContext: SparkAppContext
  |     +-- appName
  |     +-- appId
  |     +-- appAttemptId
  |     +-- sparkUser
  |     +-- startTime
  |     +-- databricksTags
  +-- datahubEmitterConfig: DatahubEmitterConfig
  |     (RestDatahubEmitterConfig | KafkaDatahubEmitterConfig
  |      | FileDatahubEmitterConfig | S3DatahubEmitterConfig)
  +-- tags
  +-- domains
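The layered composition above starts from flat spark.datahub.* properties. A hedged sketch of the prefix filtering a parser like SparkConfigParser performs when narrowing the full Spark configuration down to the agent's own keys (the real implementation works on a Typesafe Config tree, not a plain Map):

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixFilterSketch {
  static final String PREFIX = "spark.datahub.";

  // Collect spark.datahub.* entries and strip the prefix, roughly how
  // the agent isolates its own settings from the rest of SparkConf.
  static Map<String, String> datahubProps(Map<String, String> sparkConf) {
    Map<String, String> out = new HashMap<>();
    for (Map.Entry<String, String> e : sparkConf.entrySet()) {
      if (e.getKey().startsWith(PREFIX)) {
        out.put(e.getKey().substring(PREFIX.length()), e.getValue());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("spark.datahub.coalesce_jobs", "true");
    conf.put("spark.master", "local[*]");
    System.out.println(datahubProps(conf)); // {coalesce_jobs=true}
  }
}
```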

Usage Examples

Programmatic Construction

A sketch of building the config by hand. parseSparkConfig, the SparkAppContext setters, and listener.initializeEmitter (where listener is assumed to be the agent's DatahubSparkListener instance) come from the surrounding agent code and are shown as illustrative calls, not verified signatures.

Config config = SparkConfigParser.parseSparkConfig();
SparkAppContext appContext = new SparkAppContext();
appContext.setAppName("my_etl_pipeline");
appContext.setAppId("app-12345");

// Assumed: `listener` is the agent's DatahubSparkListener instance.
Optional<DatahubEmitterConfig> emitterConfig =
    listener.initializeEmitter(config);

SparkLineageConf lineageConf = SparkLineageConf.toSparkLineageConf(
    config,
    appContext,
    emitterConfig.orElse(null));

Corresponding Spark Configuration

# These spark.datahub.* properties map to SparkLineageConf fields:
--conf "spark.datahub.coalesce_jobs=true"                          # coalesceEnabled
--conf "spark.datahub.stage_metadata_coalescing=false"             # emitCoalescePeriodically
--conf "spark.datahub.log.mcps=true"                               # logMcps
--conf "spark.datahub.tags=etl,daily"                              # tags
--conf "spark.datahub.domains=urn:li:domain:analytics"             # domains
--conf "spark.datahub.metadata.dataset.env=PROD"                   # openLineageConf.fabricType
--conf "spark.datahub.metadata.dataset.include_schema_metadata=true" # openLineageConf.includeSchemaMetadata
--conf "spark.datahub.captureColumnLevelLineage=true"              # openLineageConf.captureColumnLevelLineage
--conf "spark.datahub.patch.enabled=false"                         # openLineageConf.usePatch
