Implementation:Datahub project Datahub DatahubEventEmitter Emit
Metadata
| Field | Value |
|---|---|
| implementation_name | DatahubEventEmitter Emit |
| type | API Doc |
| status | Active |
| last_updated | 2026-02-10 |
| source_file | metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java and metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java |
| lines | DatahubEventEmitter.java:L62-457, OpenLineageToDataHub.java:L87-1373 |
| repository | datahub-project/datahub |
| domains | Data_Lineage, Apache_Spark, Metadata_Management |
Overview
DatahubEventEmitter is the core emission engine that receives OpenLineage RunEvent objects, converts them to DataHub DatahubJob objects, buffers them for coalescing, and emits the resulting Metadata Change Proposals (MCPs) to the configured backend. It extends EventEmitter from the OpenLineage Spark agent and works in conjunction with the OpenLineageToDataHub converter.
Description
This implementation spans two classes:
- DatahubEventEmitter -- Orchestrates the emission pipeline (receive, convert, buffer, coalesce, emit)
- OpenLineageToDataHub -- Converts OpenLineage RunEvents into DataHub model objects (DatahubJob, DatahubDataset, URNs, aspects)
The emitter supports both coalesced and non-coalesced emission modes, handles streaming detection, and manages the lifecycle of the underlying transport (REST, Kafka, File, or S3).
Source Code Reference
DatahubEventEmitter Class (DatahubEventEmitter.java)
File: metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java
Class Declaration and Fields (L62-73)
@Slf4j
public class DatahubEventEmitter extends EventEmitter {
private final AtomicBoolean streaming = new AtomicBoolean(false);
private final List<DatahubJob> _datahubJobs = new LinkedList<>();
private final Map<String, MetadataChangeProposalWrapper> schemaMap = new HashMap<>();
private SparkLineageConf datahubConf;
private static final int DEFAULT_TIMEOUT_SEC = 10;
private final ObjectMapper objectMapper;
private final JacksonDataTemplateCodec dataTemplateCodec;
private final EventFormatter eventFormatter = new EventFormatter();
}
Constructor (L75-87)
public DatahubEventEmitter(SparkOpenLineageConfig config, String applicationJobName)
throws URISyntaxException {
super(config, applicationJobName);
objectMapper = new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
int maxSize = Integer.parseInt(
System.getenv().getOrDefault(
INGESTION_MAX_SERIALIZED_STRING_LENGTH, MAX_JACKSON_STRING_SIZE));
objectMapper.getFactory().setStreamReadConstraints(
StreamReadConstraints.builder().maxStringLength(maxSize).build());
dataTemplateCodec = new JacksonDataTemplateCodec(objectMapper.getFactory());
}
emit (L152-196)
The main entry point, called by the OpenLineage context factory for each RunEvent:
public void emit(OpenLineage.RunEvent event) {
long startTime = System.currentTimeMillis();
// Re-serialize/deserialize to normalize event format
event = OpenLineageClientUtils.runEventFromJson(
OpenLineageClientUtils.toJson(event));
Optional<DatahubJob> job = convertOpenLineageRunEventToDatahubJob(event);
if (!job.isPresent()) {
return;
}
// Apply tags and domains
if (!datahubConf.getTags().isEmpty()) {
GlobalTags tags = OpenLineageToDataHub.generateTags(datahubConf.getTags());
job.get().setFlowGlobalTags(tags);
}
if (!datahubConf.getDomains().isEmpty()) {
Domains domains = OpenLineageToDataHub.generateDomains(datahubConf.getDomains());
job.get().setFlowDomains(domains);
}
if (isStreaming()) {
return; // Skip for streaming mode
}
if (!datahubConf.isCoalesceEnabled()) {
// Emit immediately for non-coalesced mode
emitMcps(job.get().toMcps(datahubConf.getOpenLineageConf()));
}
if (datahubConf.isCoalesceEnabled() && datahubConf.isEmitCoalescePeriodically()) {
// Emit coalesced data periodically
emitCoalesced();
}
}
emitCoalesced (L198-213)
public void emitCoalesced() {
long startTime = System.currentTimeMillis();
if (isStreaming()) {
return;
}
if (datahubConf.isCoalesceEnabled()) {
List<MetadataChangeProposal> mcps = generateCoalescedMcps();
log.info("Emitting Coalesced lineage completed successfully");
emitMcps(mcps);
}
long elapsedTime = System.currentTimeMillis() - startTime;
log.info("Emitting coalesced lineage completed in {} ms", elapsedTime);
}
generateCoalescedMcps (L215-304)
Merges all buffered DatahubJob objects into a single coalesced job:
public List<MetadataChangeProposal> generateCoalescedMcps() {
List<MetadataChangeProposal> mcps = new ArrayList<>();
if (_datahubJobs.isEmpty()) {
log.warn("No lineage events to emit.");
return mcps;
}
DatahubJob datahubJob = DatahubJob.builder().build();
AtomicLong minStartTime = new AtomicLong(Long.MAX_VALUE);
AtomicLong maxEndTime = new AtomicLong();
_datahubJobs.forEach(storedDatahubJob -> {
// Set URNs from stored jobs
DataJobUrn jobUrn = jobUrn(
storedDatahubJob.getFlowUrn(),
storedDatahubJob.getFlowUrn().getFlowIdEntity());
datahubJob.setJobUrn(jobUrn);
datahubJob.setFlowUrn(storedDatahubJob.getFlowUrn());
// Merge job info and custom properties
// ... merge logic ...
// Track time boundaries
if (storedDatahubJob.getStartTime() < minStartTime.get()) {
minStartTime.set(storedDatahubJob.getStartTime());
}
if (storedDatahubJob.getEndTime() > maxEndTime.get()) {
maxEndTime.set(storedDatahubJob.getEndTime());
}
// Merge datasets and process instances
mergeDatasets(storedDatahubJob.getOutSet(), datahubJob.getOutSet());
mergeDatasets(storedDatahubJob.getInSet(), datahubJob.getInSet());
mergeDataProcessInstance(datahubJob, storedDatahubJob);
mergeCustomProperties(datahubJob, storedDatahubJob);
});
// Apply tags and domains, generate MCPs
return datahubJob.toMcps(datahubConf.getOpenLineageConf());
}
emitMcps (L406-444)
Sends MCPs to the configured backend:
protected void emitMcps(List<MetadataChangeProposal> mcps) {
Optional<Emitter> emitter = getEmitter();
if (emitter.isPresent()) {
mcps.stream()
.map(mcp -> {
try {
if (this.datahubConf.isLogMcps()) {
DataMap map = mcp.data();
String serializedMCP = dataTemplateCodec.mapToString(map);
log.info("emitting mcpw: {}", serializedMCP);
} else {
log.info("emitting aspect: {} for urn: {}",
mcp.getAspectName(), mcp.getEntityUrn());
}
return emitter.get().emit(mcp);
} catch (IOException ioException) {
log.error("Failed to emit metadata to DataHub", ioException);
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList())
.forEach(future -> {
try {
log.info(future.get(DEFAULT_TIMEOUT_SEC, TimeUnit.SECONDS).toString());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to emit metadata to DataHub", e);
}
});
emitter.get().close();
}
}
OpenLineageToDataHub Converter (OpenLineageToDataHub.java)
File: metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java
convertRunEventToJob (L303-359)
The primary conversion method that transforms an OpenLineage RunEvent into a DatahubJob:
public static DatahubJob convertRunEventToJob(
OpenLineage.RunEvent event, DatahubOpenlineageConfig datahubConf)
throws IOException, URISyntaxException {
DatahubJob.DatahubJobBuilder jobBuilder = DatahubJob.builder();
if (event.getEventTime() != null) {
jobBuilder.eventTime(event.getEventTime().toInstant().toEpochMilli());
}
DataFlowInfo dfi = convertRunEventToDataFlowInfo(event, datahubConf.getPipelineName());
// processingEngine is derived from the run's processing-engine facet (elided here)
DataFlowUrn dataFlowUrn = getFlowUrn(
    event.getJob().getNamespace(),
    event.getJob().getName(),
    processingEngine, event.getProducer(), datahubConf);
jobBuilder.flowUrn(dataFlowUrn);
// ... platform instance, custom properties, ownership, tags ...
DatahubJob datahubJob = jobBuilder.build();
convertJobToDataJob(datahubJob, event, datahubConf);
return datahubJob;
}
Dataset URN Conversion (L125-184)
public static Optional<DatasetUrn> convertOpenlineageDatasetToDatasetUrn(
OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig) {
String namespace = dataset.getNamespace();
String datasetName = dataset.getName();
// Handle symlinks (e.g., Glue catalog, Hive metastore)
if (dataset.getFacets() != null && dataset.getFacets().getSymlinks() != null
&& !mappingConfig.isDisableSymlinkResolution()) {
// Resolve symlink to table name
}
// Handle URI-based namespaces (s3://, hdfs://, etc.)
if (namespace.contains("://")) {
// Parse URI, extract platform, resolve HDFS paths
}
// Construct DatasetUrn with platform, instance, name, and environment
return Optional.of(DatahubUtils.createDatasetUrn(
platform, platformInstance, datasetName, env));
}
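The URI-handling branch above can be illustrated with a small standalone sketch. This is a hypothetical helper, not the converter's actual code: it splits a URI-style OpenLineage namespace such as `s3://my-bucket` into a platform (the scheme) and a prefix (the authority), and passes non-URI namespaces through unchanged.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;

/** Hypothetical sketch of splitting a URI-style namespace into platform + prefix. */
public class NamespaceSketch {
  static Optional<String[]> splitNamespace(String namespace) {
    if (!namespace.contains("://")) {
      // Non-URI namespaces (e.g. "hive") are typically used as the platform directly.
      return Optional.empty();
    }
    try {
      URI uri = new URI(namespace);
      String platform = uri.getScheme();      // e.g. "s3", "hdfs"
      String authority = uri.getAuthority();  // e.g. bucket name or namenode host
      return Optional.of(new String[] {platform, authority == null ? "" : authority});
    } catch (URISyntaxException e) {
      return Optional.empty();
    }
  }
}
```

The real converter additionally applies platform-instance mapping and HDFS path resolution from DatahubOpenlineageConfig, which this sketch omits.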
Column-Level Lineage (L413-538)
private static UpstreamLineage getFineGrainedLineage(
OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig,
OpenLineage.Job job) {
// Extract column lineage from OpenLineage ColumnLineageDatasetFacet
// For each output field:
// 1. Map to downstream schemaField URN
// 2. Map input fields to upstream schemaField URNs
// 3. Capture transformation type/subtype
// 4. Extract SQL query from SQLJobFacet
// 5. Build FineGrainedLineage objects
return upstreamLineage;
}
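The mapping the ColumnLineageDatasetFacet encodes can be sketched generically: each downstream field is keyed by a schemaField-style reference (dataset URN plus field path) and points to the upstream fields it is derived from. The names here are illustrative, not DataHub's model classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the per-column mapping encoded by the facet. */
public class ColumnLineageSketch {
  /** For each downstream field, record the upstream fields it is derived from. */
  static Map<String, List<String>> mapColumns(
      Map<String, List<String>> facetFields, String datasetUrn) {
    Map<String, List<String>> lineage = new LinkedHashMap<>();
    facetFields.forEach((outField, inFields) ->
        // Downstream key: dataset URN + field path, mirroring a schemaField URN shape
        lineage.put(datasetUrn + "/" + outField, List.copyOf(inFields)));
    return lineage;
  }
}
```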
Data Process Instance (L1193-1237)
private static DataProcessInstanceRunEvent processDataProcessInstanceResult(
    OpenLineage.RunEvent event) {
  DataProcessInstanceRunEvent dpire = new DataProcessInstanceRunEvent();
  DataProcessInstanceRunResult result = new DataProcessInstanceRunResult();
  switch (event.getEventType()) {
    case COMPLETE:
      dpire.setStatus(DataProcessRunStatus.COMPLETE);
      result.setType(RunResultType.SUCCESS);
      break;
    case FAIL:
    case ABORT:
      dpire.setStatus(DataProcessRunStatus.COMPLETE);
      result.setType(RunResultType.FAILURE);
      break;
    case START:
    case RUNNING:
      dpire.setStatus(DataProcessRunStatus.STARTED);
      break;
    case OTHER:
      result.setType(RunResultType.$UNKNOWN);
      break;
  }
  dpire.setResult(result);
  return dpire;
}
I/O Contract
| Aspect | Details |
|---|---|
| Input | OpenLineage.RunEvent (from the OpenLineage Spark listener) |
| Output | List<MetadataChangeProposal> emitted to REST API, Kafka, file, or S3 |
| Buffering | _datahubJobs list accumulates DatahubJob objects until coalescing |
| Timeout | 10 seconds per MCP emission (DEFAULT_TIMEOUT_SEC) |
| Error Handling | Individual MCP failures are logged but do not block remaining emissions |
| Thread Safety | streaming flag uses AtomicBoolean |
Generated Entity Types
| Entity | URN Pattern | Aspects Generated |
|---|---|---|
| DataFlow | urn:li:dataFlow:(orchestrator,flowName,namespace) | DataFlowInfo, Ownership, GlobalTags, Domains, DataPlatformInstance |
| DataJob | urn:li:dataJob:(flowUrn,jobName) | DataJobInfo, DataJobInputOutput |
| Dataset | urn:li:dataset:(platform,name,env) | SchemaMetadata, UpstreamLineage (if configured) |
| DataProcessInstance | urn:li:dataProcessInstance:runId | Properties, Relationships, RunEvent |
Usage Examples
Internal Usage (called by OpenLineage context factory)
// The emit method is called automatically by the OpenLineage framework
// when Spark lifecycle events occur. No direct invocation is needed.
// Internal flow:
// 1. OpenLineageSparkListener receives Spark event
// 2. ContextFactory creates OpenLineage.RunEvent
// 3. DatahubEventEmitter.emit(event) is called
// 4. Event is converted, buffered, and optionally emitted
// At application end:
// DatahubSparkListener.onApplicationEnd() calls emitter.emitCoalesced()
Emitter Type Resolution
// The emitter is resolved from SparkLineageConf at emission time:
private Optional<Emitter> getEmitter() {
  DatahubEmitterConfig config = datahubConf.getDatahubEmitterConfig();
  if (config instanceof RestDatahubEmitterConfig) {
    return Optional.of(new RestEmitter(
        ((RestDatahubEmitterConfig) config).getRestEmitterConfig()));
  } else if (config instanceof KafkaDatahubEmitterConfig) {
    return Optional.of(new KafkaEmitter(
        ((KafkaDatahubEmitterConfig) config).getKafkaEmitterConfig(), topic));
  } else if (config instanceof FileDatahubEmitterConfig) {
    return Optional.of(new FileEmitter(
        ((FileDatahubEmitterConfig) config).getFileEmitterConfig()));
  } else if (config instanceof S3DatahubEmitterConfig) {
    return Optional.of(new S3Emitter(
        ((S3DatahubEmitterConfig) config).getS3EmitterConfig()));
  }
  return Optional.empty(); // unknown emitter type: nothing to emit to
}
Coalescing Merge Strategy
The coalescing process merges multiple DatahubJob objects with the following rules:
| Field | Merge Strategy |
|---|---|
| Job URN | Last job's URN (derived from flow URN) |
| Flow URN | Last job's flow URN |
| Job Info | First non-null job info; custom properties are merged (first value wins on conflict) |
| Data Flow Info | First non-null; custom properties merged with app context metadata |
| Input Datasets | Union; if same URN exists, update schema and lineage |
| Output Datasets | Union; if same URN exists, update schema and lineage |
| Start Time | Minimum across all jobs |
| End Time | Maximum across all jobs |
| Process Instance URN | First job's instance URN (stable identity) |
| Process Instance Status | FAILURE overrides SUCCESS |
| Tags | Applied from SparkLineageConf after coalescing |
| Domains | Applied from SparkLineageConf after coalescing |
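The merge rules above can be sketched with plain POJOs. This is an illustrative model, not the actual DatahubJob class: it shows earliest-start/latest-end tracking, dataset union, first-value-wins custom-property merging, and FAILURE overriding SUCCESS.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative sketch of the coalescing merge rules (not the DataHub classes). */
public class CoalesceSketch {
  enum RunResult { SUCCESS, FAILURE }

  static class Job {
    long startTime;
    long endTime;
    Set<String> inputUrns = new LinkedHashSet<>();
    Map<String, String> props = new LinkedHashMap<>();
    RunResult result = RunResult.SUCCESS;
  }

  static Job coalesce(List<Job> jobs) {
    Job merged = new Job();
    merged.startTime = Long.MAX_VALUE;
    merged.endTime = Long.MIN_VALUE;
    for (Job j : jobs) {
      merged.startTime = Math.min(merged.startTime, j.startTime); // earliest start wins
      merged.endTime = Math.max(merged.endTime, j.endTime);       // latest end wins
      merged.inputUrns.addAll(j.inputUrns);                       // union of datasets
      j.props.forEach(merged.props::putIfAbsent);                 // first value wins on conflict
      if (j.result == RunResult.FAILURE) {
        merged.result = RunResult.FAILURE;                        // FAILURE overrides SUCCESS
      }
    }
    return merged;
  }
}
```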
Knowledge Sources
- DataHub GitHub Repository
- OpenLineage Documentation
- OpenLineage RunEvent Specification
- Apache Spark SparkListener API
Related
- Implements: Datahub_project_Datahub_Automatic_Lineage_Emission
- Related to: Datahub_project_Datahub_DatahubSparkListener_Init
- Related to: Datahub_project_Datahub_SparkLineageConf_Builder
- Environment: Environment:Datahub_project_Datahub_Spark_Lineage_Environment
- Heuristic: Heuristic:Datahub_project_Datahub_Spark_Databricks_Coalescing