Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Risingwavelabs Risingwave JDBCSinkFactory

From Leeroopedia


Property Value
Component risingwave-sink-jdbc
Language Java
Package com.risingwave.connector
Implements SinkFactory
Lines 142
Source JDBCSinkFactory.java

Overview

JDBCSinkFactory is the factory class responsible for creating and validating JDBC sink writer instances. It implements the SinkFactory interface, which is the entry point used by the RisingWave connector framework to instantiate sinks. The factory performs two primary functions:

  1. Writer creation (createWriter): Selects the appropriate sink implementation based on the JDBC URL -- BatchAppendOnlyJDBCSink for Snowflake and Redshift targets, or the general-purpose JDBCSink for all other databases.
  2. Validation (validate): Connects to the target database to verify that the table exists, all columns in the RisingWave schema are present in the JDBC table, and for upsert sinks, that the primary key constraints match.

The factory also handles configuration polymorphism, routing Snowflake JDBC URLs to SnowflakeJDBCSinkConfig for specialized authentication support.

Code Reference

Source Location

java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java

Signature

public class JDBCSinkFactory implements SinkFactory {
    public static final String JDBC_URL_PROP = "jdbc.url";
    public static final String TABLE_NAME_PROP = "table.name";

    private JDBCSinkConfig createConfig(ObjectMapper mapper, Map<String, String> tableProperties);

    @Override
    public SinkWriter createWriter(TableSchema tableSchema, Map<String, String> tableProperties);

    @Override
    public void validate(TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType);
}

Imports

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.connector.api.sink.SinkWriter;
import com.risingwave.proto.Catalog.SinkType;
import io.grpc.Status;
import java.sql.*;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

I/O Contract

Input

Both createWriter and validate receive:

  • tableSchema (TableSchema): The RisingWave table schema describing columns and primary keys.
  • tableProperties (Map<String, String>): Key-value properties from the SQL WITH clause, including jdbc.url, table.name, type, credentials, and dialect-specific settings.

The validate method additionally receives:

  • sinkType (SinkType): The protobuf-defined sink type (e.g., SINK_TYPE_UPSERT or SINK_TYPE_APPEND_ONLY).

Output

  • createWriter: Returns a SinkWriter instance -- either BatchAppendOnlyJDBCSink (for Snowflake/Redshift) or JDBCSink (for all other databases).
  • validate: Returns void on success. Throws gRPC Status.INVALID_ARGUMENT or Status.FAILED_PRECONDITION runtime exceptions on validation failures.

Validation Checks

The validate method performs the following checks:

  1. Connection test: Establishes a JDBC connection to verify connectivity.
  2. Table existence: Queries database metadata to confirm the target table exists.
  3. Column matching: Verifies all columns from the RisingWave schema exist in the JDBC table.
  4. Primary key matching (upsert only): Confirms the JDBC table has a primary key and it matches the primary_key option from the WITH clause.

Usage Examples

Creating a Sink Writer

JDBCSinkFactory factory = new JDBCSinkFactory();
SinkWriter writer = factory.createWriter(tableSchema, tableProperties);

Validating Sink Configuration

JDBCSinkFactory factory = new JDBCSinkFactory();
// Throws on failure
factory.validate(tableSchema, tableProperties, SinkType.SINK_TYPE_UPSERT);

Config Routing Logic

// Internal: routes to SnowflakeJDBCSinkConfig for Snowflake URLs
private JDBCSinkConfig createConfig(ObjectMapper mapper, Map<String, String> tableProperties) {
    String jdbcUrl = tableProperties.get(JDBC_URL_PROP);
    if (jdbcUrl != null && jdbcUrl.startsWith("jdbc:snowflake")) {
        return mapper.convertValue(tableProperties, SnowflakeJDBCSinkConfig.class);
    }
    return mapper.convertValue(tableProperties, JDBCSinkConfig.class);
}

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment