Implementation:Datahub project Datahub EntityClient Upsert
| Field | Value |
|---|---|
| Implementation Name | EntityClient Upsert |
| Type | API Doc |
| Status | Active |
| Last Updated | 2026-02-10 |
| Repository | Datahub_project_Datahub |
| Source File | metadata-integration/java/datahub-client/src/main/java/datahub/client/v2/operations/EntityClient.java (Lines 82-195) |
Overview
The upsert() method on EntityClient is the primary mechanism for persisting entity metadata to DataHub. It binds the entity to the client, emits all cached aspects and pending MCPs as full upserts, waits for those writes to complete, emits accumulated patches with version-aware transformation and retry logic, and finally clears the entity's dirty flag.
Source Reference
File: metadata-integration/java/datahub-client/src/main/java/datahub/client/v2/operations/EntityClient.java (Lines 82-195)
```java
public <T extends Entity> void upsert(@Nonnull T entity)
    throws IOException, ExecutionException, InterruptedException {
  // Step 1: Bind entity to this client (for lazy loading)
  entity.bindToClient(this, config.getMode());

  List<Future<MetadataWriteResponse>> futures = new ArrayList<>();

  // Step 2: Emit cached aspects (from builder) - full aspects for initial creation
  List<MetadataChangeProposalWrapper> mcps = entity.toMCPs();
  if (!mcps.isEmpty()) {
    for (MetadataChangeProposalWrapper mcp : mcps) {
      futures.add(emitter.emit(mcp));
    }
  }

  // Step 3: Emit pending full aspect MCPs (from set*() methods)
  if (entity.hasPendingMCPs()) {
    List<MetadataChangeProposalWrapper> pendingMCPs = entity.getPendingMCPs();
    for (MetadataChangeProposalWrapper mcp : pendingMCPs) {
      futures.add(emitter.emit(mcp));
    }
    entity.clearPendingMCPs();
  }

  // Step 4: Wait for all full aspect writes to complete before emitting patches
  if (!futures.isEmpty()) {
    for (int i = 0; i < futures.size(); i++) {
      MetadataWriteResponse response = futures.get(i).get();
      if (!response.isSuccess()) {
        throw new IOException("Failed to emit full aspect write: "
            + response.getResponseContent());
      }
    }
  }

  // Step 5: Emit all pending patches with version-aware transformation
  if (entity.hasPendingPatches()) {
    List<MetadataChangeProposal> allPatches = entity.getPendingPatches();
    ServerConfig serverConfig = client.getServerConfig();
    List<TransformResult> transformResults =
        patchTransformer.transform(allPatches, serverConfig);
    for (TransformResult result : transformResults) {
      emitWithRetry(result);
    }
    entity.clearPatchBuilders();
    entity.clearPendingPatches();
  }

  // Step 6: Clear dirty flag
  entity.clearDirty();
}
```
Method Signature
```java
public <T extends Entity> void upsert(@Nonnull T entity)
    throws IOException, ExecutionException, InterruptedException
```
Accessed via:
```java
client.entities().upsert(entity);
```
I/O Contract
Input:
- `entity` -- an `Entity` subclass instance (e.g., `Dataset`, `Chart`, `Dashboard`) with zero or more pending mutations:
  - Cached aspects from the builder (dirty aspects in `WriteTrackingAspectCache`)
  - Pending full-aspect MCPs from `set*()` methods
  - Accumulated patch builders from `add*()`/`remove*()` methods
  - Legacy pending patch MCPs
Output:
- `void` -- no return value. On success:
  - All pending mutations have been emitted to the DataHub server
  - The entity's dirty flag is cleared
  - Patch builders and pending MCPs/patches are cleared
Exceptions:
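- `IOException` -- a full-aspect write returns an unsuccessful response, or a patch emission fails with a non-retryable error or after retries are exhausted
- `ExecutionException` -- the future for an emitted MCP completes exceptionally
- `InterruptedException` -- the thread is interrupted while waiting on an emission future or while sleeping during retry backoff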
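Callers have to deal with all three checked exceptions. The sketch below is a minimal, self-contained illustration of caller-side handling; the functional interface `UpsertCall` and the `Outcome` enum are hypothetical stand-ins (not part of the DataHub SDK) for a call such as `client.entities().upsert(entity)`. It shows the standard Java idioms of inspecting the wrapped cause of an `ExecutionException` and restoring the interrupt flag after catching `InterruptedException`.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class UpsertErrorHandling {

    /** Hypothetical stand-in for client.entities().upsert(entity); same checked exceptions. */
    @FunctionalInterface
    public interface UpsertCall {
        void run() throws IOException, ExecutionException, InterruptedException;
    }

    /** Outcome labels used only by this sketch. */
    public enum Outcome { OK, WRITE_FAILED, EMIT_FAILED, INTERRUPTED }

    public static Outcome safeUpsert(UpsertCall call) {
        try {
            call.run();
            return Outcome.OK;
        } catch (IOException e) {
            // The server rejected a write, or patch retries were exhausted.
            System.err.println("Upsert failed: " + e.getMessage());
            return Outcome.WRITE_FAILED;
        } catch (ExecutionException e) {
            // The emission future itself failed; the root cause is wrapped inside.
            System.err.println("Emission failed: " + e.getCause());
            return Outcome.EMIT_FAILED;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            return Outcome.INTERRUPTED;
        }
    }

    public static void main(String[] args) {
        System.out.println(safeUpsert(() -> {}));
        System.out.println(safeUpsert(() -> { throw new IOException("rejected"); }));
        System.out.println(safeUpsert(() -> {
            throw new ExecutionException(new RuntimeException("boom"));
        }));
        System.out.println(safeUpsert(() -> { throw new InterruptedException(); }));
        Thread.interrupted(); // clear the flag restored by the last call
    }
}
```

Restoring the interrupt flag, rather than swallowing the exception, lets an executor or shutdown hook that interrupted the thread still see the request.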
Execution Flow
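The upsert method follows a strict ordering: every full-aspect write must succeed before any patch is emitted, so patches are applied on top of the completed full-aspect writes rather than racing with them.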
| Step | Operation | Source | Change Type | Notes |
|---|---|---|---|---|
| 1 | Bind entity to client | -- | -- | Sets operation mode (SDK/INGESTION) and client reference for lazy loading |
| 2 | Emit cached aspects | entity.toMCPs() | UPSERT | Aspects from builder (e.g., DatasetProperties set during construction) |
| 3 | Emit pending MCPs | entity.getPendingMCPs() | UPSERT | Full aspect replacements from setTags(), setOwners(), etc. |
| 4 | Wait for completion | -- | -- | Blocks until all full aspect writes succeed; throws on the first unsuccessful response |
| 5 | Emit patches | entity.getPendingPatches() | PATCH (transformed) | Accumulated patches from addTag(), addOwner(), etc.; passed through VersionAwarePatchTransformer, each result emitted via emitWithRetry() |
| 6 | Clear dirty flag | entity.clearDirty() | -- | Marks entity as clean (no pending mutations) |
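The ordering guarantee in steps 2 to 5 can be modeled in isolation. The sketch below is a self-contained, in-memory model, not the SDK code: `WriteResponse` stands in for `MetadataWriteResponse`, MCPs are plain strings, and `emit()` is a hypothetical simulated emitter that rejects any MCP whose name starts with `bad`. It fires all full-aspect writes first, then blocks on each future, and only emits patches once every full write has succeeded.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class UpsertOrderingModel {

    /** Hypothetical stand-in for MetadataWriteResponse. */
    public record WriteResponse(boolean success, String content) {}

    /** Records the order in which MCPs reach the simulated server. */
    public final List<String> log = new ArrayList<>();

    /** Simulated async emitter: completes immediately and rejects names starting with "bad". */
    Future<WriteResponse> emit(String mcp) {
        log.add(mcp);
        boolean ok = !mcp.startsWith("bad");
        return CompletableFuture.completedFuture(
            new WriteResponse(ok, ok ? "ok" : "rejected " + mcp));
    }

    /** Mirrors steps 2-5: emit full aspects, wait on every future, then emit patches. */
    public void upsert(List<String> fullAspects, List<String> patches)
            throws IOException, ExecutionException, InterruptedException {
        List<Future<WriteResponse>> futures = new ArrayList<>();
        for (String mcp : fullAspects) {
            futures.add(emit(mcp)); // steps 2-3: submit without waiting
        }
        for (Future<WriteResponse> f : futures) {
            WriteResponse r = f.get(); // step 4: block on each write in submission order
            if (!r.success()) {
                throw new IOException("Failed to emit full aspect write: " + r.content());
            }
        }
        for (String patch : patches) { // step 5: reached only if every full write succeeded
            WriteResponse r = emit(patch).get();
            if (!r.success()) {
                throw new IOException("Failed to emit MCP: " + r.content());
            }
        }
    }
}
```

Because `CompletableFuture.completedFuture` completes immediately, this model demonstrates ordering only, not concurrency. Note that a failing full write still lets the other full writes be submitted (they are all fired before any future is checked), but it prevents every patch from being sent.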
Retry Logic
The private emitWithRetry() method (Lines 628-686) emits each transformed patch and retries it on version conflicts. The excerpt below is abridged:
```java
private void emitWithRetry(@Nonnull TransformResult result)
    throws IOException, ExecutionException, InterruptedException {
  int maxRetries = 3;
  long[] backoffDelays = {100L, 200L, 400L};
  // (elided in this excerpt) `mcp` is initialized from the transformed MCP held by `result`

  for (int attempt = 0; attempt <= maxRetries; attempt++) {
    Future<MetadataWriteResponse> future = emitter.emit(mcp, null);
    MetadataWriteResponse response = future.get();
    if (response.isSuccess()) {
      return;
    }

    // Retry only on a version conflict, and only if the transformer supplied a retry function
    VersionConflictInfo conflict = parseVersionConflict(response, mcp);
    if (conflict != null && result.hasRetryFunction() && attempt < maxRetries) {
      Thread.sleep(backoffDelays[attempt]);
      // Ask the transform result for a replacement MCP built from the conflict details
      mcp = result.getRetryFunction().apply(conflict);
    } else {
      throw new IOException("Failed to emit MCP: " + response.getResponseContent());
    }
  }
}
```
Retry parameters:
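- Maximum retries: 3 (up to 4 emission attempts in total)
- Backoff delays: 100ms, 200ms, 400ms (doubling before each retry)
- Conflict detection: HTTP 422 with a version-mismatch pattern
- Retry condition: a version conflict was detected and the TransformResult provides a retry function; any other failure throws IOException immediately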
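The retry policy above can be exercised without a DataHub server. The following is a self-contained sketch that mirrors the control flow of the excerpt under stated assumptions: `Response` is a hypothetical type whose `versionConflict` flag replaces the real HTTP 422 parsing, `send` stands in for `emitter.emit(...).get()`, and `rebuild` stands in for the TransformResult's retry function (a `null` value models `hasRetryFunction() == false`). The `slept` list exists only so the backoff sequence can be inspected.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ConflictRetrySketch {

    /** Hypothetical response: success flag plus a version-conflict marker. */
    public record Response(boolean success, boolean versionConflict, String content) {}

    static final int MAX_RETRIES = 3;
    static final long[] BACKOFF_MS = {100L, 200L, 400L};

    /** Delays actually slept, recorded so the backoff sequence can be inspected. */
    public static final List<Long> slept = new ArrayList<>();

    /**
     * Emits `initial`, retrying only on a version conflict when `rebuild` is non-null.
     * Returns the number of emission attempts used.
     */
    public static <M> int emitWithRetry(M initial, Function<M, Response> send,
                                        Function<Response, M> rebuild)
            throws IOException, InterruptedException {
        M mcp = initial;
        for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
            Response response = send.apply(mcp);
            if (response.success()) {
                return attempt + 1;
            }
            if (response.versionConflict() && rebuild != null && attempt < MAX_RETRIES) {
                long delay = BACKOFF_MS[attempt];
                slept.add(delay);
                Thread.sleep(delay);
                mcp = rebuild.apply(response); // replacement MCP derived from the conflict
            } else {
                throw new IOException("Failed to emit MCP: " + response.content());
            }
        }
        throw new IllegalStateException("unreachable: every iteration returns or throws");
    }
}
```

With a server that reports a conflict on every attempt, the sketch emits four times, sleeps 100, 200, and 400 ms between attempts, and then throws; a non-conflict failure throws on the first attempt without sleeping.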
Usage Examples
Create New Entity
```java
Dataset dataset = Dataset.builder()
    .platform("snowflake")
    .name("my_table")
    .description("Customer data")
    .build();

dataset.addTag("pii");
dataset.addOwner("urn:li:corpuser:johndoe", OwnershipType.DATA_OWNER);

client.entities().upsert(dataset);
```
Update Existing Entity
```java
Dataset dataset = client.entities().get(urn, Dataset.class);
Dataset mutable = dataset.mutable();

mutable.addTag("verified");
mutable.setDescription("Updated description");

client.entities().upsert(mutable);
```
Related
- Implements: Datahub_project_Datahub_Entity_Upsert
- Depends on: Datahub_project_Datahub_Entity_Metadata_Mutations
- Depends on: Datahub_project_Datahub_DataHubClientV2_Builder
- Related Implementation: Datahub_project_Datahub_EntityClient_Get_Mutable
- Environment: Environment:Datahub_project_Datahub_Java_17_Backend_Environment
- Heuristic: Heuristic:Datahub_project_Datahub_Validation_Across_All_APIs