Implementation:Risingwavelabs Risingwave OpensearchBulkProcessorAdapter
| Property | Value |
|---|---|
| File | java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java |
| Language | Java |
| Lines | 104 |
| Category | Adapter |
| Package | com.risingwave.connector |
Overview
OpensearchBulkProcessorAdapter is the OpenSearch-specific implementation of the BulkProcessorAdapter interface. It wraps the OpenSearch BulkProcessor API, translating generic bulk add and delete operations into OpenSearch-native UpdateRequest and DeleteRequest objects. The adapter mirrors the structure of ElasticBulkProcessorAdapter but is built on the OpenSearch client libraries. The batch count and size limits, the number of concurrent requests, and the version-conflict retry count are read from EsSinkConfig. The flush interval and the exponential backoff retry policy are hardcoded.
Code Reference
Source Location
Signature
```java
public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter {
    public OpensearchBulkProcessorAdapter(
            RequestTracker requestTracker,
            OpensearchRestHighLevelClientAdapter client,
            EsSinkConfig config);

    public void addRow(String index, String key, String doc, String routing)
            throws InterruptedException;

    public void deleteRow(String index, String key, String routing)
            throws InterruptedException;

    public void flush();

    public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
}
```
Imports
```java
import com.risingwave.connector.EsSink.RequestTracker;
import java.util.concurrent.TimeUnit;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.bulk.BulkProcessor;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
```
I/O Contract
Constructor
Initializes the OpenSearch BulkProcessor with the following settings, most of which are read from EsSinkConfig:
| Setting | Config Method | Description |
|---|---|---|
| Bulk actions | getBatchNumMessages() | Number of buffered actions that triggers an automatic flush |
| Bulk size | getBatchSizeKb() | Buffered request size (in KB) that triggers an automatic flush |
| Flush interval | (hardcoded) | Flush every 5 seconds, regardless of buffer contents |
| Concurrent requests | getConcurrentRequests() | Number of bulk requests allowed to execute concurrently |
| Backoff policy | (hardcoded) | Exponential backoff starting at 100 ms, up to 3 retries |
| Retry on conflict | getRetryOnConflict() | Number of retries when an update hits a version conflict |
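To make the count, size, and backoff settings in this table concrete, here is a stdlib-only Java sketch of the flush triggers and a retry schedule. It is a simplified model, not OpenSearch's BulkProcessor: the class BulkBufferSketch and its methods are hypothetical, string length stands in for the serialized request size, the 5-second timer flush is not simulated, and the backoff simply doubles each time, whereas BackoffPolicy.exponentialBackoff follows its own growth curve and so produces different delays.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the flush triggers and retry schedule.
// NOT the OpenSearch BulkProcessor; names and numbers are illustrative only.
final class BulkBufferSketch {
    private final int maxActions; // plays the role of getBatchNumMessages()
    private final long maxBytes;  // plays the role of getBatchSizeKb() * 1024
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;
    final List<List<String>> flushedBatches = new ArrayList<>();

    BulkBufferSketch(int maxActions, long maxSizeKb) {
        this.maxActions = maxActions;
        this.maxBytes = maxSizeKb * 1024;
    }

    // Buffer one action; flush as soon as either threshold is reached.
    void add(String action) {
        pending.add(action);
        pendingBytes += action.length(); // string length stands in for byte size
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    // "Send" the buffered actions (here: record them) and reset the counters.
    // A real processor also flushes on a timer (every 5 s in this adapter).
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushedBatches.add(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }

    int pendingCount() {
        return pending.size();
    }

    // Doubling backoff: initialMillis, 2 * initialMillis, ... for maxRetries attempts.
    static List<Long> backoffDelays(long initialMillis, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        long delay = initialMillis;
        for (int i = 0; i < maxRetries; i++) {
            delays.add(delay);
            delay *= 2;
        }
        return delays;
    }

    public static void main(String[] args) {
        BulkBufferSketch buffer = new BulkBufferSketch(3, 1024);
        for (int i = 0; i < 7; i++) {
            buffer.add("{\"update\":{\"_id\":\"k" + i + "\"}}");
        }
        System.out.println(buffer.flushedBatches.size() + " batches flushed, "
                + buffer.pendingCount() + " pending");
        System.out.println(backoffDelays(100, 3));
    }
}
```

With a 3-action limit, seven adds produce two count-triggered flushes and leave one action buffered until the next explicit or timed flush.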
addRow
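Creates an OpenSearch UpdateRequest with docAsUpsert(true), so the document is inserted when no document with that key exists and merged into the existing document otherwise. Unlike the Elasticsearch adapter, the OpenSearch UpdateRequest constructor takes only the index and key (no document type parameter). If a routing value is provided, it is set on the request.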
deleteRow
Creates an OpenSearch DeleteRequest targeting the given index and key. As in addRow, and unlike the Elasticsearch adapter, no document type parameter is used. If a routing value is provided, it is set on the request.
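The add/delete mapping can be sketched without the OpenSearch dependency by modeling only the fields the adapter sets. UpsertOp, DeleteOp, and OpensearchStyleTranslator are hypothetical stand-ins for UpdateRequest, DeleteRequest, and the adapter; the sketch only shows the shape of the translation: index and id with no type field, docAsUpsert enabled for upserts, and routing carried through only when the caller supplies one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for OpenSearch's UpdateRequest and DeleteRequest.
// Only the fields the adapter sets are modeled; note there is no "type" field.
record UpsertOp(String index, String id, String doc, String routing, boolean docAsUpsert) {}

record DeleteOp(String index, String id, String routing) {}

// Hypothetical translator mirroring the addRow/deleteRow mapping.
final class OpensearchStyleTranslator {
    final List<Object> queued = new ArrayList<>();

    // Upsert: docAsUpsert=true inserts the document if the id is missing
    // and merges it into the existing document otherwise.
    void addRow(String index, String key, String doc, String routing) {
        queued.add(new UpsertOp(index, key, doc, routing, true));
    }

    // Delete by index and id; a null routing leaves default routing in place.
    void deleteRow(String index, String key, String routing) {
        queued.add(new DeleteOp(index, key, routing));
    }

    public static void main(String[] args) {
        OpensearchStyleTranslator t = new OpensearchStyleTranslator();
        t.addRow("my_index", "key1", "{\"name\": \"value\"}", null);
        t.deleteRow("my_index", "key2", "routing_value");
        t.queued.forEach(System.out::println);
    }
}
```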
Both addRow and deleteRow call requestTracker.addWriteTask() before adding the request to the bulk processor.
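Registering the write before handing the request to the processor matters because bulk callbacks may run on another thread as soon as the request is queued. The following is a minimal sketch, assuming the tracker is a simple in-flight counter that the completion callback decrements. InFlightTracker, completeWriteTasks, and awaitAllDone are hypothetical names, and RisingWave's actual RequestTracker may be implemented differently.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical in-flight write counter; not RisingWave's RequestTracker.
final class InFlightTracker {
    private int inFlight = 0;

    // Called BEFORE the request is handed to the bulk processor, so a fast
    // completion callback can never drive the count below zero.
    synchronized void addWriteTask() {
        inFlight++;
    }

    // Called from the (hypothetical) bulk completion callback.
    synchronized void completeWriteTasks(int n) {
        inFlight -= n;
        if (inFlight < 0) {
            throw new IllegalStateException("completion recorded before registration");
        }
        if (inFlight == 0) {
            notifyAll(); // wake any thread blocked in awaitAllDone
        }
    }

    // Block until all registered writes complete, or return false on timeout.
    synchronized boolean awaitAllDone(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (inFlight > 0) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
        return true;
    }

    synchronized int inFlight() {
        return inFlight;
    }

    public static void main(String[] args) throws InterruptedException {
        InFlightTracker tracker = new InFlightTracker();
        ExecutorService bulkThread = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            tracker.addWriteTask();                                 // register first
            bulkThread.submit(() -> tracker.completeWriteTasks(1)); // then "send"
        }
        System.out.println("all done: " + tracker.awaitAllDone(5, TimeUnit.SECONDS));
        bulkThread.shutdown();
    }
}
```

Reversing the two steps in the loop would allow a completion to be counted before its registration, which this sketch reports as an IllegalStateException.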
Usage Examples
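```java
// Assumes `host` and `config` (an EsSinkConfig) are in scope and that the
// enclosing method declares `throws InterruptedException`.
// RequestTracker construction is simplified here; see EsSink for how the sink creates it.
// Create the adapter with an OpenSearch client
RequestTracker tracker = new EsSink.RequestTracker();
OpensearchRestHighLevelClientAdapter client = new OpensearchRestHighLevelClientAdapter(host, config);
OpensearchBulkProcessorAdapter adapter = new OpensearchBulkProcessorAdapter(tracker, client, config);

// Upsert a document (null routing means no custom routing)
adapter.addRow("my_index", "key1", "{\"name\": \"value\"}", null);

// Delete a document with routing
adapter.deleteRow("my_index", "key2", "routing_value");

// Flush buffered requests, then wait up to 30 seconds for the processor to close
adapter.flush();
adapter.awaitClose(30, TimeUnit.SECONDS);
```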
Related Pages
- BulkProcessorAdapter - Interface implemented by this class
- OpensearchRestHighLevelClientAdapter - Client adapter used for async bulk operations
- BulkListener - Listener handling bulk operation callbacks
- EsSinkConfig - Configuration driving bulk processor settings
- ElasticBulkProcessorAdapter - Elasticsearch counterpart of this adapter