Implementation:Risingwavelabs Risingwave CassandraFactory

From Leeroopedia


Property  Value
File      java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
Language  Java
Lines     99
Category  Factory
Package   com.risingwave.connector

Overview

CassandraFactory is the factory class that creates CassandraSink instances and validates their configuration. It implements the SinkFactory interface and is the entry point through which the RisingWave connector framework instantiates Cassandra sink writers. Validation covers the URL format, connectivity to the cluster, schema compatibility, and, for upsert sinks, primary key agreement.

Code Reference

Source Location

java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java

Signature

public class CassandraFactory implements SinkFactory {
    public SinkWriter createWriter(TableSchema tableSchema, Map<String, String> tableProperties);
    public void validate(TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType);
}

Imports

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.connector.api.sink.SinkWriter;
import com.risingwave.connector.api.sink.SinkWriterV1;
import com.risingwave.proto.Catalog.SinkType;
import io.grpc.Status;
import java.net.InetSocketAddress;
import java.util.Map;

I/O Contract

Inputs

  • createWriter: Takes a TableSchema describing the RisingWave table schema and a Map<String, String> of table properties containing Cassandra connection configuration.
  • validate: Takes the same parameters plus a SinkType enum value (SINK_TYPE_UPSERT, SINK_TYPE_APPEND_ONLY, or SINK_TYPE_FORCE_APPEND_ONLY).
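To make the shape of the tableProperties input concrete, here is a minimal sketch of a properties map and a pre-check for missing entries. The key names (cassandra.url, cassandra.keyspace, cassandra.table, cassandra.datacenter), the choice of which keys are required, and the class SinkPropertiesSketch itself are assumptions for illustration; this page does not list the keys, so confirm them against the RisingWave Cassandra sink documentation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative helper only; not part of the RisingWave code base. */
final class SinkPropertiesSketch {

    // Assumed key names and assumed "required" set; not confirmed by this page.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "cassandra.url",
            "cassandra.keyspace",
            "cassandra.table",
            "cassandra.datacenter");

    /** Returns the required keys that are absent or blank, in declaration order. */
    static List<String> missingKeys(Map<String, String> props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Builds a mutable example map with placeholder values. */
    static Map<String, String> exampleProperties() {
        Map<String, String> props = new HashMap<>();
        props.put("cassandra.url", "127.0.0.1:9042");
        props.put("cassandra.keyspace", "demo_keyspace");
        props.put("cassandra.table", "demo_table");
        props.put("cassandra.datacenter", "datacenter1");
        return props;
    }
}
```

A caller could run missingKeys(props) before handing the map to the factory and report every missing entry at once, rather than discovering them one by one during validation.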

Outputs

  • createWriter: Returns a SinkWriter wrapping a new CassandraSink instance via SinkWriterV1.Adapter.
  • validate: Returns normally on success; on validation failure, throws a RuntimeException built from a gRPC Status.

Validation Steps

  1. URL format check: Verifies the Cassandra URL is in host:port format.
  2. Connection check: Opens a CQL session to the Cassandra cluster using the configured keyspace and datacenter, plus authentication credentials if they are provided.
  3. Schema check: Calls CassandraUtil.checkSchema() to verify column name and type compatibility between RisingWave and Cassandra.
  4. Primary key check: For SINK_TYPE_UPSERT, calls CassandraUtil.checkPrimaryKey() to ensure the RisingWave primary key matches the primary key of the Cassandra table.
  5. Session cleanup: Closes the CQL session after validation.
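The ordering of the steps above can be sketched in plain Java. This is not the CassandraFactory source: FakeSession is a hypothetical stand-in for a CQL session, the schema and primary key checks are reduced to log entries, and the host:port parsing rules (exactly one colon, port in 1..65535, no IPv6 literals) are assumptions about what a reasonable format check looks like. The sketch also uses try-with-resources for cleanup, which is one way to guarantee step 5; the original may close the session differently.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Illustrative stand-in; not the RisingWave CassandraFactory. */
final class ValidationFlowSketch {

    /** Hypothetical stand-in for a CQL session; records whether it was closed. */
    static final class FakeSession implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Step 1: accept only "host:port" with a non-empty host and a port in 1..65535. */
    static InetSocketAddress parseHostPort(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url is null");
        }
        // Limit -1 keeps trailing empty parts, so "host:9042:" is rejected.
        String[] parts = url.split(":", -1);
        if (parts.length != 2 || parts[0].isEmpty()) {
            throw new IllegalArgumentException("expected host:port, got: " + url);
        }
        int port;
        try {
            port = Integer.parseInt(parts[1]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("port is not a number: " + parts[1], e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        // Unresolved: the format check itself should not trigger a DNS lookup.
        return InetSocketAddress.createUnresolved(parts[0], port);
    }

    /** Runs the steps in order and returns a log of the steps that completed. */
    static List<String> validate(String url, boolean upsert, FakeSession session) {
        List<String> log = new ArrayList<>();
        InetSocketAddress address = parseHostPort(url);            // step 1
        log.add("url ok: " + address.getHostString() + ":" + address.getPort());
        try (FakeSession s = session) {                            // step 2 (stand-in for connecting)
            log.add("connected");
            log.add("schema checked");                             // step 3 (placeholder)
            if (upsert) {
                log.add("primary key checked");                    // step 4, upsert only
            }
        }                                                          // step 5: close() runs even if a check throws
        return log;
    }
}
```

Checking the URL before opening a session means a malformed address fails fast without any network activity, and wrapping the later checks in a single try block keeps the session from leaking when a schema or primary key check fails.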

Usage Examples

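// tableSchema and tableProperties are assumed to be provided by the caller,
// normally the connector framework.
CassandraFactory factory = new CassandraFactory();

// Validate the configuration before use; throws a RuntimeException on failure.
factory.validate(tableSchema, tableProperties, SinkType.SINK_TYPE_UPSERT);

// Create a Cassandra sink writer through the factory.
SinkWriter writer = factory.createWriter(tableSchema, tableProperties);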
