Implementation:Risingwavelabs Risingwave JdbcUtils
| Property | Value |
|---|---|
| Component | risingwave-sink-jdbc |
| Language | Java |
| Package | com.risingwave.connector |
| Type | Abstract utility class |
| Lines | 118 |
| Source | JdbcUtils.java |
Overview
JdbcUtils is an abstract utility class that serves as the central hub for JDBC connection management and dialect resolution in the RisingWave JDBC sink connector. It provides three key capabilities:
- Dialect factory resolution (getDialectFactory): Maps JDBC URL prefixes to the appropriate JdbcDialectFactory implementation, supporting MySQL/MariaDB, PostgreSQL, Redshift, Snowflake, and SQL Server.
- Base connection properties (createBaseProperties): Builds a Properties object with TCP keep-alive, connection timeouts, and socket timeouts appropriate for the target database (PostgreSQL uses seconds; MySQL uses milliseconds).
- Default connection creation (getConnectionDefault): Creates a java.sql.Connection using password-based authentication, with auto-commit disabled by default and transaction isolation set to READ_COMMITTED (except for PostgreSQL).
Code Reference
Source Location
java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JdbcUtils.java
Signature
public abstract class JdbcUtils {
    static final int CONNECTION_TIMEOUT = 30;
    static final int SOCKET_TIMEOUT = 300;
    public static Optional<JdbcDialectFactory> getDialectFactory(String jdbcUrl);
    static Properties createBaseProperties(String jdbcUrl, String user);
    static Connection getConnectionDefault(JDBCSinkConfig config) throws SQLException;
}
Imports
import com.risingwave.connector.jdbc.JdbcDialectFactory;
import com.risingwave.connector.jdbc.MySqlDialectFactory;
import com.risingwave.connector.jdbc.PostgresDialectFactory;
import com.risingwave.connector.jdbc.RedShiftDialectFactory;
import com.risingwave.connector.jdbc.SnowflakeDialectFactory;
import com.risingwave.connector.jdbc.SqlServerDialectFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Optional;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
I/O Contract
getDialectFactory
| Direction | Type | Description |
|---|---|---|
| Input | String (jdbcUrl) | JDBC URL used to identify the target database type |
| Output | Optional<JdbcDialectFactory> | The matching dialect factory, or empty if the URL prefix is unrecognized |
URL prefix to factory mapping:
| URL Prefix | Factory |
|---|---|
| jdbc:mysql, jdbc:mariadb | MySqlDialectFactory |
| jdbc:postgresql | PostgresDialectFactory |
| jdbc:redshift | RedShiftDialectFactory |
| jdbc:snowflake | SnowflakeDialectFactory |
| jdbc:sqlserver | SqlServerDialectFactory |
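The prefix mapping above can be sketched as a chain of startsWith checks. This is a minimal illustration, not the actual method body: the DialectKind enum is a hypothetical stand-in for the real factory classes so that the example compiles on its own, and the order of the checks is an assumption.

```java
import java.util.Optional;

class DialectResolutionSketch {
    // Hypothetical stand-in for the JdbcDialectFactory implementations in the table.
    enum DialectKind { MYSQL, POSTGRES, REDSHIFT, SNOWFLAKE, SQLSERVER }

    // Returns the dialect whose URL prefix matches, or empty if none does.
    static Optional<DialectKind> resolve(String jdbcUrl) {
        if (jdbcUrl.startsWith("jdbc:mysql") || jdbcUrl.startsWith("jdbc:mariadb")) {
            return Optional.of(DialectKind.MYSQL); // MariaDB shares the MySQL dialect
        } else if (jdbcUrl.startsWith("jdbc:postgresql")) {
            return Optional.of(DialectKind.POSTGRES);
        } else if (jdbcUrl.startsWith("jdbc:redshift")) {
            return Optional.of(DialectKind.REDSHIFT);
        } else if (jdbcUrl.startsWith("jdbc:snowflake")) {
            return Optional.of(DialectKind.SNOWFLAKE);
        } else if (jdbcUrl.startsWith("jdbc:sqlserver")) {
            return Optional.of(DialectKind.SQLSERVER);
        }
        return Optional.empty(); // unrecognized prefix
    }

    public static void main(String[] args) {
        System.out.println(resolve("jdbc:mariadb://localhost:3306/db"));
        System.out.println(resolve("jdbc:oracle:thin:@localhost:1521/xe"));
    }
}
```

Returning Optional rather than throwing leaves the decision about unsupported URLs to the caller.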
createBaseProperties
| Direction | Type | Description |
|---|---|---|
| Input | String (jdbcUrl), String (user) | JDBC URL for database-specific timeout units; username for authentication |
| Output | Properties | Connection properties with tcpKeepAlive, connectTimeout, socketTimeout, and optional user |
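The unit handling described for createBaseProperties might look roughly like the sketch below. It assumes that the two constants from the signature are expressed in seconds and that every non-PostgreSQL URL is treated like MySQL (milliseconds); neither assumption is confirmed by this page. The class name BasePropertiesSketch and the method name build are hypothetical.

```java
import java.util.Properties;

class BasePropertiesSketch {
    // Values copied from the signature; treating them as seconds is an assumption.
    static final int CONNECTION_TIMEOUT = 30;
    static final int SOCKET_TIMEOUT = 300;

    static Properties build(String jdbcUrl, String user) {
        Properties props = new Properties();
        props.setProperty("tcpKeepAlive", "true");

        // PostgreSQL reads these timeouts in seconds, MySQL in milliseconds.
        // Assumption: any other URL is scaled like MySQL.
        int scale = jdbcUrl.startsWith("jdbc:postgresql") ? 1 : 1000;
        props.setProperty("connectTimeout", String.valueOf(CONNECTION_TIMEOUT * scale));
        props.setProperty("socketTimeout", String.valueOf(SOCKET_TIMEOUT * scale));

        // The user entry is optional and only set when a username is supplied.
        if (user != null) {
            props.setProperty("user", user);
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("jdbc:postgresql://localhost:5432/db", "alice"));
        System.out.println(build("jdbc:mysql://localhost:3306/db", null));
    }
}
```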
getConnectionDefault
| Direction | Type | Description |
|---|---|---|
| Input | JDBCSinkConfig | Full sink configuration including URL, credentials, and batch settings |
| Output | Connection | JDBC connection with auto-commit disabled and isolation level set to READ_COMMITTED (non-PostgreSQL) |
Special behaviors:
- For Redshift URLs with batchInsertRows > 0, enables reWriteBatchedInserts and sets the batch size.
- PostgreSQL connections skip the explicit setTransactionIsolation call (see issue #24215).
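The post-connect defaults can be illustrated without a database by applying them to a recording stand-in for java.sql.Connection. This is a sketch under assumptions: applyDefaults, recordingConnection, and callsFor are hypothetical helpers, the order of the two calls is a guess, and auto-commit is always disabled here. The Redshift batching properties are left out because this page does not name the batch-size property.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class ConnectionDefaultsSketch {
    // Applies the defaults described above to an already-opened connection.
    static void applyDefaults(Connection conn, String jdbcUrl) throws SQLException {
        conn.setAutoCommit(false);
        // PostgreSQL connections skip the explicit isolation call.
        if (!jdbcUrl.startsWith("jdbc:postgresql")) {
            conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
        }
    }

    // Test double: a Connection proxy that records method names instead of
    // talking to a database.
    static Connection recordingConnection(List<String> calls) {
        return (Connection) Proxy.newProxyInstance(
                ConnectionDefaultsSketch.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    calls.add(method.getName());
                    return null; // the two setters used here return void
                });
    }

    // Returns the Connection methods invoked for a given URL.
    static List<String> callsFor(String jdbcUrl) {
        List<String> calls = new ArrayList<>();
        try {
            applyDefaults(recordingConnection(calls), jdbcUrl);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(callsFor("jdbc:mysql://localhost:3306/db"));
        System.out.println(callsFor("jdbc:postgresql://localhost:5432/db"));
    }
}
```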
Usage Examples
Resolving a Dialect Factory
Optional<JdbcDialectFactory> factory = JdbcUtils.getDialectFactory("jdbc:postgresql://localhost:5432/mydb");
if (factory.isPresent()) {
    JdbcDialect dialect = factory.get().create(columnSqlTypes, pkIndices);
}
Default Connection Creation (via JDBCSinkConfig)
// JDBCSinkConfig.getConnection() delegates to JdbcUtils.getConnectionDefault()
JDBCSinkConfig config = mapper.convertValue(tableProperties, JDBCSinkConfig.class);
Connection conn = config.getConnection();
Related Pages
- Risingwavelabs_Risingwave_JDBCSinkConfig -- Configuration class whose getConnection() delegates to this utility
- Risingwavelabs_Risingwave_SnowflakeJDBCSinkConfig -- Overrides connection logic for Snowflake key-pair authentication
- Risingwavelabs_Risingwave_JDBCSinkFactory -- Factory that uses this utility indirectly through config objects
- Risingwavelabs_Risingwave_JdbcDialect_Interface -- Interface returned by dialect factories resolved here
- Risingwavelabs_Risingwave_MySqlDialect -- MySQL dialect resolved by getDialectFactory
- Risingwavelabs_Risingwave_PostgresDialect -- PostgreSQL dialect resolved by getDialectFactory
- Risingwavelabs_Risingwave_RedShiftDialect -- Redshift dialect resolved by getDialectFactory
- Risingwavelabs_Risingwave_SnowflakeDialect -- Snowflake dialect resolved by getDialectFactory
- Risingwavelabs_Risingwave_SqlServerDialect -- SQL Server dialect resolved by getDialectFactory