Implementation:Datahub project Datahub Emitter Close

From Leeroopedia


Property             Value
Implementation Name  Emitter_Close
Type                 API Doc
Category             Java_SDK_Metadata_Emission
Workflow             Java_SDK_Metadata_Emission
Repository           https://github.com/datahub-project/datahub
Last Updated         2026-02-09 17:00 GMT

Overview

Description

Emitter_Close documents the close() method on the datahub.client.Emitter interface, which extends java.io.Closeable. Calling close() releases all transport resources held by the emitter and, for certain backends, triggers finalization operations such as writing closing JSON delimiters or uploading files to cloud storage. Each emitter implementation performs backend-specific cleanup:

  • RestEmitter -- Closes the Apache CloseableHttpAsyncClient, shutting down the connection pool and IO reactor threads.
  • KafkaEmitter -- Closes the KafkaProducer, flushing any buffered records and releasing all associated threads and network sockets.
  • FileEmitter -- Writes a newline and the closing ] bracket to produce a syntactically valid JSON array, then closes the BufferedWriter and sets the internal closed flag.
  • S3Emitter -- Closes the internal FileEmitter (which writes the closing bracket), then constructs a PutObjectRequest and uploads the temporary file to the configured S3 bucket using the AWS SDK S3Client. Once the upload call returns, the temporary file is deleted from the local filesystem; in the summarized implementation this happens before the response status is checked. If the response is not successful, an IOException is thrown.

Usage

Always call close() after all emit() calls have been made. The recommended pattern is Java's try-with-resources, which guarantees close() is called even if an exception occurs.
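
The guarantee described above can be illustrated without the DataHub client on the classpath. The following is a minimal sketch using only the JDK; RecordingEmitter and TryWithResourcesDemo are hypothetical stand-ins, not DataHub classes. It records the order of events to show that close() runs even when an emit call throws, and that it runs before the catch block.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an emitter; it only records what happened to it.
final class RecordingEmitter implements Closeable {
    final List<String> events = new ArrayList<>();

    void emit(String payload) throws IOException {
        if (payload == null) {
            throw new IOException("emit failed: payload was null");
        }
        events.add("emit:" + payload);
    }

    @Override
    public void close() {
        events.add("close");
    }
}

final class TryWithResourcesDemo {
    // Emits each payload and returns the recorded event sequence.
    static List<String> run(String... payloads) {
        RecordingEmitter emitter = new RecordingEmitter();
        try (RecordingEmitter e = emitter) {
            for (String p : payloads) {
                e.emit(p);
            }
        } catch (IOException ex) {
            // By the time this block runs, close() has already been called.
            emitter.events.add("caught");
        }
        return emitter.events;
    }

    public static void main(String[] args) {
        System.out.println(run("a", "b"));       // [emit:a, emit:b, close]
        System.out.println(run("a", null, "c")); // [emit:a, close, caught]
    }
}
```

In the failing run the third payload is never emitted, but the resource is still closed before the exception reaches the handler.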

Code Reference

Source Location

Property              Value
Interface             metadata-integration/java/datahub-client/src/main/java/datahub/client/Emitter.java
Lines                 L21 (interface extends Closeable)
RestEmitter.close()   metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java (L340-342)
KafkaEmitter.close()  metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java (L72-74)
FileEmitter.close()   metadata-integration/java/datahub-client/src/main/java/datahub/client/file/FileEmitter.java (L114-120)
S3Emitter.close()     metadata-integration/java/datahub-client/src/main/java/datahub/client/s3/S3Emitter.java (L100-134)
Repository            https://github.com/datahub-project/datahub

Signature

Interface declaration:

@ThreadSafe
public interface Emitter extends Closeable {
    // Inherited from Closeable:
    void close() throws IOException;
}

RestEmitter implementation:

@Override
public void close() throws IOException {
    this.httpClient.close();
}

KafkaEmitter implementation:

@Override
public void close() throws IOException {
    producer.close();
}

FileEmitter implementation:

@Override
public void close() throws IOException {
    this.writer.newLine();
    this.writer.append("]");
    this.writer.close();
    this.closed.set(true);
}

S3Emitter implementation (summarized):

@Override
public void close() throws IOException {
    fileEmitter.close();  // writes closing ] bracket
    String key = /* compute S3 key from config */;
    PutObjectRequest objectRequest = PutObjectRequest.builder()
        .bucket(config.getBucketName())
        .key(key)
        .build();
    PutObjectResponse response = client.putObject(objectRequest, this.temporaryFile);
    deleteTemporaryFile();
    if (!response.sdkHttpResponse().isSuccessful()) {
        throw new IOException("Failed to upload file to S3. Response: " + response);
    }
}
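
As summarized above, deleteTemporaryFile() runs only if putObject returns; an exception thrown by the upload call itself would skip it. Whether the full implementation handles that case cannot be determined from the summary. The sketch below shows one possible variation of the same close sequence that cleans up in a finally block. It uses only the JDK; Uploader and StagedUploadEmitter are hypothetical names, and the Uploader interface stands in for the S3 client call.

```java
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical upload abstraction standing in for the S3 client call.
interface Uploader {
    // Returns true when the remote store reports success.
    boolean upload(Path file) throws IOException;
}

// Illustrative emitter that stages records in a temp file and uploads on close().
final class StagedUploadEmitter implements Closeable {
    final Path tempFile;
    private final Uploader uploader;

    StagedUploadEmitter(Uploader uploader) throws IOException {
        this.uploader = uploader;
        this.tempFile = Files.createTempFile("staged-mcps", ".json");
    }

    void emit(String json) throws IOException {
        Files.write(tempFile, (json + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.APPEND);
    }

    @Override
    public void close() throws IOException {
        try {
            if (!uploader.upload(tempFile)) {
                throw new IOException("Failed to upload " + tempFile);
            }
        } finally {
            // Runs on success, on an unsuccessful response, and when upload() throws.
            Files.deleteIfExists(tempFile);
        }
    }
}
```

The trade-off of cleaning up unconditionally is that a failed upload leaves no local copy to retry from; a caller that needs retries would keep the file until the upload succeeds.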

Import

import datahub.client.Emitter;
import java.io.Closeable;
import java.io.IOException;

I/O Contract

Direction           Description
Input               None (no parameters)
Output              void
REST side effects   Shuts down the CloseableHttpAsyncClient, releasing connection pool and IO reactor threads. Pending requests may be cancelled.
Kafka side effects  Closes the KafkaProducer, flushing buffered records and releasing threads and sockets.
File side effects   Writes a newline and ] to produce a valid JSON array, closes the BufferedWriter, and sets the closed flag to true.
S3 side effects     Closes the internal FileEmitter (which writes the closing bracket), uploads the temporary file to S3 via PutObjectRequest, then deletes the temporary file. Throws IOException if the upload response is not successful.
Exceptions          IOException if the underlying resource cannot be closed or (S3) if the upload fails.
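
Because close() can itself throw IOException, it helps to know how that failure surfaces under try-with-resources. The sketch below relies only on standard Java semantics; FailingCloseEmitter and CloseFailureDemo are hypothetical names used for illustration. If only close() fails, its IOException propagates. If the body and close() both fail, the body's exception propagates and the close() failure is attached to it as a suppressed exception.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical emitter whose close() always fails, e.g. a failed final upload.
final class FailingCloseEmitter implements Closeable {
    void emit(boolean fail) {
        if (fail) {
            throw new IllegalStateException("emit failed");
        }
    }

    @Override
    public void close() throws IOException {
        throw new IOException("close failed");
    }
}

final class CloseFailureDemo {
    // Body succeeds, close() fails: the IOException from close() propagates.
    static String closeOnlyFailure() {
        try (FailingCloseEmitter e = new FailingCloseEmitter()) {
            e.emit(false);
        } catch (IOException ex) {
            return ex.getMessage();
        }
        return "no error";
    }

    // Body and close() both fail: the body's exception is the primary one.
    static String bothFail() {
        try (FailingCloseEmitter e = new FailingCloseEmitter()) {
            e.emit(true);
        } catch (IllegalStateException ex) {
            Throwable[] suppressed = ex.getSuppressed();
            return ex.getMessage() + " / suppressed: " + suppressed[0].getMessage();
        } catch (IOException ex) {
            return "unexpected: " + ex.getMessage();
        }
        return "no error";
    }

    public static void main(String[] args) {
        System.out.println(closeOnlyFailure()); // close failed
        System.out.println(bothFail());         // emit failed / suppressed: close failed
    }
}
```

For backends where close() does real work, such as a final upload, logging the suppressed exceptions of a caught failure avoids losing the close-time error.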

Usage Examples

Example 1: Try-with-resources (recommended)

import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
import com.linkedin.dataset.DatasetProperties;

try (RestEmitter emitter = RestEmitter.createWithDefaults()) {
    MetadataChangeProposalWrapper mcpw = MetadataChangeProposalWrapper.builder()
        .entityType("dataset")
        .entityUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,project.dataset.table,PROD)")
        .upsert()
        .aspect(new DatasetProperties().setDescription("User table"))
        .build();

    emitter.emit(mcpw, null).get();
} // emitter.close() called automatically here

Example 2: Explicit close in finally block

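import datahub.client.rest.RestEmitter;

// mcpw1 and mcpw2 are MetadataChangeProposalWrapper instances built as in Example 1.
RestEmitter emitter = RestEmitter.createWithDefaults();
try {
    emitter.emit(mcpw1, null).get();
    emitter.emit(mcpw2, null).get();
} finally {
    emitter.close(); // runs whether or not emit() threw
}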

Example 3: FileEmitter close writes valid JSON

import datahub.client.file.FileEmitter;
import datahub.client.file.FileEmitterConfig;

FileEmitter emitter = new FileEmitter(
    FileEmitterConfig.builder().fileName("/tmp/metadata.json").build()
);

emitter.emit(mcpw1, null);
emitter.emit(mcpw2, null);
emitter.close(); // writes "]" -- file now contains valid JSON array
// File contents: [ {mcp1}, {mcp2} ]
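
To see why the closing bracket matters, the following JDK-only sketch mimics the behavior described for the file backend. JsonArrayFileWriter and JsonArrayFileDemo are hypothetical names, not the DataHub FileEmitter; the opening bracket, the comma handling, and the guard against a second close() are assumptions of this sketch rather than documented behavior. Without the final close(), the file would end after the last record and would not parse as a JSON array.

```java
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for a file-backed emitter (not thread-safe).
final class JsonArrayFileWriter implements Closeable {
    private final BufferedWriter writer;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private boolean first = true;

    JsonArrayFileWriter(Path path) throws IOException {
        this.writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8);
        this.writer.append("["); // open the array up front
    }

    void emit(String jsonObject) throws IOException {
        if (closed.get()) {
            throw new IOException("writer is closed");
        }
        if (!first) {
            writer.append(","); // separate records after the first one
        }
        writer.newLine();
        writer.append(jsonObject);
        first = false;
    }

    @Override
    public void close() throws IOException {
        // Guard added in this sketch: a second close() becomes a no-op.
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        writer.newLine();
        writer.append("]"); // terminate the array so the file is valid JSON
        writer.close();
    }
}

final class JsonArrayFileDemo {
    // Writes two records and returns the resulting file contents.
    static String writeTwo(Path path) throws IOException {
        try (JsonArrayFileWriter w = new JsonArrayFileWriter(path)) {
            w.emit("{\"id\":1}");
            w.emit("{\"id\":2}");
        }
        return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
    }
}
```

Using try-with-resources here, rather than a bare close() call as in Example 3, ensures the bracket is written even if an emit call throws partway through.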

Example 4: S3Emitter close triggers upload

import datahub.client.s3.S3Emitter;
import datahub.client.s3.S3EmitterConfig;

try (S3Emitter emitter = new S3Emitter(
        S3EmitterConfig.builder()
            .bucketName("my-bucket")
            .pathPrefix("datahub/mcps")
            .region("us-east-1")
            .build())) {
    emitter.emit(mcpw1, null);
    emitter.emit(mcpw2, null);
} // close() writes JSON bracket, uploads to S3, deletes temp file
