Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Risingwavelabs Risingwave BatchAppendOnlyJDBCSink

From Leeroopedia


Knowledge Sources
Domains Connectors, JDBC, Java
Last Updated 2026-02-09 07:00 GMT

Overview

Optimized JDBC sink for append-only workloads that batches INSERT statements for efficient bulk writes to external databases.

Description

BatchAppendOnlyJDBCSink implements the SinkWriter interface to deliver rows from RisingWave to JDBC-compatible databases using batched INSERT operations. It only accepts INSERT operations (append-only mode) and rejects any other operation types with a FAILED_PRECONDITION error.

The class manages a JDBC connection, resolves the target table's column type mapping via database metadata, and delegates SQL dialect specifics (quoting, parameter binding, insert statement generation) to a JdbcDialect obtained from JdbcUtils.getDialectFactory(). It includes a workaround for a known pgjdbc issue where TIMESTAMP WITH TIME ZONE columns are incorrectly reported as plain TIMESTAMP.

An inner class JdbcStatements wraps the lifecycle of PreparedStatement objects, supporting configurable batch sizes (batchInsertRows) and query timeouts (queryTimeout). When the batch threshold is reached during write(), the accumulated statements are executed automatically. On barrier(), any remaining buffered statements are flushed and committed, and a mock SinkMetadata is returned. Special handling is included for Snowflake connections, where USE DATABASE and USE SCHEMA commands are executed during initialization.

Usage

This sink is instantiated by the JDBC sink factory when a user creates an append-only JDBC sink. It is used for streaming data from RisingWave materialized views to external relational databases such as PostgreSQL, MySQL, Snowflake, and others that support JDBC.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/BatchAppendOnlyJDBCSink.java (L32-347)

Signature

public class BatchAppendOnlyJDBCSink implements SinkWriter {
    public BatchAppendOnlyJDBCSink(JDBCSinkConfig config, TableSchema tableSchema);

    @Override
    public boolean write(Iterable<SinkRow> rows);

    @Override
    public void beginEpoch(long epoch);

    @Override
    public Optional<ConnectorServiceProto.SinkMetadata> barrier(boolean isCheckpoint);

    @Override
    public void drop();

    public String getTableName();
    public Connection getConn();
}

Import

import com.risingwave.connector.BatchAppendOnlyJDBCSink;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.connector.api.sink.SinkWriter;
import com.risingwave.connector.jdbc.JdbcDialect;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.Data;

I/O Contract

Inputs

Name Type Required Description
config JDBCSinkConfig Yes JDBC connection URL, table name, schema name, query timeout, and batch insert size
tableSchema TableSchema Yes Column names, types, and primary key definitions from the upstream relation
rows Iterable<SinkRow> Yes (for write) Batch of rows to insert; each row must have Op == INSERT
isCheckpoint boolean Yes (for barrier) Whether this barrier is a checkpoint boundary

Outputs

Name Type Description
write result boolean Returns true on successful batch insertion
SinkMetadata Optional<SinkMetadata> Returns a serialized metadata object with empty bytes on barrier
Side effect JDBC INSERT Rows are written to the target JDBC table via batched PreparedStatement execution

Usage Examples

Creating an Append-Only JDBC Sink via SQL

CREATE SINK my_jdbc_sink FROM my_materialized_view
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://host:5432/mydb',
    table.name = 'target_table',
    schema.name = 'public',
    type = 'append-only',
    force_append_only = 'true'
);

Internal Instantiation

JDBCSinkConfig config = new JDBCSinkConfig(tableProperties);
TableSchema tableSchema = TableSchema.fromProto(sinkParam.getTableSchema());
SinkWriter writer = new BatchAppendOnlyJDBCSink(config, tableSchema);

// Write a batch of rows
writer.write(rows);

// Flush on barrier
Optional<ConnectorServiceProto.SinkMetadata> metadata = writer.barrier(true);

// Clean up
writer.drop();

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment