Implementation:Risingwavelabs Risingwave SnowflakeDialect

From Leeroopedia


Property | Value
Component | risingwave-sink-jdbc
Language | Java
Package | com.risingwave.connector.jdbc
Implements | JdbcDialect
Lines | 117
Source | SnowflakeDialect.java

Overview

SnowflakeDialect implements the JdbcDialect interface to provide Snowflake-compatible SQL generation for the RisingWave JDBC sink connector. The dialect is designed exclusively for append-only operation: its upsert and delete methods throw UnsupportedOperationException.

Key characteristics of this dialect:

  • Identifier quoting: Uses double-quotes ("identifier").
  • Table name normalization: Returns the unqualified table name only (no schema prefix), matching Snowflake's convention where the schema is specified in the JDBC connection URL.
  • Upsert and delete: Explicitly unsupported; Snowflake sinks use BatchAppendOnlyJDBCSink.
  • Type binding: Provides detailed type-specific handling for temporal types (DATE, TIME, TIMESTAMP, TIMESTAMPTZ), converting RisingWave Java temporal objects to their JDBC equivalents. TIMESTAMPTZ values are serialized as ISO-8601 strings.
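
The quoting and table-name rules above can be sketched in isolation. This is a minimal illustration, not the SnowflakeDialect source: the class QuotingSketch and its helper methods are hypothetical, the statement shape is modeled on the usage example on this page, and whether the real dialect escapes embedded double quotes is not stated here, so the sketch does not attempt it.

```java
import java.util.List;
import java.util.stream.Collectors;

public class QuotingSketch {
    // Hypothetical stand-in for quoteIdentifier: wrap the name in double quotes.
    // Assumption: no escaping of embedded quotes is performed.
    static String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    // Hypothetical stand-in for getNormalizedTableName: the schema is dropped
    // and only the unqualified table name is returned.
    static String normalizedTableName(String schemaName, String tableName) {
        return tableName;
    }

    // Builds an INSERT with one quoted column and one "?" placeholder per field.
    static String insertSql(String schemaName, String tableName, List<String> fieldNames) {
        String columns = fieldNames.stream()
                .map(QuotingSketch::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String placeholders = fieldNames.stream()
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + normalizedTableName(schemaName, tableName)
                + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        // The schema name "PUBLIC" is accepted but does not appear in the SQL.
        System.out.println(insertSql("PUBLIC", "events", List.of("col1", "col2")));
    }
}
```

Because the schema is left out of the statement, the target schema has to come from the connection itself, which is why the dialect relies on the JDBC connection URL for it.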

Code Reference

Source Location

java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/SnowflakeDialect.java

Signature

public class SnowflakeDialect implements JdbcDialect {
    @Override public SchemaTableName createSchemaTableName(String schemaName, String tableName);
    @Override public String getNormalizedTableName(SchemaTableName schemaTableName);
    @Override public String quoteIdentifier(String identifier);
    @Override public Optional<String> getUpsertStatement(
            SchemaTableName schemaTableName, TableSchema tableSchema, List<String> uniqueKeyFields);
    @Override public void bindUpsertStatement(
            PreparedStatement stmt, Connection conn, TableSchema tableSchema, SinkRow row) throws SQLException;
    @Override public void bindInsertIntoStatement(
            PreparedStatement stmt, Connection conn, TableSchema tableSchema, SinkRow row) throws SQLException;
    @Override public void bindDeleteStatement(
            PreparedStatement stmt, TableSchema tableSchema, SinkRow row) throws SQLException;
}

Imports

import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkRow;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;

I/O Contract

Unsupported Operations

Method | Behavior
getUpsertStatement | Throws UnsupportedOperationException
bindUpsertStatement | Throws UnsupportedOperationException
bindDeleteStatement | Throws UnsupportedOperationException
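
A caller that might be handed an append-only dialect can treat UnsupportedOperationException as a signal that upserts are unavailable. The sketch below is a self-contained illustration under that assumption; AppendOnlySketch and both of its methods are hypothetical and only mirror the shape of getUpsertStatement, not the RisingWave sink code.

```java
import java.util.List;
import java.util.Optional;

public class AppendOnlySketch {
    // Hypothetical mirror of getUpsertStatement on an append-only dialect:
    // the method never returns and always throws.
    static Optional<String> getUpsertStatement(String tableName, List<String> uniqueKeyFields) {
        throw new UnsupportedOperationException(
                "upsert is not supported by this append-only sketch");
    }

    // Probes for upsert support by catching the exception.
    static boolean supportsUpsert() {
        try {
            getUpsertStatement("events", List.of("id"));
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("upsert supported: " + supportsUpsert());
    }
}
```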

Type Binding Behavior (bindInsertIntoStatement)

RisingWave Type | Binding Method | Notes
DECIMAL | setBigDecimal | Direct BigDecimal binding
JSONB | setObject | Generic object binding
BYTEA | setBytes | Raw byte array
VARCHAR | setString | Direct string binding
DATE | setDate | Converts java.time.LocalDate to java.sql.Date; null-safe
TIME | setTime | Converts java.time.LocalTime to java.sql.Time; null-safe
TIMESTAMP | setTimestamp | Converts java.time.LocalDateTime to java.sql.Timestamp; null-safe
TIMESTAMPTZ | setString | Converts java.time.OffsetDateTime to ISO-8601 string representation
All others | setObject | Generic fallback
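
The temporal conversions in the table map onto standard JDK calls. The following is a minimal sketch of those conversions only, assuming null inputs pass through as null; the class TemporalBindingSketch and its method names are hypothetical, and the use of DateTimeFormatter.ISO_OFFSET_DATE_TIME for TIMESTAMPTZ is an assumption, since this page does not say how the dialect produces its ISO-8601 string.

```java
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TemporalBindingSketch {
    // DATE: java.time.LocalDate -> java.sql.Date (value for setDate)
    static Date toSqlDate(LocalDate value) {
        return value == null ? null : Date.valueOf(value);
    }

    // TIME: java.time.LocalTime -> java.sql.Time (value for setTime).
    // Time.valueOf keeps hour, minute, and second; the nanosecond field is dropped.
    static Time toSqlTime(LocalTime value) {
        return value == null ? null : Time.valueOf(value);
    }

    // TIMESTAMP: java.time.LocalDateTime -> java.sql.Timestamp (value for setTimestamp)
    static Timestamp toSqlTimestamp(LocalDateTime value) {
        return value == null ? null : Timestamp.valueOf(value);
    }

    // TIMESTAMPTZ: java.time.OffsetDateTime -> ISO-8601 text (value for setString).
    // Assumption: ISO_OFFSET_DATE_TIME is used as the format.
    static String toIsoString(OffsetDateTime value) {
        return value == null ? null : DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(value);
    }

    public static void main(String[] args) {
        System.out.println(toSqlDate(LocalDate.of(2024, 1, 2)));
        System.out.println(toSqlTime(LocalTime.of(3, 4, 5)));
        System.out.println(toSqlTimestamp(LocalDateTime.of(2024, 1, 2, 3, 4, 5)));
        System.out.println(toIsoString(
                OffsetDateTime.of(2024, 1, 2, 3, 4, 5, 0, ZoneOffset.UTC)));
    }
}
```

Sending TIMESTAMPTZ as a string keeps the UTC offset explicit in the value itself, whereas java.sql.Timestamp carries no offset of its own.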

Usage Examples

// Assumes fieldNames (List<String>), conn (java.sql.Connection), tableSchema (TableSchema),
// and row (SinkRow) are already available from the surrounding sink code.

// Create the Snowflake dialect (no constructor parameters needed)
SnowflakeDialect dialect = new SnowflakeDialect();

// Generate the INSERT statement
SchemaTableName stn = dialect.createSchemaTableName("PUBLIC", "events");
String insertSql = dialect.getInsertIntoStatement(stn, fieldNames);
// Produces: INSERT INTO events("col1", "col2") VALUES (?, ?)
// Note: getNormalizedTableName returns the unqualified table name, so "PUBLIC" does not appear

// Bind and execute; try-with-resources closes the statement even if binding throws
try (PreparedStatement stmt = conn.prepareStatement(insertSql)) {
    dialect.bindInsertIntoStatement(stmt, conn, tableSchema, row);
    stmt.executeUpdate();
}
