Implementation: datahub-project/datahub SparkConfigParser.parseSparkConfig


Metadata

Field Value
implementation_name SparkConfigParser.parseSparkConfig
type API Doc
status Active
last_updated 2026-02-10
source_file metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java
lines L26-418
repository datahub-project/datahub
domains Data_Lineage, Apache_Spark, Metadata_Management

Overview

SparkConfigParser is a utility class that extracts spark.datahub.* properties from the Spark environment and converts them into a Typesafe Config object. It also provides static accessor methods for individual configuration values with defaults, and constructs the DatahubOpenlineageConfig used by the OpenLineage converter.

Description

This class serves as the central configuration hub for the DataHub Spark lineage agent. It defines all configuration key constants, parses Spark properties into a structured Config, and provides typed accessor methods for each setting. The class is stateless (all methods are static) and the constructor is private.
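
The typical agent-side call sequence can be sketched from the methods documented below. This is a minimal illustration only: the SparkAppContext argument is normally populated by the lineage listener, so passing null here is an assumption made for brevity, not something the source prescribes.

import com.typesafe.config.Config;
import datahub.spark.conf.SparkConfigParser;

public class SparkConfigParserUsageSketch {
  public static void main(String[] args) {
    // 1. Collect spark.datahub.* properties from the active SparkEnv
    //    (returns an empty Config when no SparkEnv exists).
    Config datahubConf = SparkConfigParser.parseSparkConfig();

    // 2. Read individual settings through the typed accessors, each with a built-in default.
    boolean patchEnabled = SparkConfigParser.isPatchEnabled(datahubConf);   // default: false
    String hiveAlias = SparkConfigParser.getHivePlatformAlias(datahubConf); // default: "hive"

    // 3. Build the OpenLineage converter configuration from the same Config.
    //    A null SparkAppContext is used only to keep the sketch self-contained (assumption).
    var openLineageConf = SparkConfigParser.sparkConfigToDatahubOpenlineageConf(datahubConf, null);

    System.out.println(patchEnabled + " " + hiveAlias + " " + openLineageConf);
  }
}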

Source Code Reference

File: metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java

Key Constants (L27-92)

public class SparkConfigParser {
  // Emitter configuration
  public static final String EMITTER_TYPE = "emitter";
  public static final String GMS_URL_KEY = "rest.server";
  public static final String GMS_AUTH_TOKEN = "rest.token";
  public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
  public static final String REST_DISABLE_CHUNKED_ENCODING = "rest.disable_chunked_encoding";
  public static final String MAX_RETRIES = "rest.max_retries";
  public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";
  public static final String FILE_EMITTER_FILE_NAME = "file.filename";
  public static final String CONFIG_LOG_MCPS = "log.mcps";

  // Kafka configuration
  public static final String KAFKA_MCP_TOPIC = "kafka.mcp_topic";
  public static final String KAFKA_EMITTER_BOOTSTRAP = "kafka.bootstrap";
  public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_URL = "kafka.schema_registry_url";
  public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG = "kafka.schema_registry_config";
  public static final String KAFKA_EMITTER_PRODUCER_CONFIG = "kafka.producer_config";

  // S3 configuration
  public static final String S3_EMITTER_BUCKET = "s3.bucket";
  public static final String S3_EMITTER_REGION = "s3.region";
  public static final String S3_EMITTER_ENDPOINT = "s3.endpoint";
  public static final String S3_EMITTER_PREFIX = "s3.prefix";
  public static final String S3_EMITTER_ACCESS_KEY = "s3.access_key";
  public static final String S3_EMITTER_SECRET_KEY = "s3.secret_key";
  public static final String S3_EMITTER_PROFILE = "s3.profile";
  public static final String S3_EMITTER_FILE_NAME = "s3.filename";

  // Lineage behavior
  public static final String COALESCE_KEY = "coalesce_jobs";
  public static final String PATCH_ENABLED = "patch.enabled";
  public static final String STREAMING_JOB = "streaming_job";
  public static final String STREAMING_HEARTBEAT = "streaming_heartbeat";
  public static final String DATAHUB_FLOW_NAME = "flow_name";
  public static final String CAPTURE_COLUMN_LEVEL_LINEAGE = "captureColumnLevelLineage";

  // Dataset configuration
  public static final String DATASET_ENV_KEY = "metadata.dataset.env";
  public static final String DATASET_HIVE_PLATFORM_ALIAS = "metadata.dataset.hivePlatformAlias";
  public static final String DATASET_LOWERCASE_URNS = "metadata.dataset.lowerCaseUrns";
  public static final String DATASET_MATERIALIZE_KEY = "metadata.dataset.materialize";
  public static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
  public static final String DATASET_INCLUDE_SCHEMA_METADATA =
      "metadata.dataset.include_schema_metadata";

  // Tags and domains
  public static final String TAGS_KEY = "tags";
  public static final String DOMAINS_KEY = "domains";
}

parseSparkConfig (L132-144)

public static Config parseSparkConfig() {
    if (SparkEnv.get() == null) {
        return ConfigFactory.empty();
    }
    SparkConf conf = SparkEnv.get().conf();
    String propertiesString =
        Arrays.stream(conf.getAllWithPrefix("spark.datahub."))
            .map(tup -> tup._1 + "= \"" + tup._2 + "\"")
            .collect(Collectors.joining("\n"));
    return ConfigFactory.parseString(propertiesString);
}
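
SparkConf.getAllWithPrefix("spark.datahub.") already strips the prefix, so the HOCON string built above uses the bare keys defined by the constants. The following standalone sketch reproduces that string format and the resulting lookups without a running SparkEnv; the property values are invented for illustration.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ParseSparkConfigSketch {
  public static void main(String[] args) {
    // Same `key= "value"` line format that parseSparkConfig joins with newlines.
    String propertiesString =
        "rest.server= \"https://datahub.example.com:8080\"\n"
            + "rest.max_retries= \"3\"";

    Config config = ConfigFactory.parseString(propertiesString);

    // Values are read back with the bare keys, e.g. GMS_URL_KEY = "rest.server".
    System.out.println(config.getString("rest.server"));      // https://datahub.example.com:8080
    System.out.println(config.getString("rest.max_retries")); // 3
  }
}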

parsePropertiesToConfig (L120-130)

public static Config parsePropertiesToConfig(Properties properties) {
    properties.keySet().removeIf(o ->
        (!o.toString().startsWith("spark.datahub.")
            && !o.toString().startsWith(
                SPARK_DATABRICKS_CLUSTER_USAGE_TAGS_CLUSTER_ALL_TAGS)));
    properties = SparkConfigParser.moveKeysToRoot(properties, "spark.datahub.");
    return ConfigFactory.parseProperties(properties);
}
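
On Databricks the agent receives a java.util.Properties object rather than a SparkConf. The method drops every key that is not prefixed with spark.datahub. (or the Databricks cluster-tags key), re-roots the survivors via moveKeysToRoot, and parses the result; note that this removes keys from the Properties object the caller passed in. The sketch below replicates the filter-and-strip step in plain Java so it runs without the DataHub classes on the classpath; it illustrates the logic, it is not the implementation.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Properties;

public class ParsePropertiesSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("spark.datahub.rest.server", "https://datahub.example.com:8080");
    props.setProperty("spark.datahub.metadata.dataset.env", "PROD");
    props.setProperty("spark.executor.memory", "4g"); // unrelated key, gets filtered out

    // Keep only spark.datahub.* keys and strip the prefix, mirroring moveKeysToRoot.
    Properties rooted = new Properties();
    props.forEach((k, v) -> {
      String key = k.toString();
      if (key.startsWith("spark.datahub.")) {
        rooted.setProperty(key.substring("spark.datahub.".length()), v.toString());
      }
    });

    Config config = ConfigFactory.parseProperties(rooted);
    System.out.println(config.getString("rest.server"));          // https://datahub.example.com:8080
    System.out.println(config.getString("metadata.dataset.env")); // PROD
  }
}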

sparkConfigToDatahubOpenlineageConf (L163-197)

public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf(
    Config sparkConfig, SparkAppContext sparkAppContext) {
    DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
        DatahubOpenlineageConfig.builder();
    builder.isSpark(true);
    builder.filePartitionRegexpPattern(getFilePartitionRegexpPattern(sparkConfig));
    builder.fabricType(getCommonFabricType(sparkConfig));
    builder.includeSchemaMetadata(isIncludeSchemaMetadata(sparkConfig));
    builder.materializeDataset(isDatasetMaterialize(sparkConfig));
    builder.pathSpecs(getPathSpecListMap(sparkConfig));
    builder.platformInstance(getPlatformInstance(sparkConfig));
    builder.commonDatasetPlatformInstance(getCommonPlatformInstance(sparkConfig));
    builder.hivePlatformAlias(getHivePlatformAlias(sparkConfig));
    builder.usePatch(isPatchEnabled(sparkConfig));
    builder.lowerCaseDatasetUrns(isLowerCaseDatasetUrns(sparkConfig));
    builder.captureColumnLevelLineage(isCaptureColumnLevelLineage(sparkConfig));
    // ... parent job URN parsing
    return builder.build();
}

I/O Contract

Aspect Details
Input SparkConf properties with spark.datahub.* prefix, or Properties object (for Databricks)
Output Typesafe Config object with the spark.datahub. prefix stripped
Null Handling Returns ConfigFactory.empty() if SparkEnv is null
Side Effects None for parseSparkConfig; parsePropertiesToConfig removes non-matching keys from the caller's Properties object (the class itself holds no state)

Key Accessor Methods

Method Return Type Default Line
parseSparkConfig() Config empty config L132
parsePropertiesToConfig(Properties) Config -- L120
getCommonFabricType(Config) FabricType PROD L199
getHivePlatformAlias(Config) String "hive" L214
isCoalesceEnabled(Config) boolean true L359
isPatchEnabled(Config) boolean false L366
isIncludeSchemaMetadata(Config) boolean false L332
isCaptureColumnLevelLineage(Config) boolean true L411
isEmitCoalescePeriodically(Config) boolean depends on Databricks tags L389
getStreamingHeartbeatSec(Config) int 300 (5 min) L314
getTags(Config) String[] null L237
getDomains(Config) String[] null L241
getPathSpecListMap(Config) Map<String, List<PathSpec>> empty map L270
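
Boolean and numeric accessors fall back to the defaults listed above when a key is absent. The lookup pattern can be sketched directly against Typesafe Config; this assumes a hasPath-based lookup for illustration, since the accessor bodies themselves are not reproduced on this page.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class AccessorDefaultSketch {
  public static void main(String[] args) {
    Config empty = ConfigFactory.empty();

    // Conceptual pattern for accessors such as isPatchEnabled (default false)
    // and getStreamingHeartbeatSec (default 300 seconds).
    boolean patchEnabled =
        empty.hasPath("patch.enabled") && empty.getBoolean("patch.enabled");
    int heartbeatSec =
        empty.hasPath("streaming_heartbeat") ? empty.getInt("streaming_heartbeat") : 300;

    System.out.println(patchEnabled);  // false
    System.out.println(heartbeatSec);  // 300
  }
}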

Usage Examples

Complete REST Configuration

spark-submit \
  --conf "spark.datahub.rest.server=https://datahub.example.com:8080" \
  --conf "spark.datahub.rest.token=eyJhbGciOiJIUzI1NiJ9..." \
  --conf "spark.datahub.rest.max_retries=3" \
  --conf "spark.datahub.rest.retry_interval_in_sec=10" \
  --conf "spark.datahub.rest.disable_ssl_verification=false" \
  my_app.py

Kafka Emitter with Producer Config

spark-submit \
  --conf "spark.datahub.emitter=kafka" \
  --conf "spark.datahub.kafka.bootstrap=broker1:9092,broker2:9092" \
  --conf "spark.datahub.kafka.schema_registry_url=http://schema-registry:8081" \
  --conf "spark.datahub.kafka.mcp_topic=MetadataChangeProposal_v1" \
  --conf "spark.datahub.kafka.producer_config.security.protocol=SASL_SSL" \
  my_app.py

Selecting Emitter Type

# Emitter type is set via spark.datahub.emitter
# Supported values: rest (default), kafka, file, s3
--conf "spark.datahub.emitter=rest"
--conf "spark.datahub.emitter=kafka"
--conf "spark.datahub.emitter=file"
--conf "spark.datahub.emitter=s3"
