
Implementation:Datahub project Datahub DatahubEventEmitter Emit

From Leeroopedia


Metadata

Field Value
implementation_name DatahubEventEmitter Emit
type API Doc
status Active
last_updated 2026-02-10
source_file metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java and metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java
lines DatahubEventEmitter.java:L62-457, OpenLineageToDataHub.java:L87-1373
repository datahub-project/datahub
domains Data_Lineage, Apache_Spark, Metadata_Management

Overview

DatahubEventEmitter is the core emission engine: it receives OpenLineage RunEvent objects, converts them into DataHub DatahubJob objects, buffers them for coalescing, and emits the resulting MetadataChangeProposals (MCPs) to the configured backend. It extends EventEmitter from the OpenLineage Spark agent and works in conjunction with the OpenLineageToDataHub converter.

Description

This implementation spans two classes:

  • DatahubEventEmitter -- Orchestrates the emission pipeline (receive, convert, buffer, coalesce, emit)
  • OpenLineageToDataHub -- Converts OpenLineage RunEvents into DataHub model objects (DatahubJob, DatahubDataset, URNs, aspects)

The emitter supports both coalesced and non-coalesced emission modes, handles streaming detection, and manages the lifecycle of the underlying transport (REST, Kafka, File, or S3).

Source Code Reference

DatahubEventEmitter Class (DatahubEventEmitter.java)

File: metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java

Class Declaration and Fields (L62-73)

@Slf4j
public class DatahubEventEmitter extends EventEmitter {
  private final AtomicBoolean streaming = new AtomicBoolean(false);
  private final List<DatahubJob> _datahubJobs = new LinkedList<>();
  private final Map<String, MetadataChangeProposalWrapper> schemaMap = new HashMap<>();
  private SparkLineageConf datahubConf;
  private static final int DEFAULT_TIMEOUT_SEC = 10;
  private final ObjectMapper objectMapper;
  private final JacksonDataTemplateCodec dataTemplateCodec;
  private final EventFormatter eventFormatter = new EventFormatter();
  // ... constructor and methods follow (see below) ...
}

Constructor (L75-87)

public DatahubEventEmitter(SparkOpenLineageConfig config, String applicationJobName)
    throws URISyntaxException {
  super(config, applicationJobName);
  objectMapper = new ObjectMapper()
      .setSerializationInclusion(JsonInclude.Include.NON_NULL);
  int maxSize = Integer.parseInt(
      System.getenv().getOrDefault(
          INGESTION_MAX_SERIALIZED_STRING_LENGTH, MAX_JACKSON_STRING_SIZE));
  objectMapper.getFactory().setStreamReadConstraints(
      StreamReadConstraints.builder().maxStringLength(maxSize).build());
  dataTemplateCodec = new JacksonDataTemplateCodec(objectMapper.getFactory());
}
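The constructor's limit resolution boils down to an environment-variable lookup with a compile-time fallback. A minimal, self-contained sketch of that pattern (the default value below is an illustrative stand-in for MAX_JACKSON_STRING_SIZE, not the real constant):

```java
// Sketch of the env-var fallback pattern used to size the Jackson string limit.
public class MaxSizeDemo {
  // Stand-in default; the real value comes from MAX_JACKSON_STRING_SIZE.
  static final String DEFAULT_MAX = "16000000";

  static int resolveMaxStringLength(java.util.Map<String, String> env) {
    // Same shape as System.getenv().getOrDefault(...) in the constructor.
    return Integer.parseInt(
        env.getOrDefault("INGESTION_MAX_SERIALIZED_STRING_LENGTH", DEFAULT_MAX));
  }

  public static void main(String[] args) {
    assert resolveMaxStringLength(java.util.Map.of()) == 16_000_000;
    assert resolveMaxStringLength(
        java.util.Map.of("INGESTION_MAX_SERIALIZED_STRING_LENGTH", "1024")) == 1024;
    System.out.println("ok");
  }
}
```

Taking a Map parameter (rather than calling System.getenv() directly) keeps the parsing logic testable.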

emit (L152-196)

The main entry point, called by the OpenLineage context factory for each RunEvent:

public void emit(OpenLineage.RunEvent event) {
  long startTime = System.currentTimeMillis();
  // Re-serialize/deserialize to normalize event format
  event = OpenLineageClientUtils.runEventFromJson(
      OpenLineageClientUtils.toJson(event));
  Optional<DatahubJob> job = convertOpenLineageRunEventToDatahubJob(event);
  if (!job.isPresent()) {
    return;
  }

  // Apply tags and domains
  if (!datahubConf.getTags().isEmpty()) {
    GlobalTags tags = OpenLineageToDataHub.generateTags(datahubConf.getTags());
    job.get().setFlowGlobalTags(tags);
  }
  if (!datahubConf.getDomains().isEmpty()) {
    Domains domains = OpenLineageToDataHub.generateDomains(datahubConf.getDomains());
    job.get().setFlowDomains(domains);
  }

  if (isStreaming()) {
    return; // Skip for streaming mode
  }
  if (!datahubConf.isCoalesceEnabled()) {
    // Emit immediately for non-coalesced mode
    emitMcps(job.get().toMcps(datahubConf.getOpenLineageConf()));
  }
  if (datahubConf.isCoalesceEnabled() && datahubConf.isEmitCoalescePeriodically()) {
    // Emit coalesced data periodically
    emitCoalesced();
  }
}
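The branch order in emit() can be summarized as a small decision function. This is a hypothetical helper, not part of the real class; the enum names are illustrative:

```java
// Decision helper mirroring the branch order in emit(): streaming runs are
// skipped, non-coalesced jobs emit immediately, and coalesce + periodic
// emission triggers emitCoalesced(); otherwise the job just stays buffered.
public class EmitModeDemo {
  enum Action { SKIP_STREAMING, EMIT_NOW, EMIT_COALESCED, BUFFER_ONLY }

  static Action decide(boolean streaming, boolean coalesceEnabled, boolean emitPeriodically) {
    if (streaming) return Action.SKIP_STREAMING;
    if (!coalesceEnabled) return Action.EMIT_NOW;
    if (emitPeriodically) return Action.EMIT_COALESCED;
    return Action.BUFFER_ONLY; // emitted later, at application end
  }

  public static void main(String[] args) {
    assert decide(true, true, true) == Action.SKIP_STREAMING;
    assert decide(false, false, false) == Action.EMIT_NOW;
    assert decide(false, true, true) == Action.EMIT_COALESCED;
    assert decide(false, true, false) == Action.BUFFER_ONLY;
    System.out.println("ok");
  }
}
```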

emitCoalesced (L198-213)

public void emitCoalesced() {
  long startTime = System.currentTimeMillis();
  if (isStreaming()) {
    return;
  }
  if (datahubConf.isCoalesceEnabled()) {
    List<MetadataChangeProposal> mcps = generateCoalescedMcps();
    log.info("Emitting Coalesced lineage completed successfully");
    emitMcps(mcps);
  }
  long elapsedTime = System.currentTimeMillis() - startTime;
  log.info("Emitting coalesced lineage completed in {} ms", elapsedTime);
}

generateCoalescedMcps (L215-304)

Merges all buffered DatahubJob objects into a single coalesced job:

public List<MetadataChangeProposal> generateCoalescedMcps() {
  List<MetadataChangeProposal> mcps = new ArrayList<>();
  if (_datahubJobs.isEmpty()) {
    log.warn("No lineage events to emit.");
    return mcps;
  }

  DatahubJob datahubJob = DatahubJob.builder().build();
  AtomicLong minStartTime = new AtomicLong(Long.MAX_VALUE);
  AtomicLong maxEndTime = new AtomicLong();

  _datahubJobs.forEach(storedDatahubJob -> {
    // Set URNs from stored jobs
    DataJobUrn jobUrn = jobUrn(
        storedDatahubJob.getFlowUrn(),
        storedDatahubJob.getFlowUrn().getFlowIdEntity());
    datahubJob.setJobUrn(jobUrn);
    datahubJob.setFlowUrn(storedDatahubJob.getFlowUrn());

    // Merge job info and custom properties
    // ... merge logic ...

    // Track time boundaries
    if (storedDatahubJob.getStartTime() < minStartTime.get()) {
      minStartTime.set(storedDatahubJob.getStartTime());
    }
    if (storedDatahubJob.getEndTime() > maxEndTime.get()) {
      maxEndTime.set(storedDatahubJob.getEndTime());
    }

    // Merge datasets and process instances
    mergeDatasets(storedDatahubJob.getOutSet(), datahubJob.getOutSet());
    mergeDatasets(storedDatahubJob.getInSet(), datahubJob.getInSet());
    mergeDataProcessInstance(datahubJob, storedDatahubJob);
    mergeCustomProperties(datahubJob, storedDatahubJob);
  });

  // Apply tags and domains, generate MCPs
  return datahubJob.toMcps(datahubConf.getOpenLineageConf());
}
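The time-boundary tracking above is a plain min/max fold. A runnable sketch, with long[] pairs standing in for the start/end times of DatahubJob objects:

```java
// Minimal sketch of the coalescing time-boundary merge: the coalesced job's
// start time is the minimum, and its end time the maximum, across all
// buffered jobs. AtomicLong matches the pattern in generateCoalescedMcps,
// where the values are mutated inside a forEach lambda.
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class TimeBoundsDemo {
  static long[] coalesceBounds(List<long[]> jobs) {
    AtomicLong minStart = new AtomicLong(Long.MAX_VALUE);
    AtomicLong maxEnd = new AtomicLong(); // defaults to 0
    jobs.forEach(j -> {
      if (j[0] < minStart.get()) minStart.set(j[0]);
      if (j[1] > maxEnd.get()) maxEnd.set(j[1]);
    });
    return new long[] {minStart.get(), maxEnd.get()};
  }

  public static void main(String[] args) {
    long[] b = coalesceBounds(
        List.of(new long[] {100, 200}, new long[] {50, 150}, new long[] {120, 300}));
    assert b[0] == 50 && b[1] == 300;
    System.out.println("ok");
  }
}
```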

emitMcps (L406-444)

Sends MCPs to the configured backend:

protected void emitMcps(List<MetadataChangeProposal> mcps) {
  Optional<Emitter> emitter = getEmitter();
  if (emitter.isPresent()) {
    mcps.stream()
      .map(mcp -> {
        try {
          if (this.datahubConf.isLogMcps()) {
            DataMap map = mcp.data();
            String serializedMCP = dataTemplateCodec.mapToString(map);
            log.info("emitting mcpw: {}", serializedMCP);
          } else {
            log.info("emitting aspect: {} for urn: {}",
                mcp.getAspectName(), mcp.getEntityUrn());
          }
          return emitter.get().emit(mcp);
        } catch (IOException ioException) {
          log.error("Failed to emit metadata to DataHub", ioException);
          return null;
        }
      })
      .filter(Objects::nonNull)
      .collect(Collectors.toList())
      .forEach(future -> {
        try {
          log.info(future.get(DEFAULT_TIMEOUT_SEC, TimeUnit.SECONDS).toString());
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
          log.error("Failed to emit metadata to DataHub", e);
        }
      });
    emitter.get().close();
  }
}
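The stream pipeline in emitMcps has a deliberate two-phase shape: every MCP is submitted first, and only then are the futures awaited. A hedged, self-contained sketch of that pattern (strings stand in for MCPs, and a thread pool stands in for the transport):

```java
// Sketch of the two-phase emit pattern: map each MCP to a Future, drop
// failed submissions, then await each result with a bounded timeout so one
// slow emission cannot hang the Spark listener indefinitely.
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public class EmitFuturesDemo {
  static final int TIMEOUT_SEC = 10; // mirrors DEFAULT_TIMEOUT_SEC

  static List<String> emitAll(List<String> mcps, ExecutorService pool) {
    return mcps.stream()
        .map(mcp -> mcp == null ? null : pool.submit(() -> "emitted:" + mcp))
        .filter(Objects::nonNull) // skip MCPs that failed to submit
        // Materialize the list so every submit happens before any get():
        .collect(Collectors.toList())
        .stream()
        .map(f -> {
          try {
            return f.get(TIMEOUT_SEC, TimeUnit.SECONDS);
          } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return "failed"; // logged rather than rethrown in the real emitter
          }
        })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<String> out = emitAll(java.util.Arrays.asList("a", null, "b"), pool);
    pool.shutdown();
    assert out.equals(List.of("emitted:a", "emitted:b"));
    System.out.println("ok");
  }
}
```

The intermediate collect(Collectors.toList()) is what makes the pipeline concurrent: without it, a lazy stream would submit and await one MCP at a time.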

OpenLineageToDataHub Converter (OpenLineageToDataHub.java)

File: metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java

convertRunEventToJob (L303-359)

The primary conversion method that transforms an OpenLineage RunEvent into a DatahubJob:

public static DatahubJob convertRunEventToJob(
    OpenLineage.RunEvent event, DatahubOpenlineageConfig datahubConf)
    throws IOException, URISyntaxException {
  DatahubJob.DatahubJobBuilder jobBuilder = DatahubJob.builder();

  if (event.getEventTime() != null) {
    jobBuilder.eventTime(event.getEventTime().toInstant().toEpochMilli());
  }

  DataFlowInfo dfi = convertRunEventToDataFlowInfo(event, datahubConf.getPipelineName());
  // processingEngine is derived earlier in the method (elided from this excerpt)
  DataFlowUrn dataFlowUrn = getFlowUrn(
      event.getJob().getNamespace(),
      event.getJob().getName(),
      processingEngine, event.getProducer(), datahubConf);
  jobBuilder.flowUrn(dataFlowUrn);

  // ... platform instance, custom properties, ownership, tags ...

  DatahubJob datahubJob = jobBuilder.build();
  convertJobToDataJob(datahubJob, event, datahubConf);
  return datahubJob;
}
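The eventTime handling above reduces to a standard java.time conversion from the event's zoned timestamp to epoch milliseconds:

```java
// ZonedDateTime -> epoch-millis conversion, as used for jobBuilder.eventTime().
import java.time.ZonedDateTime;

public class EventTimeDemo {
  static long toEpochMilli(ZonedDateTime eventTime) {
    return eventTime.toInstant().toEpochMilli();
  }

  public static void main(String[] args) {
    ZonedDateTime t = ZonedDateTime.parse("2024-01-01T00:00:00Z");
    assert toEpochMilli(t) == 1704067200000L;
    System.out.println("ok");
  }
}
```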

Dataset URN Conversion (L125-184)

public static Optional<DatasetUrn> convertOpenlineageDatasetToDatasetUrn(
    OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig) {
  String namespace = dataset.getNamespace();
  String datasetName = dataset.getName();

  // Handle symlinks (e.g., Glue catalog, Hive metastore)
  if (dataset.getFacets() != null && dataset.getFacets().getSymlinks() != null
      && !mappingConfig.isDisableSymlinkResolution()) {
    // Resolve symlink to table name
  }

  // Handle URI-based namespaces (s3://, hdfs://, etc.)
  if (namespace.contains("://")) {
    // Parse URI, extract platform, resolve HDFS paths
  }

  // Construct DatasetUrn with platform, instance, name, and environment
  return Optional.of(DatahubUtils.createDatasetUrn(
      platform, platformInstance, datasetName, env));
}
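The URI branch above can be illustrated with a simplified split: the scheme becomes the platform hint and the host plus path joins the dataset name. This is a hypothetical sketch; platform normalization (e.g. "s3a" to "s3"), platform instances, and HDFS path-spec resolution from the real converter are all elided:

```java
// Simplified namespace split for URI-based dataset namespaces.
import java.net.URI;

public class NamespaceDemo {
  static String[] splitNamespace(String namespace, String name) {
    if (!namespace.contains("://")) {
      // Non-URI namespaces (e.g. "postgres") are treated as the platform.
      return new String[] {namespace, name};
    }
    URI uri = URI.create(namespace);
    String host = uri.getHost() == null ? "" : uri.getHost();
    String path = uri.getPath() == null ? "" : uri.getPath();
    String dataset = (host + path + "/" + name)
        .replaceAll("^/+", "")   // drop leading slashes
        .replaceAll("/+", "/");  // collapse duplicate slashes
    return new String[] {uri.getScheme(), dataset};
  }

  public static void main(String[] args) {
    String[] r = splitNamespace("s3://my-bucket", "events/2024");
    assert r[0].equals("s3") && r[1].equals("my-bucket/events/2024");
    String[] pg = splitNamespace("postgres", "db.schema.table");
    assert pg[0].equals("postgres") && pg[1].equals("db.schema.table");
    System.out.println("ok");
  }
}
```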

Column-Level Lineage (L413-538)

private static UpstreamLineage getFineGrainedLineage(
    OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig,
    OpenLineage.Job job) {
  // Extract column lineage from OpenLineage ColumnLineageDatasetFacet
  // For each output field:
  //   1. Map to downstream schemaField URN
  //   2. Map input fields to upstream schemaField URNs
  //   3. Capture transformation type/subtype
  //   4. Extract SQL query from SQLJobFacet
  //   5. Build FineGrainedLineage objects
  return upstreamLineage;
}
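Steps 1 and 2 above map fields to schemaField URNs, which nest the dataset URN inside the field reference. A string-level sketch of that shape (the real code builds typed Urn objects; the format here follows DataHub's documented pattern but is reproduced from memory, so treat it as illustrative):

```java
// String form of the schemaField URNs used as fine-grained lineage endpoints.
public class SchemaFieldUrnDemo {
  static String schemaFieldUrn(String datasetUrn, String fieldPath) {
    return String.format("urn:li:schemaField:(%s,%s)", datasetUrn, fieldPath);
  }

  public static void main(String[] args) {
    String urn = schemaFieldUrn(
        "urn:li:dataset:(urn:li:dataPlatform:hive,db.tbl,PROD)", "user_id");
    assert urn.equals(
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db.tbl,PROD),user_id)");
    System.out.println("ok");
  }
}
```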

Data Process Instance (L1193-1237)

private static DataProcessInstanceRunEvent processDataProcessInstanceResult(
    OpenLineage.RunEvent event) {
  DataProcessInstanceRunEvent dpire = new DataProcessInstanceRunEvent();
  DataProcessInstanceRunResult result = new DataProcessInstanceRunResult();
  switch (event.getEventType()) {
    case COMPLETE: dpire.setStatus(DataProcessRunStatus.COMPLETE);
                   result.setType(RunResultType.SUCCESS); break;
    case FAIL:
    case ABORT:    dpire.setStatus(DataProcessRunStatus.COMPLETE);
                   result.setType(RunResultType.FAILURE); break;
    case START:
    case RUNNING:  dpire.setStatus(DataProcessRunStatus.STARTED); break;
    case OTHER:    result.setType(RunResultType.$UNKNOWN); break;
  }
  dpire.setResult(result);
  return dpire;
}
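The switch above collapses OpenLineage event types into two separate DataHub dimensions, run status and run result. A self-contained sketch of that mapping (the enum names below mirror the snippet; the real types live in DataHub's dataprocess model):

```java
// Event-type mapping: OpenLineage lifecycle events -> (status, result) pair.
public class RunStateDemo {
  enum EventType { START, RUNNING, COMPLETE, FAIL, ABORT, OTHER }
  enum Status { STARTED, COMPLETE, NONE }
  enum Result { SUCCESS, FAILURE, UNKNOWN, NONE }

  static Status toStatus(EventType t) {
    switch (t) {
      case START: case RUNNING: return Status.STARTED;
      // FAIL and ABORT still mark the run COMPLETE; failure shows in the result.
      case COMPLETE: case FAIL: case ABORT: return Status.COMPLETE;
      default: return Status.NONE;
    }
  }

  static Result toResult(EventType t) {
    switch (t) {
      case COMPLETE: return Result.SUCCESS;
      case FAIL: case ABORT: return Result.FAILURE;
      case OTHER: return Result.UNKNOWN;
      default: return Result.NONE;
    }
  }

  public static void main(String[] args) {
    assert toStatus(EventType.ABORT) == Status.COMPLETE;
    assert toResult(EventType.ABORT) == Result.FAILURE;
    assert toStatus(EventType.RUNNING) == Status.STARTED;
    System.out.println("ok");
  }
}
```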

I/O Contract

Aspect Details
Input OpenLineage.RunEvent (from OpenLineage Spark listener)
Output List<MetadataChangeProposal> emitted to REST API, Kafka, file, or S3
Buffering _datahubJobs list accumulates DatahubJob objects until coalescing
Timeout 10 seconds per MCP emission (DEFAULT_TIMEOUT_SEC)
Error Handling Individual MCP failures are logged but do not block remaining emissions
Thread Safety streaming flag uses AtomicBoolean

Generated Entity Types

Entity URN Pattern Aspects Generated
DataFlow urn:li:dataFlow:(orchestrator,flowName,namespace) DataFlowInfo, Ownership, GlobalTags, Domains, DataPlatformInstance
DataJob urn:li:dataJob:(flowUrn,jobName) DataJobInfo, DataJobInputOutput
Dataset urn:li:dataset:(platform,name,env) SchemaMetadata, UpstreamLineage (if configured)
DataProcessInstance urn:li:dataProcessInstance:runId Properties, Relationships, RunEvent
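The URN patterns in the table nest: a DataJob URN embeds its DataFlow URN. An illustrative string-level construction (the real code uses typed Urn classes such as DataFlowUrn and DataJobUrn):

```java
// String forms of the DataFlow and DataJob URN patterns from the table.
public class UrnPatternDemo {
  static String dataFlowUrn(String orchestrator, String flowName, String namespace) {
    return String.format("urn:li:dataFlow:(%s,%s,%s)", orchestrator, flowName, namespace);
  }

  static String dataJobUrn(String flowUrn, String jobName) {
    // The full flow URN nests inside the job URN.
    return String.format("urn:li:dataJob:(%s,%s)", flowUrn, jobName);
  }

  public static void main(String[] args) {
    String flow = dataFlowUrn("spark", "my_app", "default");
    assert flow.equals("urn:li:dataFlow:(spark,my_app,default)");
    assert dataJobUrn(flow, "stage_1")
        .equals("urn:li:dataJob:(urn:li:dataFlow:(spark,my_app,default),stage_1)");
    System.out.println("ok");
  }
}
```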

Usage Examples

Internal Usage (called by OpenLineage context factory)

// The emit method is called automatically by the OpenLineage framework
// when Spark lifecycle events occur. No direct invocation is needed.

// Internal flow:
// 1. OpenLineageSparkListener receives Spark event
// 2. ContextFactory creates OpenLineage.RunEvent
// 3. DatahubEventEmitter.emit(event) is called
// 4. Event is converted, buffered, and optionally emitted

// At application end:
// DatahubSparkListener.onApplicationEnd() calls emitter.emitCoalesced()

Emitter Type Resolution

// The emitter is resolved from SparkLineageConf at emission time:
private Optional<Emitter> getEmitter() {
  DatahubEmitterConfig emitterConfig = datahubConf.getDatahubEmitterConfig();
  if (emitterConfig instanceof RestDatahubEmitterConfig) {
    return Optional.of(new RestEmitter(
        ((RestDatahubEmitterConfig) emitterConfig).getRestEmitterConfig()));
  } else if (emitterConfig instanceof KafkaDatahubEmitterConfig) {
    return Optional.of(new KafkaEmitter(
        ((KafkaDatahubEmitterConfig) emitterConfig).getKafkaEmitterConfig(), topic));
  } else if (emitterConfig instanceof FileDatahubEmitterConfig) {
    return Optional.of(new FileEmitter(
        ((FileDatahubEmitterConfig) emitterConfig).getFileEmitterConfig()));
  } else if (emitterConfig instanceof S3DatahubEmitterConfig) {
    return Optional.of(new S3Emitter(
        ((S3DatahubEmitterConfig) emitterConfig).getS3EmitterConfig()));
  }
  return Optional.empty(); // no emitter configured
}

Coalescing Merge Strategy

The coalescing process merges multiple DatahubJob objects with the following rules:

Field Merge Strategy
Job URN Last job's URN (derived from flow URN)
Flow URN Last job's flow URN
Job Info First non-null job info; custom properties are merged (first value wins on conflict)
Data Flow Info First non-null; custom properties merged with app context metadata
Input Datasets Union; if same URN exists, update schema and lineage
Output Datasets Union; if same URN exists, update schema and lineage
Start Time Minimum across all jobs
End Time Maximum across all jobs
Process Instance URN First job's instance URN (stable identity)
Process Instance Status FAILURE overrides SUCCESS
Tags Applied from SparkLineageConf after coalescing
Domains Applied from SparkLineageConf after coalescing
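The "first value wins" rule for custom properties maps directly onto Map.putIfAbsent: a key already recorded by an earlier job is never overwritten by a later one. A runnable sketch:

```java
// "First value wins" custom-property merge, as described in the table above.
import java.util.LinkedHashMap;
import java.util.Map;

public class PropsMergeDemo {
  static Map<String, String> mergeFirstWins(Map<String, String> into, Map<String, String> from) {
    // putIfAbsent keeps the value already present for a key.
    from.forEach(into::putIfAbsent);
    return into;
  }

  public static void main(String[] args) {
    Map<String, String> merged = new LinkedHashMap<>(Map.of("appName", "jobA"));
    mergeFirstWins(merged, Map.of("appName", "jobB", "sparkVersion", "3.5"));
    assert merged.get("appName").equals("jobA");      // first value kept
    assert merged.get("sparkVersion").equals("3.5");  // new key added
    System.out.println("ok");
  }
}
```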
