Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Risingwavelabs Risingwave JdbcUtils

From Leeroopedia
Revision as of 16:31, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Risingwavelabs_Risingwave_JdbcUtils.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)


Property Value
Component risingwave-sink-jdbc
Language Java
Package com.risingwave.connector
Type Abstract utility class
Lines 118
Source JdbcUtils.java

Overview

JdbcUtils is an abstract utility class that serves as the central hub for JDBC connection management and dialect resolution in the RisingWave JDBC sink connector. It provides three key capabilities:

  1. Dialect factory resolution (getDialectFactory): Maps JDBC URL prefixes to the appropriate JdbcDialectFactory implementation, supporting MySQL/MariaDB, PostgreSQL, Redshift, Snowflake, and SQL Server.
  2. Base connection properties (createBaseProperties): Builds a Properties object with TCP keep-alive, connection timeouts, and socket timeouts appropriate for the target database (PostgreSQL uses seconds; MySQL uses milliseconds).
  3. Default connection creation (getConnectionDefault): Creates a java.sql.Connection using password-based authentication, with auto-commit disabled by default and transaction isolation set to READ_COMMITTED (except for PostgreSQL).

Code Reference

Source Location

java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JdbcUtils.java

Signature

public abstract class JdbcUtils {
    static final int CONNECTION_TIMEOUT = 30;
    static final int SOCKET_TIMEOUT = 300;

    public static Optional<JdbcDialectFactory> getDialectFactory(String jdbcUrl);

    static Properties createBaseProperties(String jdbcUrl, String user);

    static Connection getConnectionDefault(JDBCSinkConfig config) throws SQLException;
}

Imports

import com.risingwave.connector.jdbc.JdbcDialectFactory;
import com.risingwave.connector.jdbc.MySqlDialectFactory;
import com.risingwave.connector.jdbc.PostgresDialectFactory;
import com.risingwave.connector.jdbc.RedShiftDialectFactory;
import com.risingwave.connector.jdbc.SnowflakeDialectFactory;
import com.risingwave.connector.jdbc.SqlServerDialectFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Optional;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

I/O Contract

getDialectFactory

Direction Type Description
Input String (jdbcUrl) JDBC URL used to identify the target database type
Output Optional<JdbcDialectFactory> The matching dialect factory, or empty if the URL prefix is unrecognized

URL prefix to factory mapping:

URL Prefix Factory
jdbc:mysql, jdbc:mariadb MySqlDialectFactory
jdbc:postgresql PostgresDialectFactory
jdbc:redshift RedShiftDialectFactory
jdbc:snowflake SnowflakeDialectFactory
jdbc:sqlserver SqlServerDialectFactory

createBaseProperties

Direction Type Description
Input String (jdbcUrl), String (user) JDBC URL for database-specific timeout units; username for authentication
Output Properties Connection properties with tcpKeepAlive, connectTimeout, socketTimeout, and optional user

getConnectionDefault

Direction Type Description
Input JDBCSinkConfig Full sink configuration including URL, credentials, and batch settings
Output Connection JDBC connection with auto-commit disabled and isolation level set to READ_COMMITTED (non-PostgreSQL)

Special behaviors:

  • For Redshift URLs with batchInsertRows > 0, enables reWriteBatchedInserts and sets the batch size.
  • PostgreSQL connections skip the explicit setTransactionIsolation call (see issue #24215).

Usage Examples

Resolving a Dialect Factory

Optional<JdbcDialectFactory> factory = JdbcUtils.getDialectFactory("jdbc:postgresql://localhost:5432/mydb");
if (factory.isPresent()) {
    JdbcDialect dialect = factory.get().create(columnSqlTypes, pkIndices);
}

Default Connection Creation (via JDBCSinkConfig)

// JDBCSinkConfig.getConnection() delegates to JdbcUtils.getConnectionDefault()
JDBCSinkConfig config = mapper.convertValue(tableProperties, JDBCSinkConfig.class);
Connection conn = config.getConnection();

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment