Implementation:Risingwavelabs Risingwave OpensearchBulkProcessorAdapter

From Leeroopedia


Property  Value
File      java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
Language  Java
Lines     104
Category  Adapter
Package   com.risingwave.connector

Overview

OpensearchBulkProcessorAdapter is the OpenSearch-specific implementation of the BulkProcessorAdapter interface. It wraps the OpenSearch BulkProcessor API and translates generic bulk add/delete operations into OpenSearch-native UpdateRequest and DeleteRequest objects. The adapter mirrors the structure of ElasticBulkProcessorAdapter but uses the OpenSearch client libraries. Batch limits, the number of concurrent requests, and the retry-on-conflict count are read from EsSinkConfig; the flush interval and the exponential backoff retry policy are hardcoded.

Code Reference

Source Location

java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java

Signature

public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter {
    public OpensearchBulkProcessorAdapter(
            RequestTracker requestTracker,
            OpensearchRestHighLevelClientAdapter client,
            EsSinkConfig config);
    public void addRow(String index, String key, String doc, String routing) throws InterruptedException;
    public void deleteRow(String index, String key, String routing) throws InterruptedException;
    public void flush();
    public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
}

Imports

import com.risingwave.connector.EsSink.RequestTracker;
import java.util.concurrent.TimeUnit;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.bulk.BulkProcessor;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;

I/O Contract

Constructor

Initializes the OpenSearch BulkProcessor with the following configuration from EsSinkConfig:

Setting              Config Method            Description
Bulk actions         getBatchNumMessages()    Number of actions before automatic flush
Bulk size            getBatchSizeKb()         Size in KB before automatic flush
Flush interval       hardcoded                5 seconds
Concurrent requests  getConcurrentRequests()  Number of concurrent bulk requests
Backoff policy       hardcoded                Exponential backoff starting at 100 ms, up to 3 retries
Retry on conflict    getRetryOnConflict()     Number of retries on version conflicts
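
The count and size thresholds in the table can be illustrated with a small, library-free model. This is only a sketch and not the OpenSearch BulkProcessor: ThresholdBuffer and its methods are hypothetical names, and it assumes a flush fires as soon as either limit is reached. The 5-second interval flush, concurrent requests, and backoff are deliberately left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of count/size-triggered flushing; NOT the OpenSearch BulkProcessor.
final class ThresholdBuffer {
    private final int maxActions; // stands in for getBatchNumMessages()
    private final long maxBytes;  // stands in for getBatchSizeKb() * 1024
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private int flushCount = 0;

    ThresholdBuffer(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    // Buffer one request body; flush once either limit is reached (assumed >= semantics).
    void add(String body) {
        pending.add(body);
        pendingBytes += body.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    // "Sending" here only counts the flush and clears the buffer.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushCount++;
        pending.clear();
        pendingBytes = 0;
    }

    int getFlushCount() {
        return flushCount;
    }

    int getPendingCount() {
        return pending.size();
    }
}
```

With a limit of three actions, the third add() triggers a flush and leaves the buffer empty. With a byte limit smaller than a single body, every add() flushes immediately.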

addRow

Creates an OpenSearch UpdateRequest with docAsUpsert(true). Unlike the Elasticsearch adapter, the OpenSearch UpdateRequest constructor takes only index and key (no document type parameter). Optional routing is applied if provided.
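
The effect of docAsUpsert(true) can be modelled without the OpenSearch client: when no document exists for the key, the supplied document is stored as is; otherwise its fields are merged into the existing document. The sketch below is an in-memory stand-in only. UpsertModel is a hypothetical name, and the merge is assumed to be shallow for simplicity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of "update with doc-as-upsert"; NOT the OpenSearch client.
final class UpsertModel {
    private final Map<String, Map<String, Object>> index = new HashMap<>();

    // Insert the document if the key is absent, otherwise merge its fields (shallow merge assumed).
    void upsert(String key, Map<String, Object> doc) {
        index.merge(key, new HashMap<>(doc), (existing, incoming) -> {
            existing.putAll(incoming);
            return existing;
        });
    }

    // Returns the stored document, or null if the key has never been written.
    Map<String, Object> get(String key) {
        return index.get(key);
    }
}
```

Calling upsert twice with the same key and different fields leaves a single document that contains both fields, which is why the adapter can treat inserts and updates as one operation.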

deleteRow

Creates an OpenSearch DeleteRequest targeting the specified index and key. Unlike the Elasticsearch adapter, no document type parameter is used. Optional routing is applied if provided.

Both addRow and deleteRow call requestTracker.addWriteTask() before adding the request to the bulk processor.

Usage Examples

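// Assumes `host` and `config` (an EsSinkConfig) are defined elsewhere, and that the
// enclosing method declares `throws InterruptedException`.
// Create the adapter with an OpenSearch client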
RequestTracker tracker = new EsSink.RequestTracker();
OpensearchRestHighLevelClientAdapter client = new OpensearchRestHighLevelClientAdapter(host, config);
OpensearchBulkProcessorAdapter adapter = new OpensearchBulkProcessorAdapter(tracker, client, config);

// Upsert a document
adapter.addRow("my_index", "key1", "{\"name\": \"value\"}", null);

// Delete a document with routing
adapter.deleteRow("my_index", "key2", "routing_value");

// Flush and close
adapter.flush();
adapter.awaitClose(30, TimeUnit.SECONDS);
