Implementation:Datahub project Datahub DatahubJob
| Knowledge Sources | |
|---|---|
| Domains | OpenLineage_Integration |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
A Lombok-based data model that represents a DataHub job entity derived from OpenLineage events. It encapsulates DataFlow, DataJob, and DataProcessInstance aspects and provides toMcps() to generate the complete list of MetadataChangeProposal objects for emission.
Description
DatahubJob is the central output model of the OpenLineage-to-DataHub conversion pipeline. It aggregates all the metadata aspects needed to represent a data pipeline job in DataHub:
Core fields:
- flowUrn / dataFlowInfo -- The DataFlow entity (pipeline) this job belongs to
- jobUrn / jobInfo -- The DataJob entity (individual task or query)
- flowOwnership / flowGlobalTags / flowDomains -- Ownership, tags, and domain associations for the flow
- flowPlatformInstance -- Optional platform instance for multi-tenant deployments
- dataProcessInstanceRunEvent / dataProcessInstanceProperties / dataProcessInstanceRelationships -- Run instance tracking
- inSet / outSet -- Input and output datasets (using TreeSet with DataSetComparator for deterministic ordering)
- parentJobs -- Parent DataJob URNs for the pipeline hierarchy
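To illustrate why inSet and outSet use a TreeSet with a comparator, here is a minimal sketch in plain Java. The Dataset record and the URN-based ordering are assumptions standing in for DatahubDataset and DataSetComparator; the real comparator's criteria may differ. The point is that iteration order depends only on the comparator, not on the order in which datasets are added, and entries the comparator treats as equal collapse into one.

```java
import java.util.Comparator;
import java.util.Set;
import java.util.TreeSet;

public class DeterministicDatasetSet {
    // Hypothetical stand-in for DatahubDataset: only the URN matters for ordering here.
    record Dataset(String urn) {}

    // Hypothetical comparator in the spirit of DataSetComparator: orders by URN string.
    static final Comparator<Dataset> BY_URN = Comparator.comparing(Dataset::urn);

    // A TreeSet uses the comparator both for ordering and for duplicate detection.
    static Set<Dataset> newDatasetSet() {
        return new TreeSet<>(BY_URN);
    }

    public static void main(String[] args) {
        Set<Dataset> inSet = newDatasetSet();
        // Insertion order differs from the order the TreeSet iterates in.
        inSet.add(new Dataset("urn:li:dataset:(urn:li:dataPlatform:hive,db.orders,PROD)"));
        inSet.add(new Dataset("urn:li:dataset:(urn:li:dataPlatform:hive,db.customers,PROD)"));
        // Same URN again: the comparator returns 0, so this is not added a second time.
        inSet.add(new Dataset("urn:li:dataset:(urn:li:dataPlatform:hive,db.orders,PROD)"));
        // Prints the customers URN first, then the orders URN.
        inSet.forEach(d -> System.out.println(d.urn()));
    }
}
```

Because the resulting MCP list is built by iterating these sets, deterministic ordering keeps the output stable across runs of the same event.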
MCP Generation (toMcps()):
The method generates a comprehensive list of MetadataChangeProposal objects:
- DataFlow info and status
- Platform instance (if configured)
- DataJob info with custom properties (including start/end times) and status
- Global tags (supports patch mode via GlobalTagsPatchBuilder)
- Domain associations
- Input dataset edges with optional schema metadata and upstream lineage
- Output dataset edges with optional schema metadata
- DataJobInputOutput aspect (supports patch mode via DataJobInputOutputPatchBuilder) with fine-grained lineage
- DataProcessInstance input/output, properties, run event, and relationships
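The sequence above can be pictured as a list builder that appends one proposal per entity/aspect pair and skips optional pieces. The sketch below is hypothetical: Mcp is a simplified stand-in for MetadataChangeProposal, the booleans replace the typed aspects DatahubJob actually holds, the dataset-side MCPs (edges, schema metadata, upstream lineage) are omitted, and treating tags and domains as optional is an assumption. The aspect-name strings are illustrative, and the exact order produced by toMcps() may differ.

```java
import java.util.ArrayList;
import java.util.List;

public class McpOrderSketch {
    // Hypothetical, simplified MCP: the target entity URN and the aspect it carries.
    record Mcp(String entityUrn, String aspectName) {}

    // Hypothetical signature; the real toMcps(config) reads typed fields instead of flags.
    static List<Mcp> toMcps(String flowUrn, String jobUrn, String dpiUrn,
                            boolean hasPlatformInstance, boolean hasTags, boolean hasDomains) {
        List<Mcp> mcps = new ArrayList<>();
        // DataFlow info and status
        mcps.add(new Mcp(flowUrn, "dataFlowInfo"));
        mcps.add(new Mcp(flowUrn, "status"));
        // Platform instance only when configured
        if (hasPlatformInstance) {
            mcps.add(new Mcp(flowUrn, "dataPlatformInstance"));
        }
        // DataJob info and status
        mcps.add(new Mcp(jobUrn, "dataJobInfo"));
        mcps.add(new Mcp(jobUrn, "status"));
        // Assumption for this sketch: tags and domains are emitted only when present
        if (hasTags) {
            mcps.add(new Mcp(flowUrn, "globalTags"));
        }
        if (hasDomains) {
            mcps.add(new Mcp(flowUrn, "domains"));
        }
        // Job-level lineage (inputs, outputs, fine-grained lineage)
        mcps.add(new Mcp(jobUrn, "dataJobInputOutput"));
        // Run instance aspects
        for (String aspect : List.of("dataProcessInstanceInput", "dataProcessInstanceOutput",
                "dataProcessInstanceProperties", "dataProcessInstanceRunEvent",
                "dataProcessInstanceRelationships")) {
            mcps.add(new Mcp(dpiUrn, aspect));
        }
        return mcps;
    }

    public static void main(String[] args) {
        toMcps("flow", "job", "run", true, false, true)
                .forEach(m -> System.out.println(m.entityUrn() + " " + m.aspectName()));
    }
}
```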
Patch mode -- When config.isUsePatch() is true, the class uses patch builders instead of full aspect replacement, enabling incremental updates to existing entities.
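The practical difference between the two modes can be sketched with plain string sets of tag URNs. This is a conceptual illustration only: it does not use GlobalTagsPatchBuilder, and the upsert and patchAdd helpers are hypothetical. A full-aspect write replaces whatever the entity already stored, while an additive patch preserves existing values and adds the new ones.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class PatchVsUpsertSketch {
    // Full-aspect replacement: the previously stored value is ignored and overwritten.
    static Set<String> upsert(Set<String> stored, Set<String> incoming) {
        return new LinkedHashSet<>(incoming);
    }

    // Patch-style update: only the listed additions are applied on top of the stored value.
    static Set<String> patchAdd(Set<String> stored, Set<String> additions) {
        Set<String> result = new LinkedHashSet<>(stored);
        result.addAll(additions);
        return result;
    }

    public static void main(String[] args) {
        Set<String> stored = Set.of("urn:li:tag:pii");          // tag added earlier, e.g. by a user
        Set<String> fromRun = Set.of("urn:li:tag:openlineage"); // tag derived from the current run
        System.out.println("upsert: " + upsert(stored, fromRun));   // only the run's tag survives
        System.out.println("patch:  " + patchAdd(stored, fromRun)); // both tags survive
    }
}
```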
Legacy lineage removal -- When config.isRemoveLegacyLineage() is true, it removes upstream lineage previously set directly on datasets (before the migration to DataJob-based lineage).
Usage
DatahubJob is returned by OpenLineageToDataHub.convertRunEventToJob(). After conversion, call toMcps(config) to generate the full list of MCPs, then emit them to DataHub via REST or Kafka.
Code Reference
Source Location
- Repository: Datahub_project_Datahub
- File: metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java
Signature
```java
@EqualsAndHashCode
@Getter
@Setter
@Builder
@ToString
@Slf4j
public class DatahubJob {
    // Entity URNs and info
    DataFlowUrn flowUrn;
    DataFlowInfo dataFlowInfo;
    DataJobUrn jobUrn;
    DataJobInfo jobInfo;

    // Metadata aspects
    Ownership flowOwnership;
    GlobalTags flowGlobalTags;
    Domains flowDomains;
    DataPlatformInstance flowPlatformInstance;

    // Process instance
    DataProcessInstanceRunEvent dataProcessInstanceRunEvent;
    DataProcessInstanceProperties dataProcessInstanceProperties;
    DataProcessInstanceRelationships dataProcessInstanceRelationships;
    Urn dataProcessInstanceUrn;

    // Input/output datasets
    final Set<DatahubDataset> inSet;
    final Set<DatahubDataset> outSet;
    final Set<DataJobUrn> parentJobs;

    // MCP generation (method bodies omitted)
    public List<MetadataChangeProposal> toMcps(DatahubOpenlineageConfig config) throws IOException;

    // Dataset materialization
    public static MetadataChangeProposalWrapper materializeDataset(DatasetUrn datasetUrn);
}
```
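Because the class is annotated with @Builder, Lombok generates a static DatahubJob.builder() method with one fluent setter per field and a build() method, and @Getter adds a getter per field. The hand-written sketch below approximates that generated code for a hypothetical, cut-down MiniJob class with three fields. The TreeSet default for inSet is an assumption for illustration (Lombok's builder ignores field initializers unless they carry @Builder.Default); it is not a statement about how DatahubJob initializes its sets.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, cut-down class approximating what Lombok's @Builder and @Getter generate.
public class MiniJob {
    private final String flowUrn;
    private final String jobUrn;
    private final Set<String> inSet;

    private MiniJob(String flowUrn, String jobUrn, Set<String> inSet) {
        this.flowUrn = flowUrn;
        this.jobUrn = jobUrn;
        this.inSet = inSet;
    }

    // Entry point analogous to the generated DatahubJob.builder()
    public static Builder builder() {
        return new Builder();
    }

    public String getFlowUrn() { return flowUrn; }
    public String getJobUrn() { return jobUrn; }
    public Set<String> getInSet() { return inSet; }

    public static final class Builder {
        private String flowUrn;
        private String jobUrn;
        // Assumed default for this sketch only (Lombok would need @Builder.Default for this)
        private Set<String> inSet = new TreeSet<>();

        // Fluent setters return the builder so calls can be chained
        public Builder flowUrn(String flowUrn) { this.flowUrn = flowUrn; return this; }
        public Builder jobUrn(String jobUrn) { this.jobUrn = jobUrn; return this; }
        public Builder inSet(Set<String> inSet) { this.inSet = inSet; return this; }

        public MiniJob build() {
            return new MiniJob(flowUrn, jobUrn, inSet);
        }
    }

    public static void main(String[] args) {
        MiniJob job = MiniJob.builder()
                .flowUrn("urn:li:dataFlow:(spark,my_pipeline,prod)")
                .jobUrn("urn:li:dataJob:(urn:li:dataFlow:(spark,my_pipeline,prod),my_task)")
                .build();
        System.out.println(job.getJobUrn() + " inputs=" + job.getInSet());
    }
}
```

The converter populates DatahubJob through the same kind of chained builder calls, which is why the I/O contract lists the builder fields as inputs alongside config.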
Import
```java
import io.datahubproject.openlineage.dataset.DatahubJob;
```
I/O Contract
| Input | Type | Description |
|---|---|---|
| config | DatahubOpenlineageConfig | Configuration controlling patch mode, dataset materialization, schema metadata, and lineage options |
| (Builder fields) | Various | Flow URN, job URN, info, ownership, tags, datasets, etc. |
| Output | Type | Description |
|---|---|---|
| MCPs | List<MetadataChangeProposal> | Complete set of MCPs for DataFlow, DataJob, DataProcessInstance, and Dataset entities |
Usage Examples
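```java
// Assumes `event` (an OpenLineage run event), `config` (a DatahubOpenlineageConfig),
// and `emitter` (a DataHub REST or Kafka emitter) are already in scope.

// Build from an OpenLineage event
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(event, config);

// Generate MCPs
List<MetadataChangeProposal> mcps = datahubJob.toMcps(config);

// Emit MCPs
for (MetadataChangeProposal mcp : mcps) {
    emitter.emit(mcp);
}
```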