Implementation:Risingwavelabs Risingwave JDBCSinkFactory
| Property | Value |
|---|---|
| Component | risingwave-sink-jdbc |
| Language | Java |
| Package | com.risingwave.connector |
| Implements | SinkFactory |
| Lines | 142 |
| Source | JDBCSinkFactory.java |
Overview
JDBCSinkFactory is the factory class responsible for creating and validating JDBC sink writer instances. It implements the SinkFactory interface, which is the entry point used by the RisingWave connector framework to instantiate sinks. The factory performs two primary functions:
- Writer creation (createWriter): Selects the appropriate sink implementation based on the JDBC URL: BatchAppendOnlyJDBCSink for Snowflake and Redshift targets, or the general-purpose JDBCSink for all other databases.
- Validation (validate): Connects to the target database to verify that the table exists, that all columns in the RisingWave schema are present in the JDBC table, and, for upsert sinks, that the primary key constraints match.
The factory also handles configuration polymorphism, routing Snowflake JDBC URLs to SnowflakeJDBCSinkConfig for specialized authentication support.
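The URL-based writer selection described above can be illustrated with a minimal sketch. It is not the factory's actual code: WriterSelectionSketch and WriterKind are hypothetical stand-ins for the real sink classes, and the "jdbc:redshift" prefix is an assumption (only the "jdbc:snowflake" prefix appears in the source shown on this page).

```java
import java.util.Map;

// Illustrative sketch only; not the actual JDBCSinkFactory code.
class WriterSelectionSketch {
    // Hypothetical stand-ins for BatchAppendOnlyJDBCSink and JDBCSink.
    enum WriterKind { BATCH_APPEND_ONLY, GENERAL_PURPOSE }

    // "jdbc.url" is the value of JDBC_URL_PROP; the "jdbc:redshift" prefix is assumed.
    static WriterKind selectWriter(Map<String, String> tableProperties) {
        String jdbcUrl = tableProperties.get("jdbc.url");
        boolean batchTarget = jdbcUrl != null
                && (jdbcUrl.startsWith("jdbc:snowflake") || jdbcUrl.startsWith("jdbc:redshift"));
        return batchTarget ? WriterKind.BATCH_APPEND_ONLY : WriterKind.GENERAL_PURPOSE;
    }

    public static void main(String[] args) {
        // Example URLs are placeholders, not real endpoints.
        System.out.println(selectWriter(Map.of("jdbc.url", "jdbc:snowflake://account.example.com")));
        System.out.println(selectWriter(Map.of("jdbc.url", "jdbc:postgresql://localhost:5432/dev")));
    }
}
```

In this sketch a missing jdbc.url falls through to the general-purpose writer; how the real factory treats a missing URL is not stated here.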
Code Reference
Source Location
java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java
Signature
public class JDBCSinkFactory implements SinkFactory {
    public static final String JDBC_URL_PROP = "jdbc.url";
    public static final String TABLE_NAME_PROP = "table.name";

    private JDBCSinkConfig createConfig(ObjectMapper mapper, Map<String, String> tableProperties);

    @Override
    public SinkWriter createWriter(TableSchema tableSchema, Map<String, String> tableProperties);

    @Override
    public void validate(TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType);
}
Imports
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.connector.api.sink.SinkWriter;
import com.risingwave.proto.Catalog.SinkType;
import io.grpc.Status;
import java.sql.*;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
I/O Contract
Input
Both createWriter and validate receive:
- tableSchema (TableSchema): The RisingWave table schema describing columns and primary keys.
- tableProperties (Map<String, String>): Key-value properties from the SQL WITH clause, including jdbc.url, table.name, type, credentials, and dialect-specific settings.
The validate method additionally receives:
- sinkType (SinkType): The protobuf-defined sink type (e.g., SINK_TYPE_UPSERT or SINK_TYPE_APPEND_ONLY).
Output
- createWriter: Returns a SinkWriter instance: either BatchAppendOnlyJDBCSink (for Snowflake/Redshift) or JDBCSink (for all other databases).
- validate: Returns void on success. Throws gRPC Status.INVALID_ARGUMENT or Status.FAILED_PRECONDITION runtime exceptions on validation failures.
Validation Checks
The validate method performs the following checks:
- Connection test: Establishes a JDBC connection to verify connectivity.
- Table existence: Queries database metadata to confirm the target table exists.
- Column matching: Verifies all columns from the RisingWave schema exist in the JDBC table.
- Primary key matching (upsert only): Confirms the JDBC table has a primary key and that it matches the primary_key option from the WITH clause.
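The column and primary key checks listed above can be sketched with the standard java.sql metadata API. This is an illustration under stated assumptions, not the factory's implementation: ValidationSketch and its method names are hypothetical, IllegalArgumentException stands in for the gRPC Status exceptions the real method throws, and treating a primary key "match" as set equality is an assumption.

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only; not the actual JDBCSinkFactory.validate code.
class ValidationSketch {
    // Reads a table's column names through the standard JDBC metadata API.
    // Whether the real factory issues this exact call is an assumption.
    static Set<String> readColumnNames(Connection conn, String tableName) throws SQLException {
        Set<String> columns = new HashSet<>();
        DatabaseMetaData meta = conn.getMetaData();
        try (ResultSet rs = meta.getColumns(null, null, tableName, null)) {
            while (rs.next()) {
                columns.add(rs.getString("COLUMN_NAME"));
            }
        }
        return columns;
    }

    // Column matching: every sink column must exist in the JDBC table.
    // Extra columns in the JDBC table are allowed.
    static void checkColumns(List<String> sinkColumns, Set<String> jdbcColumns) {
        for (String column : sinkColumns) {
            if (!jdbcColumns.contains(column)) {
                throw new IllegalArgumentException("column not found in JDBC table: " + column);
            }
        }
    }

    // Primary key matching (upsert only): the table must have a primary key,
    // and it must equal the declared one (set equality is assumed here).
    static void checkPrimaryKeys(Set<String> jdbcPrimaryKeys, Set<String> declaredPrimaryKeys) {
        if (jdbcPrimaryKeys.isEmpty()) {
            throw new IllegalArgumentException("JDBC table has no primary key");
        }
        if (!jdbcPrimaryKeys.equals(declaredPrimaryKeys)) {
            throw new IllegalArgumentException("primary key mismatch: " + jdbcPrimaryKeys
                    + " vs " + declaredPrimaryKeys);
        }
    }

    public static void main(String[] args) {
        // In-memory data replaces a live database connection for this demo.
        checkColumns(List.of("id", "name"), Set.of("id", "name", "created_at"));
        checkPrimaryKeys(Set.of("id"), Set.of("id"));
        System.out.println("validation passed");
    }
}
```

Keeping the comparisons separate from the metadata lookup lets the matching rules be exercised without a database; readColumnNames would supply the second argument of checkColumns when a connection is available.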
Usage Examples
Creating a Sink Writer
JDBCSinkFactory factory = new JDBCSinkFactory();
SinkWriter writer = factory.createWriter(tableSchema, tableProperties);
Validating Sink Configuration
JDBCSinkFactory factory = new JDBCSinkFactory();
// Throws on failure
factory.validate(tableSchema, tableProperties, SinkType.SINK_TYPE_UPSERT);
Config Routing Logic
// Internal: routes to SnowflakeJDBCSinkConfig for Snowflake URLs
private JDBCSinkConfig createConfig(ObjectMapper mapper, Map<String, String> tableProperties) {
    String jdbcUrl = tableProperties.get(JDBC_URL_PROP);
    if (jdbcUrl != null && jdbcUrl.startsWith("jdbc:snowflake")) {
        return mapper.convertValue(tableProperties, SnowflakeJDBCSinkConfig.class);
    }
    return mapper.convertValue(tableProperties, JDBCSinkConfig.class);
}
Related Pages
- Risingwavelabs_Risingwave_JDBCSinkConfig -- Configuration class deserialized by this factory
- Risingwavelabs_Risingwave_SnowflakeJDBCSinkConfig -- Snowflake-specific configuration subclass selected by URL routing
- Risingwavelabs_Risingwave_JdbcUtils -- Utility class for dialect resolution and connection creation
- Risingwavelabs_Risingwave_JdbcDialect_Interface -- Dialect interface used by the sink writers this factory creates
- Risingwavelabs_Risingwave_BatchAppendOnlyJDBCSink -- Sink writer created for Snowflake and Redshift targets