Implementation: datahub-project/datahub SparkLineageConf Builder
Metadata
| Field | Value |
|---|---|
| implementation_name | SparkLineageConf Builder |
| type | API Doc |
| status | Active |
| last_updated | 2026-02-10 |
| source_file | metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkLineageConf.java |
| lines | L13-56 |
| repository | datahub-project/datahub |
| domains | Data_Lineage, Apache_Spark, Metadata_Management |
Overview
SparkLineageConf is a Lombok-annotated configuration class that holds all lineage behavior settings for the DataHub Spark agent. It is constructed via the static toSparkLineageConf factory method, which translates a Typesafe Config object into a structured configuration used by the emitter and converter components.
Description
This class aggregates all configuration needed by the DataHub Spark lineage agent into a single immutable object. It combines three sources of configuration:
- Typesafe Config -- Parsed from spark.datahub.* properties
- SparkAppContext -- Runtime application metadata (name, ID, user, start time)
- DatahubEmitterConfig -- Connection-specific settings for the chosen emitter
The class uses Lombok's @Builder and @Getter annotations for construction and access, with @Builder.Default annotations providing sensible default values.
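For illustration, here is a minimal sketch of how those defaults behave, assuming openLineageConfig is a previously constructed DatahubOpenlineageConfig:

```java
// Minimal sketch: only the required openLineageConf is set; every
// @Builder.Default field keeps its declared default value.
SparkLineageConf conf = SparkLineageConf.builder()
    .openLineageConf(openLineageConfig)
    .build();

boolean coalesce = conf.isCoalesceEnabled();          // true
boolean periodic = conf.isEmitCoalescePeriodically(); // false
boolean logMcps  = conf.isLogMcps();                  // true
List<String> tags = conf.getTags();                   // empty list
```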
Source Code Reference
File: metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkLineageConf.java
Class Declaration and Fields (L13-25)
```java
@Getter
@Builder
@Setter
public class SparkLineageConf {
  final DatahubOpenlineageConfig openLineageConf;
  @Builder.Default final boolean coalesceEnabled = true;
  @Builder.Default final boolean emitCoalescePeriodically = false;
  @Builder.Default final boolean logMcps = true;
  final SparkAppContext sparkAppContext;
  final DatahubEmitterConfig datahubEmitterConfig;
  @Builder.Default final List<String> tags = new LinkedList<>();
  @Builder.Default final List<String> domains = new LinkedList<>();
}
```
Factory Method: toSparkLineageConf (L27-55)
```java
public static SparkLineageConf toSparkLineageConf(
    Config sparkConfig,
    SparkAppContext sparkAppContext,
    DatahubEmitterConfig datahubEmitterConfig) {
  SparkLineageConfBuilder builder = SparkLineageConf.builder();
  // Translate the raw Typesafe Config into the structured OpenLineage config.
  DatahubOpenlineageConfig datahubOpenlineageConfig =
      SparkConfigParser.sparkConfigToDatahubOpenlineageConf(sparkConfig, sparkAppContext);
  builder.openLineageConf(datahubOpenlineageConfig);
  builder.coalesceEnabled(SparkConfigParser.isCoalesceEnabled(sparkConfig));
  builder.logMcps(SparkConfigParser.isLogMcps(sparkConfig));
  if (SparkConfigParser.getTags(sparkConfig) != null) {
    builder.tags(
        Arrays.asList(Objects.requireNonNull(SparkConfigParser.getTags(sparkConfig))));
  }
  if (SparkConfigParser.getDomains(sparkConfig) != null) {
    builder.domains(
        Arrays.asList(Objects.requireNonNull(SparkConfigParser.getDomains(sparkConfig))));
  }
  builder.emitCoalescePeriodically(SparkConfigParser.isEmitCoalescePeriodically(sparkConfig));
  if (sparkAppContext != null) {
    builder.sparkAppContext(sparkAppContext);
  }
  if (datahubEmitterConfig != null) {
    // Direct field assignment works because the Lombok-generated builder is a
    // static inner class of SparkLineageConf.
    builder.datahubEmitterConfig = datahubEmitterConfig;
  }
  return builder.build();
}
```
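For comparison, a hedged sketch of the builder chain this factory effectively performs when tags and domains are configured; openLineageConfig stands in for the SparkConfigParser result:

```java
// Equivalence sketch: what toSparkLineageConf produces for a config with
// tags and domains present. `openLineageConfig` is assumed to come from
// SparkConfigParser.sparkConfigToDatahubOpenlineageConf(...).
SparkLineageConf conf = SparkLineageConf.builder()
    .openLineageConf(openLineageConfig)
    .coalesceEnabled(true)
    .logMcps(true)
    .emitCoalescePeriodically(false)
    .tags(Arrays.asList("etl", "daily"))
    .domains(Arrays.asList("urn:li:domain:analytics"))
    .build();
```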
I/O Contract
| Aspect | Details |
|---|---|
| Input | Config (parsed Typesafe Config), SparkAppContext (application metadata), DatahubEmitterConfig (emitter settings) |
| Output | SparkLineageConf instance containing all lineage configuration |
| Method Signature | static SparkLineageConf toSparkLineageConf(Config, SparkAppContext, DatahubEmitterConfig) |
| Null Handling | sparkAppContext and datahubEmitterConfig are nullable; when null, the factory skips the corresponding builder calls and the fields remain null |
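A short sketch of the null-tolerant call path, assuming sparkConfig is an already parsed Typesafe Config:

```java
// Both trailing arguments are nullable: when null, the factory skips the
// corresponding builder calls, so the getters return null.
SparkLineageConf conf =
    SparkLineageConf.toSparkLineageConf(sparkConfig, null, null);

assert conf.getSparkAppContext() == null;
assert conf.getDatahubEmitterConfig() == null;
```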
Field Details
| Field | Type | Default | Description |
|---|---|---|---|
| openLineageConf | DatahubOpenlineageConfig | -- | OpenLineage-specific configuration (fabric type, path specs, platform instance, schema metadata, column lineage, etc.) |
| coalesceEnabled | boolean | true | Whether to coalesce multiple job events into one set of MCPs |
| emitCoalescePeriodically | boolean | false | Whether to emit coalesced MCPs periodically during execution (true by default on Databricks) |
| logMcps | boolean | true | Whether to log serialized MCPs for debugging |
| sparkAppContext | SparkAppContext | -- | Application context (name, ID, user, start time, Databricks tags) |
| datahubEmitterConfig | DatahubEmitterConfig | -- | Emitter-specific configuration (REST, Kafka, File, or S3) |
| tags | List<String> | empty list | Tags to apply to the DataFlow entity |
| domains | List<String> | empty list | Domain URNs to apply to the DataFlow entity |
Configuration Composition
The SparkLineageConf composes configuration from multiple layers:
```
SparkLineageConf
+-- openLineageConf: DatahubOpenlineageConfig
|   +-- fabricType (PROD, DEV, etc.)
|   +-- pathSpecs (platform -> path patterns)
|   +-- platformInstance
|   +-- commonDatasetPlatformInstance
|   +-- hivePlatformAlias
|   +-- includeSchemaMetadata
|   +-- materializeDataset
|   +-- captureColumnLevelLineage
|   +-- usePatch
|   +-- lowerCaseDatasetUrns
|   +-- parentJobUrn
|   +-- pipelineName
+-- coalesceEnabled
+-- emitCoalescePeriodically
+-- logMcps
+-- sparkAppContext: SparkAppContext
|   +-- appName
|   +-- appId
|   +-- appAttemptId
|   +-- sparkUser
|   +-- startTime
|   +-- databricksTags
+-- datahubEmitterConfig: DatahubEmitterConfig
|   (RestDatahubEmitterConfig | KafkaDatahubEmitterConfig
|    | FileDatahubEmitterConfig | S3DatahubEmitterConfig)
+-- tags
+-- domains
```
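To make the nested layer concrete, here is a hypothetical sketch of building DatahubOpenlineageConfig directly; the builder method names simply mirror the field names in the tree above and are assumptions, not a verified API surface:

```java
// Hypothetical construction of the nested OpenLineage config; method names
// are assumed from the field names shown in the tree.
DatahubOpenlineageConfig openLineageConfig = DatahubOpenlineageConfig.builder()
    .fabricType(FabricType.PROD)       // maps from spark.datahub.metadata.dataset.env
    .platformInstance("prod_cluster")
    .includeSchemaMetadata(true)
    .captureColumnLevelLineage(true)
    .pipelineName("my_etl_pipeline")
    .build();
```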
Usage Examples
Programmatic Construction
```java
// `listener` is the enclosing DatahubSparkListener, whose initializeEmitter
// method builds the emitter configuration from the same parsed properties.
Config config = SparkConfigParser.parseSparkConfig();

SparkAppContext appContext = new SparkAppContext();
appContext.setAppName("my_etl_pipeline");
appContext.setAppId("app-12345");

Optional<DatahubEmitterConfig> emitterConfig =
    listener.initializeEmitter(config);

// Both context and emitter config are optional; null is acceptable.
SparkLineageConf lineageConf = SparkLineageConf.toSparkLineageConf(
    config,
    appContext,
    emitterConfig.orElse(null));
```
Corresponding Spark Configuration
```bash
# These spark.datahub.* properties map to SparkLineageConf fields:
--conf "spark.datahub.coalesce_jobs=true"                             # coalesceEnabled
--conf "spark.datahub.stage_metadata_coalescing=false"                # emitCoalescePeriodically
--conf "spark.datahub.log.mcps=true"                                  # logMcps
--conf "spark.datahub.tags=etl,daily"                                 # tags
--conf "spark.datahub.domains=urn:li:domain:analytics"                # domains
--conf "spark.datahub.metadata.dataset.env=PROD"                      # openLineageConf.fabricType
--conf "spark.datahub.metadata.dataset.include_schema_metadata=true"  # openLineageConf.includeSchemaMetadata
--conf "spark.datahub.captureColumnLevelLineage=true"                 # openLineageConf.captureColumnLevelLineage
--conf "spark.datahub.patch.enabled=false"                            # openLineageConf.usePatch
```