Implementation: Datahub project Datahub SparkConfigParser ParseSparkConfig
Metadata
| Field | Value |
|---|---|
| implementation_name | SparkConfigParser ParseSparkConfig |
| type | API Doc |
| status | Active |
| last_updated | 2026-02-10 |
| source_file | metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java |
| lines | L26-418 |
| repository | datahub-project/datahub |
| domains | Data_Lineage, Apache_Spark, Metadata_Management |
Overview
SparkConfigParser is a utility class that extracts spark.datahub.* properties from the Spark environment and converts them into a Typesafe Config object. It also provides static accessor methods for individual configuration values with defaults, and constructs the DatahubOpenlineageConfig used by the OpenLineage converter.
Description
This class serves as the central configuration hub for the DataHub Spark lineage agent. It defines all configuration key constants, parses Spark properties into a structured Config, and provides typed accessor methods for each setting. The class is stateless (all methods are static) and the constructor is private.
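For orientation, here is a minimal sketch of the parse-then-read flow (the wrapper class and main method are illustrative, not part of the source file):

```java
import com.typesafe.config.Config;
import datahub.spark.conf.SparkConfigParser;

public class ConfigDemo {
  public static void main(String[] args) {
    // Reads all spark.datahub.* entries from the active SparkEnv;
    // returns an empty Config when called outside a running Spark application.
    Config datahubConf = SparkConfigParser.parseSparkConfig();

    // Typed accessors fall back to documented defaults when a key is absent.
    boolean coalesce = SparkConfigParser.isCoalesceEnabled(datahubConf); // default: true
    boolean patch = SparkConfigParser.isPatchEnabled(datahubConf);       // default: false
    System.out.println("coalesce=" + coalesce + ", patch=" + patch);
  }
}
```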
Source Code Reference
File: metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java
Key Constants (L27-92)
public class SparkConfigParser {
// Emitter configuration
public static final String EMITTER_TYPE = "emitter";
public static final String GMS_URL_KEY = "rest.server";
public static final String GMS_AUTH_TOKEN = "rest.token";
public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
public static final String REST_DISABLE_CHUNKED_ENCODING = "rest.disable_chunked_encoding";
public static final String MAX_RETRIES = "rest.max_retries";
public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";
public static final String FILE_EMITTER_FILE_NAME = "file.filename";
public static final String CONFIG_LOG_MCPS = "log.mcps";
// Kafka configuration
public static final String KAFKA_MCP_TOPIC = "kafka.mcp_topic";
public static final String KAFKA_EMITTER_BOOTSTRAP = "kafka.bootstrap";
public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_URL = "kafka.schema_registry_url";
public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG = "kafka.schema_registry_config";
public static final String KAFKA_EMITTER_PRODUCER_CONFIG = "kafka.producer_config";
// S3 configuration
public static final String S3_EMITTER_BUCKET = "s3.bucket";
public static final String S3_EMITTER_REGION = "s3.region";
public static final String S3_EMITTER_ENDPOINT = "s3.endpoint";
public static final String S3_EMITTER_PREFIX = "s3.prefix";
public static final String S3_EMITTER_ACCESS_KEY = "s3.access_key";
public static final String S3_EMITTER_SECRET_KEY = "s3.secret_key";
public static final String S3_EMITTER_PROFILE = "s3.profile";
public static final String S3_EMITTER_FILE_NAME = "s3.filename";
// Lineage behavior
public static final String COALESCE_KEY = "coalesce_jobs";
public static final String PATCH_ENABLED = "patch.enabled";
public static final String STREAMING_JOB = "streaming_job";
public static final String STREAMING_HEARTBEAT = "streaming_heartbeat";
public static final String DATAHUB_FLOW_NAME = "flow_name";
public static final String CAPTURE_COLUMN_LEVEL_LINEAGE = "captureColumnLevelLineage";
// Dataset configuration
public static final String DATASET_ENV_KEY = "metadata.dataset.env";
public static final String DATASET_HIVE_PLATFORM_ALIAS = "metadata.dataset.hivePlatformAlias";
public static final String DATASET_LOWERCASE_URNS = "metadata.dataset.lowerCaseUrns";
public static final String DATASET_MATERIALIZE_KEY = "metadata.dataset.materialize";
public static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
public static final String DATASET_INCLUDE_SCHEMA_METADATA =
"metadata.dataset.include_schema_metadata";
// Tags and domains
public static final String TAGS_KEY = "tags";
public static final String DOMAINS_KEY = "domains";
}
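These constants are relative keys: parseSparkConfig strips the spark.datahub. prefix, so each constant corresponds to a user-facing Spark property of the form spark.datahub.&lt;constant&gt;. A small illustrative snippet:

```java
import datahub.spark.conf.SparkConfigParser;

public class KeyDemo {
  public static void main(String[] args) {
    // GMS_URL_KEY is "rest.server", so the property users set on spark-submit is:
    String sparkProperty = "spark.datahub." + SparkConfigParser.GMS_URL_KEY;
    System.out.println(sparkProperty); // spark.datahub.rest.server
  }
}
```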
parseSparkConfig (L132-144)
public static Config parseSparkConfig() {
if (SparkEnv.get() == null) {
return ConfigFactory.empty();
}
SparkConf conf = SparkEnv.get().conf();
String propertiesString =
Arrays.stream(conf.getAllWithPrefix("spark.datahub."))
.map(tup -> tup._1 + "= \"" + tup._2 + "\"")
.collect(Collectors.joining("\n"));
return ConfigFactory.parseString(propertiesString);
}
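The method joins each prefix-stripped key/value pair into a HOCON string and delegates to ConfigFactory.parseString. The following self-contained snippet reproduces that string format without a Spark session (the server URL and values are illustrative):

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ParseStringDemo {
  public static void main(String[] args) {
    // getAllWithPrefix("spark.datahub.") already strips the prefix, so the
    // generated lines use relative keys; values are quoted so characters
    // that are special in HOCON (e.g. ':') survive parsing.
    String propertiesString =
        "rest.server= \"https://datahub.example.com:8080\"\n"
            + "coalesce_jobs= \"true\"";
    Config conf = ConfigFactory.parseString(propertiesString);
    System.out.println(conf.getString("rest.server"));   // https://datahub.example.com:8080
    System.out.println(conf.getString("coalesce_jobs")); // true
  }
}
```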
parsePropertiesToConfig (L120-130)
public static Config parsePropertiesToConfig(Properties properties) {
properties.keySet().removeIf(o ->
(!o.toString().startsWith("spark.datahub.")
&& !o.toString().startsWith(
SPARK_DATABRICKS_CLUSTER_USAGE_TAGS_CLUSTER_ALL_TAGS)));
properties = SparkConfigParser.moveKeysToRoot(properties, "spark.datahub.");
return ConfigFactory.parseProperties(properties);
}
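moveKeysToRoot is defined elsewhere in the class and is not reproduced above; the sketch below shows an equivalent prefix-relocation step under that assumption (illustrative, not the actual helper):

```java
import java.util.Properties;

public class MoveKeysSketch {
  // Illustrative equivalent of moveKeysToRoot: move spark.datahub.* keys to
  // the root namespace while leaving other retained keys (e.g. the Databricks
  // cluster-tags property) untouched.
  static Properties moveKeysToRoot(Properties in, String prefix) {
    Properties out = new Properties();
    for (String key : in.stringPropertyNames()) {
      String newKey = key.startsWith(prefix) ? key.substring(prefix.length()) : key;
      out.setProperty(newKey, in.getProperty(key));
    }
    return out;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("spark.datahub.rest.server", "https://datahub.example.com:8080");
    System.out.println(
        moveKeysToRoot(props, "spark.datahub.").getProperty("rest.server"));
    // prints: https://datahub.example.com:8080
  }
}
```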
sparkConfigToDatahubOpenlineageConf (L163-197)
public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf(
Config sparkConfig, SparkAppContext sparkAppContext) {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.isSpark(true);
builder.filePartitionRegexpPattern(getFilePartitionRegexpPattern(sparkConfig));
builder.fabricType(getCommonFabricType(sparkConfig));
builder.includeSchemaMetadata(isIncludeSchemaMetadata(sparkConfig));
builder.materializeDataset(isDatasetMaterialize(sparkConfig));
builder.pathSpecs(getPathSpecListMap(sparkConfig));
builder.platformInstance(getPlatformInstance(sparkConfig));
builder.commonDatasetPlatformInstance(getCommonPlatformInstance(sparkConfig));
builder.hivePlatformAlias(getHivePlatformAlias(sparkConfig));
builder.usePatch(isPatchEnabled(sparkConfig));
builder.lowerCaseDatasetUrns(isLowerCaseDatasetUrns(sparkConfig));
builder.captureColumnLevelLineage(isCaptureColumnLevelLineage(sparkConfig));
// ... parent job URN parsing
return builder.build();
}
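In the agent this method is called from the listener setup path; a hedged sketch of that call follows. The import paths for SparkAppContext and DatahubOpenlineageConfig are assumptions based on the repository layout, and appContext is normally populated by DatahubSparkListener rather than constructed by hand:

```java
import com.typesafe.config.Config;
import datahub.spark.conf.SparkAppContext;
import datahub.spark.conf.SparkConfigParser;
import io.datahubproject.openlineage.config.DatahubOpenlineageConfig;

public class OpenlineageConfDemo {
  // appContext carries application name, start time, etc.; it is supplied by
  // the caller (normally DatahubSparkListener) rather than built here.
  static DatahubOpenlineageConfig buildConf(SparkAppContext appContext) {
    Config datahubConf = SparkConfigParser.parseSparkConfig();
    return SparkConfigParser.sparkConfigToDatahubOpenlineageConf(datahubConf, appContext);
  }
}
```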
I/O Contract
| Aspect | Details |
|---|---|
| Input | SparkConf properties with the spark.datahub.* prefix, or a Properties object (for Databricks) |
| Output | Typesafe Config object with the spark.datahub. prefix stripped |
| Null Handling | Returns ConfigFactory.empty() if SparkEnv is null |
| Side Effects | None (stateless utility class) |
Key Accessor Methods
| Method | Return Type | Default | Line |
|---|---|---|---|
| parseSparkConfig() | Config | empty config | L132 |
| parsePropertiesToConfig(Properties) | Config | -- | L120 |
| getCommonFabricType(Config) | FabricType | PROD | L199 |
| getHivePlatformAlias(Config) | String | "hive" | L214 |
| isCoalesceEnabled(Config) | boolean | true | L359 |
| isPatchEnabled(Config) | boolean | false | L366 |
| isIncludeSchemaMetadata(Config) | boolean | false | L332 |
| isCaptureColumnLevelLineage(Config) | boolean | true | L411 |
| isEmitCoalescePeriodically(Config) | boolean | depends on Databricks tags | L389 |
| getStreamingHeartbeatSec(Config) | int | 300 (5 min) | L314 |
| getTags(Config) | String[] | null | L237 |
| getDomains(Config) | String[] | null | L241 |
| getPathSpecListMap(Config) | Map<String, List<PathSpec>> | empty map | L270 |
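The boolean accessors generally follow a hasPath-then-default pattern against the parsed Config. A representative sketch (not the verbatim source):

```java
import com.typesafe.config.Config;

public class AccessorPattern {
  // Representative shape of the boolean accessors in the table above:
  // guard with hasPath, otherwise return the documented default.
  static boolean isPatchEnabled(Config datahubConfig) {
    if (!datahubConfig.hasPath("patch.enabled")) {
      return false; // documented default
    }
    return datahubConfig.getBoolean("patch.enabled");
  }
}
```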
Usage Examples
Complete REST Configuration
spark-submit \
--conf "spark.datahub.rest.server=https://datahub.example.com:8080" \
--conf "spark.datahub.rest.token=eyJhbGciOiJIUzI1NiJ9..." \
--conf "spark.datahub.rest.max_retries=3" \
--conf "spark.datahub.rest.retry_interval_in_sec=10" \
--conf "spark.datahub.rest.disable_ssl_verification=false" \
my_app.py
Kafka Emitter with Producer Config
spark-submit \
--conf "spark.datahub.emitter=kafka" \
--conf "spark.datahub.kafka.bootstrap=broker1:9092,broker2:9092" \
--conf "spark.datahub.kafka.schema_registry_url=http://schema-registry:8081" \
--conf "spark.datahub.kafka.mcp_topic=MetadataChangeProposal_v1" \
--conf "spark.datahub.kafka.producer_config.security.protocol=SASL_SSL" \
my_app.py
Selecting Emitter Type
# Emitter type is set via spark.datahub.emitter
# Supported values: rest (default), kafka, file, s3
--conf "spark.datahub.emitter=rest"
--conf "spark.datahub.emitter=kafka"
--conf "spark.datahub.emitter=file"
--conf "spark.datahub.emitter=s3"
Knowledge Sources
- DataHub GitHub Repository
- OpenLineage Documentation
- Apache Spark Configuration Documentation
- Typesafe Config Library
Related
- Implements: Datahub_project_Datahub_Spark_Connection_Configuration
- Related to: Datahub_project_Datahub_DatahubSparkListener_Init
- Related to: Datahub_project_Datahub_SparkLineageConf_Builder
- Environment: Environment:Datahub_project_Datahub_Spark_Lineage_Environment
- Heuristic: Heuristic:Datahub_project_Datahub_Spark_Databricks_Coalescing