Implementation:Risingwavelabs Risingwave BatchAppendOnlyJDBCSink
| Knowledge Sources | |
|---|---|
| Domains | Connectors, JDBC, Java |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Optimized JDBC sink for append-only workloads that batches INSERT statements for efficient bulk writes to external databases.
Description
BatchAppendOnlyJDBCSink implements the SinkWriter interface to deliver rows from RisingWave to JDBC-compatible databases using batched INSERT operations. It accepts only INSERT operations (append-only mode) and rejects any other operation type with a FAILED_PRECONDITION error.
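The append-only check described above can be sketched as follows. This is a minimal illustration, not the class's actual code: the `Op` enum and `FailedPreconditionException` are hypothetical stand-ins for the RisingWave operation type and the FAILED_PRECONDITION status error.

```java
import java.util.List;

// Hypothetical stand-ins; only the INSERT-only validation is illustrated.
class AppendOnlyCheck {
    // Assumed operation kinds for illustration.
    enum Op { INSERT, DELETE, UPDATE_INSERT, UPDATE_DELETE }

    /** Stand-in for the FAILED_PRECONDITION error raised by the sink. */
    static class FailedPreconditionException extends RuntimeException {
        FailedPreconditionException(String message) {
            super(message);
        }
    }

    /** Returns the number of accepted rows, or throws on the first non-INSERT op. */
    static int acceptAppendOnly(List<Op> ops) {
        int accepted = 0;
        for (Op op : ops) {
            if (op != Op.INSERT) {
                throw new FailedPreconditionException(
                        "unsupported operation type in append-only sink: " + op);
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptAppendOnly(List.of(Op.INSERT, Op.INSERT)));
        try {
            acceptAppendOnly(List.of(Op.INSERT, Op.DELETE));
        } catch (FailedPreconditionException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```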
The class manages a JDBC connection, resolves the target table's column type mapping via database metadata, and delegates SQL dialect specifics (quoting, parameter binding, insert statement generation) to a JdbcDialect obtained from JdbcUtils.getDialectFactory(). It includes a workaround for a known pgjdbc issue where TIMESTAMP WITH TIME ZONE columns are incorrectly reported as plain TIMESTAMP.
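A sketch of the kind of correction the pgjdbc workaround implies is shown below. It assumes the driver reports the column's type name as "timestamptz" while its JDBC type code is Types.TIMESTAMP. The `correctType` helper is hypothetical, and the way the sink actually detects the case may differ.

```java
import java.sql.Types;

// Hypothetical helper; illustrates correcting a reported JDBC type code
// using the driver-reported type name.
class TimestamptzFixup {
    /**
     * Assumption: metadata lookups yield a type code (DATA_TYPE) and a
     * type name (TYPE_NAME) per column, and "timestamptz" columns arrive
     * with the plain TIMESTAMP code.
     */
    static int correctType(int reportedSqlType, String typeName) {
        if (reportedSqlType == Types.TIMESTAMP
                && "timestamptz".equalsIgnoreCase(typeName)) {
            return Types.TIMESTAMP_WITH_TIMEZONE;
        }
        // All other columns keep the type code reported by the driver.
        return reportedSqlType;
    }

    public static void main(String[] args) {
        int fixed = correctType(Types.TIMESTAMP, "timestamptz");
        System.out.println(fixed == Types.TIMESTAMP_WITH_TIMEZONE);
    }
}
```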
An inner class JdbcStatements wraps the lifecycle of the PreparedStatement objects and supports a configurable batch size (batchInsertRows) and query timeout (queryTimeout). When the batch threshold is reached during write(), the accumulated statements are executed automatically. On barrier(), any remaining buffered statements are flushed and committed, and a mock SinkMetadata is returned. For Snowflake connections, USE DATABASE and USE SCHEMA commands are executed during initialization.
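The threshold-then-flush behavior can be sketched without a database. In this illustration `BatchBuffer` is a hypothetical class, and the `Consumer` callback stands in for executing a batched PreparedStatement; the real JdbcStatements class is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffer; the executor callback stands in for batch execution.
class BatchBuffer<T> {
    private final int batchInsertRows;
    private final Consumer<List<T>> executor;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchInsertRows, Consumer<List<T>> executor) {
        this.batchInsertRows = batchInsertRows;
        this.executor = executor;
    }

    /** Buffers one row and executes the batch once the threshold is reached. */
    void add(T row) {
        pending.add(row);
        if (pending.size() >= batchInsertRows) {
            flush();
        }
    }

    /** Executes whatever is still buffered; intended for the barrier path. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        executor.accept(new ArrayList<>(pending));
        pending.clear();
    }

    /** Writes `rows` rows, then flushes once, returning each executed batch size. */
    static List<Integer> simulate(int rows, int batchInsertRows) {
        List<Integer> sizes = new ArrayList<>();
        BatchBuffer<Integer> buffer =
                new BatchBuffer<>(batchInsertRows, batch -> sizes.add(batch.size()));
        for (int i = 0; i < rows; i++) {
            buffer.add(i);
        }
        buffer.flush(); // barrier: flush the remainder
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(simulate(5, 2)); // prints [2, 2, 1]
    }
}
```

With five rows and a threshold of two, two full batches run during the writes and the final single row is only executed by the barrier flush.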
Usage
This sink is instantiated by the JDBC sink factory when a user creates an append-only JDBC sink. It is used for streaming data from RisingWave materialized views to external relational databases such as PostgreSQL, MySQL, Snowflake, and others that support JDBC.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/BatchAppendOnlyJDBCSink.java (L32-347)
Signature
public class BatchAppendOnlyJDBCSink implements SinkWriter {
public BatchAppendOnlyJDBCSink(JDBCSinkConfig config, TableSchema tableSchema);
@Override
public boolean write(Iterable<SinkRow> rows);
@Override
public void beginEpoch(long epoch);
@Override
public Optional<ConnectorServiceProto.SinkMetadata> barrier(boolean isCheckpoint);
@Override
public void drop();
public String getTableName();
public Connection getConn();
}
Import
import com.risingwave.connector.BatchAppendOnlyJDBCSink;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.connector.api.sink.SinkWriter;
import com.risingwave.connector.jdbc.JdbcDialect;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.Data;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | JDBCSinkConfig | Yes | JDBC connection URL, table name, schema name, query timeout, and batch insert size |
| tableSchema | TableSchema | Yes | Column names, types, and primary key definitions from the upstream relation |
| rows | Iterable<SinkRow> | Yes (for write) | Batch of rows to insert; each row must have Op == INSERT |
| isCheckpoint | boolean | Yes (for barrier) | Whether this barrier is a checkpoint boundary |
Outputs
| Name | Type | Description |
|---|---|---|
| write result | boolean | Returns true on successful batch insertion |
| SinkMetadata | Optional<SinkMetadata> | Returns a serialized metadata object with empty bytes on barrier |
| Side effect | JDBC INSERT | Rows are written to the target JDBC table via batched PreparedStatement execution |
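The batched INSERT side effect relies on a parameterized statement whose text depends on the dialect. The sketch below builds such a statement with double-quoted identifiers, as PostgreSQL accepts. `InsertSqlSketch`, `quoteIdentifier`, and `buildInsert` are hypothetical names, not the JdbcDialect API, and other dialects quote differently (MySQL, for example, uses backticks).

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; not the JdbcDialect API.
class InsertSqlSketch {
    /** Wraps an identifier in double quotes, doubling any embedded quote. */
    static String quoteIdentifier(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    /** Builds a parameterized INSERT with one placeholder per column. */
    static String buildInsert(String schema, String table, List<String> columns) {
        String columnList = columns.stream()
                .map(InsertSqlSketch::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + quoteIdentifier(schema) + "." + quoteIdentifier(table)
                + " (" + columnList + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        System.out.println(buildInsert("public", "target_table", List.of("id", "name")));
    }
}
```

A statement of this shape is typically prepared once and reused for every row in a batch, with only the bound parameters changing between rows.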
Usage Examples
Creating an Append-Only JDBC Sink via SQL
CREATE SINK my_jdbc_sink FROM my_materialized_view
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://host:5432/mydb',
    table.name = 'target_table',
    schema.name = 'public',
    type = 'append-only',
    force_append_only = 'true'
);
Internal Instantiation
JDBCSinkConfig config = new JDBCSinkConfig(tableProperties);
TableSchema tableSchema = TableSchema.fromProto(sinkParam.getTableSchema());
SinkWriter writer = new BatchAppendOnlyJDBCSink(config, tableSchema);
// Write a batch of rows
writer.write(rows);
// Flush on barrier
Optional<ConnectorServiceProto.SinkMetadata> metadata = writer.barrier(true);
// Clean up
writer.drop();