Implementation:Datahub project Datahub Emitter Close
| Property | Value |
|---|---|
| Implementation Name | Emitter_Close |
| Type | API Doc |
| Category | Java_SDK_Metadata_Emission |
| Workflow | Java_SDK_Metadata_Emission |
| Repository | https://github.com/datahub-project/datahub |
| Last Updated | 2026-02-09 17:00 GMT |
Overview
Description
Emitter_Close documents the close() method on the datahub.client.Emitter interface, which extends java.io.Closeable. Calling close() releases all transport resources held by the emitter and, for certain backends, triggers finalization operations such as writing closing JSON delimiters or uploading files to cloud storage. Each emitter implementation performs backend-specific cleanup:
- RestEmitter -- Closes the Apache CloseableHttpAsyncClient, shutting down the connection pool and IO reactor threads.
- KafkaEmitter -- Closes the KafkaProducer, flushing any buffered records and releasing all associated threads and network sockets.
- FileEmitter -- Writes a newline and the closing ] bracket to produce a syntactically valid JSON array, then closes the BufferedWriter and sets the internal closed flag.
- S3Emitter -- Closes the internal FileEmitter (which writes the closing bracket), then constructs a PutObjectRequest and uploads the temporary file to the configured S3 bucket using the AWS SDK S3Client. Once the upload call returns, the temporary file is deleted from the local filesystem; if the response is not successful, an IOException is thrown.
Usage
Always call close() after all emit() calls have been made. The recommended pattern is Java's try-with-resources, which guarantees close() is called even if an exception occurs.
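The guarantee can be seen with a small stand-in. The sketch below does not use the DataHub client: RecordingEmitter is a hypothetical class that only implements java.io.Closeable, and it records whether close() ran when the body of the try block throws.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical stand-in for an emitter; not part of the DataHub client.
class RecordingEmitter implements Closeable {
    boolean closed = false;

    void emit(String payload) throws IOException {
        if (payload == null) {
            throw new IOException("nothing to emit");
        }
    }

    @Override
    public void close() {
        closed = true; // a real emitter would release its transport here
    }
}

class TryWithResourcesDemo {
    // Returns true if close() ran even though emit() threw.
    static boolean closedAfterFailure() {
        RecordingEmitter emitter = new RecordingEmitter();
        try (RecordingEmitter e = emitter) {
            e.emit(null); // throws before the block completes
        } catch (IOException expected) {
            // close() has already run by the time this handler executes
        }
        return emitter.closed;
    }

    public static void main(String[] args) {
        System.out.println("closed after failure: " + closedAfterFailure());
    }
}
```

The resource is closed before the catch handler runs, so cleanup does not depend on how the failure is handled.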
Code Reference
Source Location
| Property | Value |
|---|---|
| Interface | metadata-integration/java/datahub-client/src/main/java/datahub/client/Emitter.java |
| Lines | L21 (interface extends Closeable) |
| RestEmitter.close() | metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java (L340-342) |
| KafkaEmitter.close() | metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java (L72-74) |
| FileEmitter.close() | metadata-integration/java/datahub-client/src/main/java/datahub/client/file/FileEmitter.java (L114-120) |
| S3Emitter.close() | metadata-integration/java/datahub-client/src/main/java/datahub/client/s3/S3Emitter.java (L100-134) |
| Repository | https://github.com/datahub-project/datahub |
Signature
Interface declaration:
@ThreadSafe
public interface Emitter extends Closeable {
    // Inherited from Closeable:
    void close() throws IOException;
}
RestEmitter implementation:
@Override
public void close() throws IOException {
    this.httpClient.close();
}
KafkaEmitter implementation:
@Override
public void close() throws IOException {
    producer.close();
}
FileEmitter implementation:
@Override
public void close() throws IOException {
    this.writer.newLine();
    this.writer.append("]");
    this.writer.close();
    this.closed.set(true);
}
S3Emitter implementation (summarized):
@Override
public void close() throws IOException {
    fileEmitter.close(); // writes closing ] bracket
    String key = /* compute S3 key from config */;
    PutObjectRequest objectRequest = PutObjectRequest.builder()
        .bucket(config.getBucketName())
        .key(key)
        .build();
    PutObjectResponse response = client.putObject(objectRequest, this.temporaryFile);
    deleteTemporaryFile();
    if (!response.sdkHttpResponse().isSuccessful()) {
        throw new IOException("Failed to upload file to S3. Response: " + response);
    }
}
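The ordering in the summarized S3Emitter code (finish the file, upload, delete the local copy, then report failure) can be tried without AWS. The following is a minimal sketch under stated assumptions: Uploader and StagedUploadEmitter are hypothetical names, and the upload hook simply returns a boolean where the real code inspects a PutObjectResponse.

```java
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical upload hook standing in for the S3 client; returns true on success.
interface Uploader {
    boolean upload(Path file) throws IOException;
}

// Hypothetical emitter that stages output in a temp file and ships it on close().
class StagedUploadEmitter implements Closeable {
    private final Path tempFile;
    private final BufferedWriter writer;
    private final Uploader uploader;

    StagedUploadEmitter(Uploader uploader) throws IOException {
        this.tempFile = Files.createTempFile("staged-mcps", ".json");
        this.writer = Files.newBufferedWriter(tempFile, StandardCharsets.UTF_8);
        this.uploader = uploader;
        writer.append("[");
    }

    Path tempFile() {
        return tempFile;
    }

    @Override
    public void close() throws IOException {
        writer.newLine();
        writer.append("]");
        writer.close();                         // 1. finish the JSON array
        boolean ok = uploader.upload(tempFile); // 2. hand the file to the uploader
        Files.deleteIfExists(tempFile);         // 3. remove the local copy
        if (!ok) {                              // 4. report a failed upload
            throw new IOException("Failed to upload " + tempFile);
        }
    }
}

class StagedUploadDemo {
    // Closes a fresh emitter and reports the outcome plus whether the temp file remains.
    static String run(Uploader uploader) {
        try {
            StagedUploadEmitter emitter = new StagedUploadEmitter(uploader);
            String outcome;
            try {
                emitter.close();
                outcome = "uploaded";
            } catch (IOException e) {
                outcome = "upload failed";
            }
            return outcome + ", temp file exists: " + Files.exists(emitter.tempFile());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(file -> true));
        System.out.println(run(file -> false));
    }
}
```

In this sketch the local file is removed whether or not the uploader reports success. If the uploader throws instead of returning, step 3 is never reached and the file stays on disk.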
Import
import datahub.client.Emitter;
import java.io.Closeable;
import java.io.IOException;
I/O Contract
| Direction | Description |
|---|---|
| Input | None (no parameters) |
| Output | void |
| REST side effects | Shuts down the CloseableHttpAsyncClient, releasing connection pool and IO reactor threads. Pending requests may be cancelled. |
| Kafka side effects | Closes the KafkaProducer, flushing buffered records and releasing threads and sockets. |
| File side effects | Writes newline + ] to produce a valid JSON array, closes the BufferedWriter, sets the closed flag to true. |
| S3 side effects | Closes the internal FileEmitter (writes bracket), uploads the temporary file to S3 via PutObjectRequest, then deletes the temporary file. Throws IOException if the upload response is not successful. |
| Exceptions | IOException if the underlying resource cannot be closed or (S3) if the upload fails. |
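Because close() can itself throw IOException, it helps to know how Java reports that when the try body has already failed. In a try-with-resources statement the body's exception propagates, and the exception from close() is attached to it as a suppressed exception. A sketch using a hypothetical FailingEmitter (not a DataHub class):

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical emitter whose emit() and close() both fail; not a DataHub class.
class FailingEmitter implements Closeable {
    void emit() throws IOException {
        throw new IOException("emit failed");
    }

    @Override
    public void close() throws IOException {
        throw new IOException("close failed");
    }
}

class SuppressedCloseDemo {
    // Returns "<primary message> / <suppressed message>".
    static String describeFailure() {
        try (FailingEmitter emitter = new FailingEmitter()) {
            emitter.emit();
            return "no failure";
        } catch (IOException primary) {
            // The close() failure does not replace the original error.
            Throwable[] suppressed = primary.getSuppressed();
            String closeMessage = suppressed.length > 0 ? suppressed[0].getMessage() : "none";
            return primary.getMessage() + " / " + closeMessage;
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // prints "emit failed / close failed"
    }
}
```

With an explicit finally block, by contrast, an exception thrown from close() replaces the one from the body unless it is caught separately.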
Usage Examples
Example 1: Try-with-resources (recommended)
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
import com.linkedin.dataset.DatasetProperties;

try (RestEmitter emitter = RestEmitter.createWithDefaults()) {
    MetadataChangeProposalWrapper mcpw = MetadataChangeProposalWrapper.builder()
        .entityType("dataset")
        .entityUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,project.dataset.table,PROD)")
        .upsert()
        .aspect(new DatasetProperties().setDescription("User table"))
        .build();
    emitter.emit(mcpw, null).get();
} // emitter.close() called automatically here
Example 2: Explicit close in finally block
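import datahub.client.rest.RestEmitter;

// mcpw1 and mcpw2 are MetadataChangeProposalWrapper instances built as in Example 1.
RestEmitter emitter = RestEmitter.createWithDefaults();
try {
    emitter.emit(mcpw1, null).get();
    emitter.emit(mcpw2, null).get();
} finally {
    emitter.close(); // runs whether or not emit() threw
}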
Example 3: FileEmitter close writes valid JSON
import datahub.client.file.FileEmitter;
import datahub.client.file.FileEmitterConfig;

// mcpw1 and mcpw2 are MetadataChangeProposalWrapper instances built as in Example 1.
FileEmitter emitter = new FileEmitter(
    FileEmitterConfig.builder().fileName("/tmp/metadata.json").build()
);
emitter.emit(mcpw1, null);
emitter.emit(mcpw2, null);
emitter.close(); // writes "]" -- file now contains valid JSON array
// File contents: [ {mcp1}, {mcp2} ]
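The closing-bracket behaviour can be reproduced with the standard library alone. The sketch below is an illustration, not the DataHub FileEmitter: JsonArrayFileWriter is a hypothetical class, and the way it separates records with commas is an assumption. It also guards close() so that a second call has no effect, which is what the java.io.Closeable contract asks of already-closed streams.

```java
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical writer that mimics the described behaviour; not the DataHub FileEmitter.
class JsonArrayFileWriter implements Closeable {
    private final BufferedWriter writer;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private boolean first = true;

    JsonArrayFileWriter(Path path) throws IOException {
        this.writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8);
        writer.append("["); // open the array up front
    }

    void write(String jsonObject) throws IOException {
        if (closed.get()) {
            throw new IOException("writer is closed");
        }
        if (!first) {
            writer.append(","); // assumed separator between records
        }
        writer.newLine();
        writer.append(jsonObject);
        first = false;
    }

    @Override
    public void close() throws IOException {
        if (closed.compareAndSet(false, true)) { // a second close() is a no-op
            writer.newLine();
            writer.append("]"); // without this the file is not valid JSON
            writer.close();
        }
    }
}

class JsonArrayFileDemo {
    // Writes two records and returns the file content with whitespace removed.
    static String writeTwoRecords() {
        try {
            Path path = Files.createTempFile("metadata", ".json");
            try (JsonArrayFileWriter out = new JsonArrayFileWriter(path)) {
                out.write("{\"id\":1}");
                out.write("{\"id\":2}");
            }
            String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
            Files.delete(path);
            return content.replaceAll("\\s", "");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeTwoRecords()); // prints [{"id":1},{"id":2}]
    }
}
```

If close() is skipped, the file ends after the last record and a JSON parser will reject it, which is why the finalization step belongs in close() rather than in each write.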
Example 4: S3Emitter close triggers upload
import datahub.client.s3.S3Emitter;
import datahub.client.s3.S3EmitterConfig;

// mcpw1 and mcpw2 are MetadataChangeProposalWrapper instances built as in Example 1.
try (S3Emitter emitter = new S3Emitter(
        S3EmitterConfig.builder()
            .bucketName("my-bucket")
            .pathPrefix("datahub/mcps")
            .region("us-east-1")
            .build())) {
    emitter.emit(mcpw1, null);
    emitter.emit(mcpw2, null);
} // close() writes JSON bracket, uploads to S3, deletes temp file