Implementation:Datahub project Datahub OpenLineageToDataHub ConvertRunEvent
| Attribute | Value |
|---|---|
| Page Type | Implementation (API Doc) |
| Workflow | Spark_Lineage_Capture |
| Pair | 4 of 6 |
| Principle | Principle:Datahub_project_Datahub_OpenLineage_Conversion |
| Repository | https://github.com/datahub-project/datahub |
| Source Location | metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java:L1-1373 |
| Last Updated | 2026-02-09 17:00 GMT |
Overview
Description
OpenLineageToDataHub is the converter class that transforms OpenLineage RunEvent objects into DataHub's metadata model. This class is a utility with only static methods (the constructor is private), providing a pure-function interface for the conversion logic. It resides in the openlineage-converter module, which is separate from the Spark-specific agent, enabling reuse by other OpenLineage-compatible integrations.
The converter handles the full spectrum of OpenLineage-to-DataHub mapping: job-level metadata (DataFlow and DataJob), dataset-level metadata (DatasetUrn construction, schema extraction), fine-grained column-level lineage, ownership, tags, domains, custom properties, and data process instance tracking.
Usage
The converter is called by DatahubEventEmitter.convertOpenLineageRunEventToDatahubJob() whenever an OpenLineage RunEvent is received from the Spark listener. It is also usable independently for converting OpenLineage events from non-Spark sources (e.g., Airflow, Trino) into DataHub metadata.
Code Reference
Source Location
| Attribute | Value |
|---|---|
| File | metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java |
| Lines | L1-1373 |
| Module | openlineage-converter |
Signature
Primary conversion method:
```java
public static DatahubJob convertRunEventToJob(
    OpenLineage.RunEvent event,
    DatahubOpenlineageConfig datahubConf
) throws IOException, URISyntaxException
```
Dataset URN conversion:
```java
public static Optional<DatasetUrn> convertOpenlineageDatasetToDatasetUrn(
    OpenLineage.Dataset dataset,
    DatahubOpenlineageConfig mappingConfig
)
```
Tag and domain generation:
```java
public static GlobalTags generateTags(List<String> tags)
public static Domains generateDomains(List<String> domains)
```
Flow URN construction:
```java
public static DataFlowUrn getFlowUrn(
    String namespace,
    String jobName,
    String processingEngine,
    URI producer,
    DatahubOpenlineageConfig datahubOpenlineageConfig
)
```
Helper methods:
```java
public static DataFlowInfo convertRunEventToDataFlowInfo(
    OpenLineage.RunEvent event, String flowName)
public static String getFlowName(String jobName, String flowName)
public static Urn dataPlatformInstanceUrn(String platform, String instance)
public static Edge createEdge(Urn urn, ZonedDateTime eventTime)
public static AuditStamp createAuditStamp(ZonedDateTime eventTime)
public static SchemaMetadata getSchemaMetadata(
    OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig)
public static SchemaFieldDataType.Type convertOlFieldTypeToDHFieldType(
    String openLineageFieldType)
```
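The behavior of the field-type helper can be illustrated with a plain-Java sketch. This is a standalone approximation, not the real method: the class name `FieldTypeMappingSketch`, the string categories, and the exact mapping table below are hypothetical stand-ins for DataHub's `SchemaFieldDataType` type variants, with unknown OpenLineage types falling back to a null/unknown category.

```java
import java.util.Locale;
import java.util.Map;

// Illustrative sketch only: a plain-Java stand-in for the kind of mapping
// convertOlFieldTypeToDHFieldType performs. The category names below are
// hypothetical labels, not DataHub's actual SchemaFieldDataType variants.
public class FieldTypeMappingSketch {
    private static final Map<String, String> OL_TO_DH = Map.of(
            "string", "StringType",
            "long", "NumberType",
            "integer", "NumberType",
            "double", "NumberType",
            "boolean", "BooleanType",
            "timestamp", "TimeType",
            "date", "DateType");

    // Unrecognized OpenLineage types fall back to the unknown category.
    public static String map(String openLineageFieldType) {
        if (openLineageFieldType == null) {
            return "NullType";
        }
        return OL_TO_DH.getOrDefault(
                openLineageFieldType.toLowerCase(Locale.ROOT), "NullType");
    }
}
```

The case-insensitive lookup matters in practice because OpenLineage schema facets carry whatever type strings the producer emitted.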
Import
```java
import io.datahubproject.openlineage.converter.OpenLineageToDataHub;
```
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | `OpenLineage.RunEvent` | An OpenLineage run event containing: `eventType` (START, RUNNING, COMPLETE, FAIL, ABORT), `job` (namespace and name), `run` (runId and facets), `inputs` (list of `InputDataset`), `outputs` (list of `OutputDataset`), and `eventTime`. |
| Input | `DatahubOpenlineageConfig` | Configuration controlling URN construction: `fabricType`, `platformInstance`, `hivePlatformAlias`, `captureColumnLevelLineage`, `materializeDataset`, `disableSymlinkResolution`, `lowerCaseDatasetUrns`, `pathSpecs`, `pipelineName`. |
| Output | `DatahubJob` | A DataHub job representation containing: `flowUrn` (`DataFlowUrn`), `jobUrn` (`DataJobUrn`), `dataFlowInfo` (`DataFlowInfo` with name and custom properties), `jobInfo` (`DataJobInfo` with name, flow URN, type, and custom properties), `inSet` (set of `DatahubDataset` for inputs), `outSet` (set of `DatahubDataset` for outputs), `flowOwnership` (`Ownership`), `flowGlobalTags` (`GlobalTags`), `flowPlatformInstance` (`DataPlatformInstance`), `dataProcessInstanceUrn`, `dataProcessInstanceRunEvent`, `dataProcessInstanceProperties`, and `dataProcessInstanceRelationships`. |
Dataset URN construction flow:
```text
OpenLineage.Dataset(namespace, name)
  -> Check symlinks facet for TABLE type
     -> If found: use symlink namespace/name (maps to hive/glue platform)
     -> Record original URN as alias for future resolution
  -> Parse namespace URI scheme to determine platform
     -> PLATFORM_MAP: awsathena -> athena, sqlserver -> mssql
     -> HDFS-like platforms: s3, gcs, abs -> HdfsPathDataset
  -> Apply platformInstance from config or PathSpec
  -> Apply FabricType from config or PathSpec
  -> Optionally lowercase URN
  -> Construct: urn:li:dataset:(urn:li:dataPlatform:<platform>,<instance>.<name>,<env>)
```
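The core of the flow above can be sketched in plain Java. This is a simplified, standalone illustration: it covers only the scheme-to-platform aliasing and the final URN layout, and omits symlink resolution, path specs, and the HDFS-style path handling in which the bucket is folded into the dataset name. `DatasetUrnSketch` and its methods are hypothetical names, not part of the converter's API.

```java
import java.net.URI;
import java.util.Map;

// Minimal sketch of the URN-construction flow: alias the namespace scheme
// to a DataHub platform, then assemble the dataset URN string. The real
// converter (convertOpenlineageDatasetToDatasetUrn) does much more.
public class DatasetUrnSketch {
    // Subset of the PLATFORM_MAP aliases mentioned above.
    private static final Map<String, String> PLATFORM_MAP =
            Map.of("awsathena", "athena", "sqlserver", "mssql");

    public static String mapPlatform(String scheme) {
        return PLATFORM_MAP.getOrDefault(scheme, scheme);
    }

    // Builds urn:li:dataset:(urn:li:dataPlatform:<platform>,[<instance>.]<name>,<env>)
    public static String buildUrn(String namespace, String name,
                                  String platformInstance, String env) {
        String scheme = URI.create(namespace).getScheme();
        String platform = mapPlatform(scheme != null ? scheme : namespace);
        String qualifiedName =
                platformInstance == null ? name : platformInstance + "." + name;
        return "urn:li:dataset:(urn:li:dataPlatform:" + platform + ","
                + qualifiedName + "," + env + ")";
    }
}
```

Passing a `platformInstance` prefixes the dataset name, matching the `<instance>.<name>` segment in the URN template above.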
Usage Examples
Example 1: Convert a RunEvent to a DatahubJob
```java
import com.linkedin.common.FabricType;
import io.datahubproject.openlineage.config.DatahubOpenlineageConfig;
import io.datahubproject.openlineage.converter.OpenLineageToDataHub;
import io.datahubproject.openlineage.dataset.DatahubJob;
import io.openlineage.client.OpenLineage;

DatahubOpenlineageConfig config = DatahubOpenlineageConfig.builder()
    .isSpark(true)
    .fabricType(FabricType.PROD)
    .captureColumnLevelLineage(true)
    .materializeDataset(false)
    .build();

OpenLineage.RunEvent event = receivedEvent; // received from the OpenLineage Spark listener

DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(event, config);
// datahubJob.getFlowUrn() -> urn:li:dataFlow:(spark,my-app,default)
// datahubJob.getJobUrn()  -> urn:li:dataJob:(urn:li:dataFlow:(spark,my-app,default),my-app.query_1)
// datahubJob.getInSet()   -> [DatahubDataset(urn=urn:li:dataset:(urn:li:dataPlatform:hive,db.input_table,PROD))]
// datahubJob.getOutSet()  -> [DatahubDataset(urn=urn:li:dataset:(urn:li:dataPlatform:hive,db.output_table,PROD))]
```
Example 2: Convert an OpenLineage dataset to a DatasetUrn
```java
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DatasetUrn;
import io.datahubproject.openlineage.config.DatahubOpenlineageConfig;
import io.datahubproject.openlineage.converter.OpenLineageToDataHub;
import io.openlineage.client.OpenLineage;
import java.net.URI;
import java.util.Optional;

OpenLineage ol = new OpenLineage(URI.create("https://example.com"));
OpenLineage.Dataset dataset = ol.newStaticDatasetBuilder()
    .namespace("s3://my-bucket")
    .name("warehouse/db/table")
    .build();

DatahubOpenlineageConfig config = DatahubOpenlineageConfig.builder()
    .fabricType(FabricType.PROD)
    .build();

Optional<DatasetUrn> urn =
    OpenLineageToDataHub.convertOpenlineageDatasetToDatasetUrn(dataset, config);
// urn -> Optional[urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/warehouse/db/table,PROD)]
```
Example 3: Generate tags for a DataFlow
```java
import com.linkedin.common.GlobalTags;
import java.util.Arrays;
import java.util.List;

List<String> tags = Arrays.asList("etl", "daily-batch", "production");
GlobalTags globalTags = OpenLineageToDataHub.generateTags(tags);
// globalTags contains TagAssociations for: daily-batch, etl, production (sorted)
```
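The sorted-tags behavior in Example 3 can be mimicked with a small standalone sketch. `TagSketch.toTagUrns` is a hypothetical helper, not part of the converter: it renders plain DataHub tag URN strings (`urn:li:tag:<name>`) rather than the `GlobalTags` aspect the real `generateTags` returns.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the tag-generation behavior shown above: sort the incoming
// tag names, then render each as a DataHub tag URN (urn:li:tag:<name>).
public class TagSketch {
    public static List<String> toTagUrns(List<String> tags) {
        return tags.stream()
                .sorted()
                .map(t -> "urn:li:tag:" + t)
                .collect(Collectors.toList());
    }
}
```

Sorting makes the emitted aspect deterministic, so repeated runs with the same tag set do not produce spurious metadata diffs.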