Implementation:Datahub project Datahub OpenLineageToDataHub ConvertRunEvent

From Leeroopedia


Page Type: Implementation (API Doc)
Workflow: Spark_Lineage_Capture
Pair: 4 of 6
Principle: Principle:Datahub_project_Datahub_OpenLineage_Conversion
Repository: https://github.com/datahub-project/datahub
Source Location: metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java:L1-1373
Last Updated: 2026-02-09 17:00 GMT

Overview

Description

OpenLineageToDataHub is the converter class that transforms OpenLineage RunEvent objects into DataHub's metadata model. It is a utility class: every method is static and the constructor is private, so callers invoke the conversion logic as plain functions without creating an instance. The class lives in the openlineage-converter module, which is separate from the Spark-specific agent, so other OpenLineage-compatible integrations can reuse it.
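The utility-class shape described above can be sketched as follows. This is only an illustration of the pattern: the class name ConverterSketch and the method qualifiedName are hypothetical and do not appear in the DataHub source.

```java
/** Hypothetical illustration of a static-only utility class; not DataHub code. */
final class ConverterSketch {

    // Private constructor: the class cannot be instantiated from outside.
    private ConverterSketch() {}

    // A static method whose result depends only on its arguments.
    // The "namespace/jobName" format is an assumption made for this sketch.
    static String qualifiedName(String namespace, String jobName) {
        return namespace + "/" + jobName;
    }
}
```

Callers would write ConverterSketch.qualifiedName("default", "my-app") directly, in the same way the real converter is called as OpenLineageToDataHub.convertRunEventToJob(event, config).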

The converter handles the full spectrum of OpenLineage-to-DataHub mapping: job-level metadata (DataFlow and DataJob), dataset-level metadata (DatasetUrn construction, schema extraction), fine-grained column-level lineage, ownership, tags, domains, custom properties, and data process instance tracking.

Usage

The converter is called by DatahubEventEmitter.convertOpenLineageRunEventToDatahubJob() whenever an OpenLineage RunEvent is received from the Spark listener. It is also usable independently for converting OpenLineage events from non-Spark sources (e.g., Airflow, Trino) into DataHub metadata.

Code Reference

Source Location

File: metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java
Lines: L1-1373
Module: openlineage-converter

Signature

Primary conversion method:

public static DatahubJob convertRunEventToJob(
    OpenLineage.RunEvent event,
    DatahubOpenlineageConfig datahubConf
) throws IOException, URISyntaxException

Dataset URN conversion:

public static Optional<DatasetUrn> convertOpenlineageDatasetToDatasetUrn(
    OpenLineage.Dataset dataset,
    DatahubOpenlineageConfig mappingConfig
)

Tag and domain generation:

public static GlobalTags generateTags(List<String> tags)
public static Domains generateDomains(List<String> domains)

Flow URN construction:

public static DataFlowUrn getFlowUrn(
    String namespace,
    String jobName,
    String processingEngine,
    URI producer,
    DatahubOpenlineageConfig datahubOpenlineageConfig
)

Helper methods:

public static DataFlowInfo convertRunEventToDataFlowInfo(
    OpenLineage.RunEvent event, String flowName)
public static String getFlowName(String jobName, String flowName)
public static Urn dataPlatformInstanceUrn(String platform, String instance)
public static Edge createEdge(Urn urn, ZonedDateTime eventTime)
public static AuditStamp createAuditStamp(ZonedDateTime eventTime)
public static SchemaMetadata getSchemaMetadata(
    OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig)
public static SchemaFieldDataType.Type convertOlFieldTypeToDHFieldType(
    String openLineageFieldType)

Import

import io.datahubproject.openlineage.converter.OpenLineageToDataHub;

I/O Contract

Input (OpenLineage.RunEvent): An OpenLineage run event containing eventType (START, RUNNING, COMPLETE, FAIL, ABORT), job (with namespace and name), run (with runId and facets), inputs (list of InputDataset), outputs (list of OutputDataset), and eventTime.

Input (DatahubOpenlineageConfig): Configuration controlling URN construction: fabricType, platformInstance, hivePlatformAlias, captureColumnLevelLineage, materializeDataset, disableSymlinkResolution, lowerCaseDatasetUrns, pathSpecs, pipelineName.

Output (DatahubJob): A DataHub job representation containing flowUrn (DataFlowUrn), jobUrn (DataJobUrn), dataFlowInfo (DataFlowInfo with name and custom properties), jobInfo (DataJobInfo with name, flow URN, type, custom properties), inSet (Set of DatahubDataset for inputs), outSet (Set of DatahubDataset for outputs), flowOwnership (Ownership), flowGlobalTags (GlobalTags), flowPlatformInstance (DataPlatformInstance), dataProcessInstanceUrn, dataProcessInstanceRunEvent, dataProcessInstanceProperties, and dataProcessInstanceRelationships.

Dataset URN construction flow:

OpenLineage.Dataset(namespace, name)
  -> Check symlinks facet for TABLE type
     -> If found: use symlink namespace/name (maps to hive/glue platform)
     -> Record original URN as alias for future resolution
  -> Parse namespace URI scheme to determine platform
     -> PLATFORM_MAP: awsathena->athena, sqlserver->mssql
     -> HDFS-like platforms: s3, gcs, abs -> HdfsPathDataset
  -> Apply platformInstance from config or PathSpec
  -> Apply FabricType from config or PathSpec
  -> Optionally lowercase URN
  -> Construct: urn:li:dataset:(urn:li:dataPlatform:<platform>,<instance>.<name>,<env>)
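The flow above can be approximated with plain strings. The sketch below is a simplified, hypothetical stand-in (the class DatasetUrnSketch and its method are not part of DataHub): it skips the symlink and PathSpec steps, and it assumes that path-based names are prefixed with the bucket taken from the namespace and that lowercasing applies only to the dataset name.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified sketch of the URN flow; not the OpenLineageToDataHub code. */
final class DatasetUrnSketch {
    // Scheme-to-platform aliases listed in the flow above.
    private static final Map<String, String> PLATFORM_MAP =
        Map.of("awsathena", "athena", "sqlserver", "mssql");

    // Platforms the flow treats as HDFS-like (path-based) datasets.
    private static final Set<String> PATH_PLATFORMS = Set.of("s3", "gcs", "abs");

    private DatasetUrnSketch() {}

    static String toDatasetUrn(String namespace, String name,
                               String platformInstance, String env, boolean lowerCase) {
        // Step 1: the namespace URI scheme determines the platform.
        URI uri = URI.create(namespace);
        String scheme = uri.getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("namespace has no URI scheme: " + namespace);
        }
        String platform = PLATFORM_MAP.getOrDefault(scheme, scheme);

        // Step 2: build the dataset name.
        String datasetName = name.startsWith("/") ? name.substring(1) : name;
        if (PATH_PLATFORMS.contains(platform) && uri.getAuthority() != null) {
            // Assumption: path-based names are prefixed with the bucket or container.
            datasetName = uri.getAuthority() + "/" + datasetName;
        }

        // Step 3: optional platform instance prefix, then optional lowercasing.
        if (platformInstance != null && !platformInstance.isEmpty()) {
            datasetName = platformInstance + "." + datasetName;
        }
        if (lowerCase) {
            // Assumption: only the name part is lowercased, not the env.
            datasetName = datasetName.toLowerCase(Locale.ROOT);
        }

        // Step 4: assemble the URN string.
        return "urn:li:dataset:(urn:li:dataPlatform:" + platform + ","
            + datasetName + "," + env + ")";
    }
}
```

The real converter returns an Optional<DatasetUrn> and consults DatahubOpenlineageConfig; this sketch returns a String only to stay self-contained.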

Usage Examples

Example 1: Convert a RunEvent to a DatahubJob

import com.linkedin.common.FabricType;
import io.datahubproject.openlineage.converter.OpenLineageToDataHub;
import io.datahubproject.openlineage.config.DatahubOpenlineageConfig;
import io.datahubproject.openlineage.dataset.DatahubJob;
import io.openlineage.client.OpenLineage;

// Configure the conversion for a Spark source in the PROD environment.
DatahubOpenlineageConfig config = DatahubOpenlineageConfig.builder()
    .isSpark(true)
    .fabricType(FabricType.PROD)
    .captureColumnLevelLineage(true)
    .materializeDataset(false)
    .build();

// `event` is the OpenLineage.RunEvent received from the OpenLineage Spark listener.
// convertRunEventToJob declares IOException and URISyntaxException,
// so the caller must handle or propagate them.
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(event, config);

// datahubJob.getFlowUrn()  -> urn:li:dataFlow:(spark,my-app,default)
// datahubJob.getJobUrn()   -> urn:li:dataJob:(urn:li:dataFlow:(spark,my-app,default),my-app.query_1)
// datahubJob.getInSet()    -> [DatahubDataset(urn=urn:li:dataset:(urn:li:dataPlatform:hive,db.input_table,PROD))]
// datahubJob.getOutSet()   -> [DatahubDataset(urn=urn:li:dataset:(urn:li:dataPlatform:hive,db.output_table,PROD))]
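The flow and job URN shapes shown in the comments above nest one inside the other. A minimal string-only sketch of that nesting follows; the class JobUrnSketch and its parameter names are hypothetical, and the real converter builds DataFlowUrn and DataJobUrn objects rather than strings.

```java
/** Hypothetical string-only sketch of the URN shapes shown in Example 1. */
final class JobUrnSketch {
    private JobUrnSketch() {}

    // urn:li:dataFlow:(<orchestrator>,<flowName>,<cluster>)
    static String flowUrn(String orchestrator, String flowName, String cluster) {
        return "urn:li:dataFlow:(" + orchestrator + "," + flowName + "," + cluster + ")";
    }

    // urn:li:dataJob:(<flowUrn>,<jobName>) embeds the full flow URN as its first part.
    static String jobUrn(String flowUrn, String jobName) {
        return "urn:li:dataJob:(" + flowUrn + "," + jobName + ")";
    }
}
```

Calling JobUrnSketch.jobUrn(JobUrnSketch.flowUrn("spark", "my-app", "default"), "my-app.query_1") reproduces the job URN string from the example comments.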

Example 2: Convert an OpenLineage dataset to a DatasetUrn

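import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DatasetUrn;
import io.datahubproject.openlineage.config.DatahubOpenlineageConfig;
import io.datahubproject.openlineage.converter.OpenLineageToDataHub;
import io.openlineage.client.OpenLineage;
import java.net.URI;
import java.util.Optional;

// The URI passed to OpenLineage identifies the producer of the events.
OpenLineage ol = new OpenLineage(URI.create("https://example.com"));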
OpenLineage.Dataset dataset = ol.newStaticDatasetBuilder()
    .namespace("s3://my-bucket")
    .name("warehouse/db/table")
    .build();

DatahubOpenlineageConfig config = DatahubOpenlineageConfig.builder()
    .fabricType(FabricType.PROD)
    .build();

Optional<DatasetUrn> urn =
    OpenLineageToDataHub.convertOpenlineageDatasetToDatasetUrn(dataset, config);

// urn -> Optional[urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/warehouse/db/table,PROD)]

Example 3: Generate tags for a DataFlow

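import com.linkedin.common.GlobalTags;
import io.datahubproject.openlineage.converter.OpenLineageToDataHub;
import java.util.Arrays;
import java.util.List;

List<String> tags = Arrays.asList("etl", "daily-batch", "production");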
GlobalTags globalTags = OpenLineageToDataHub.generateTags(tags);

// globalTags contains TagAssociations for: daily-batch, etl, production (sorted)
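The sorted ordering noted in the comment above can be illustrated without the DataHub classes. The sketch below is a hypothetical stand-in (TagSketch is not part of DataHub) that sorts the tag names and renders each as a tag URN string; it assumes natural String ordering, which matches the order shown in the example comment.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for tag generation; returns URN strings, not GlobalTags. */
final class TagSketch {
    private TagSketch() {}

    static List<String> toSortedTagUrns(List<String> tags) {
        // Copy first so the caller's list is not mutated by the sort.
        List<String> sorted = new ArrayList<>(tags);
        Collections.sort(sorted);

        List<String> urns = new ArrayList<>();
        for (String tag : sorted) {
            // DataHub tag URNs have the form urn:li:tag:<name>.
            urns.add("urn:li:tag:" + tag);
        }
        return urns;
    }
}
```

Sorting makes the output independent of the order in which tags were supplied, so repeated conversions of the same tag set produce the same result.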
