Implementation:Risingwavelabs Risingwave CassandraFactory
| Property | Value |
|---|---|
| File | java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java |
| Language | Java |
| Lines | 99 |
| Category | Factory |
| Package | com.risingwave.connector |
Overview
CassandraFactory is the factory class that creates and validates CassandraSink instances. It implements the SinkFactory interface and serves as the entry point for the RisingWave connector framework to instantiate Cassandra sink writers. The factory performs comprehensive validation including URL format checking, connection establishment, schema compatibility verification, and primary key validation for upsert sinks.
Code Reference
Source Location
Signature
public class CassandraFactory implements SinkFactory {
    public SinkWriter createWriter(TableSchema tableSchema, Map<String, String> tableProperties);
    public void validate(TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType);
}
Imports
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.connector.api.sink.SinkWriter;
import com.risingwave.connector.api.sink.SinkWriterV1;
import com.risingwave.proto.Catalog.SinkType;
import io.grpc.Status;
import java.net.InetSocketAddress;
import java.util.Map;
I/O Contract
Inputs
- createWriter: Takes a TableSchema describing the RisingWave table schema and a Map<String, String> of table properties containing the Cassandra connection configuration.
- validate: Takes the same parameters plus a SinkType enum value (SINK_TYPE_UPSERT, SINK_TYPE_APPEND_ONLY, or SINK_TYPE_FORCE_APPEND_ONLY).
Outputs
- createWriter: Returns a SinkWriter wrapping a new CassandraSink instance via SinkWriterV1.Adapter.
- validate: Returns void on success; throws RuntimeException (via gRPC Status) on validation failure.
Validation Steps
- URL format check: Verifies the Cassandra URL is in host:port format.
- Connection check: Establishes a CQL session to the Cassandra cluster with keyspace and datacenter settings, including optional authentication credentials.
- Schema check: Calls CassandraUtil.checkSchema() to verify column name and type compatibility between RisingWave and Cassandra.
- Primary key check: For SINK_TYPE_UPSERT, calls CassandraUtil.checkPrimaryKey() to ensure primary keys match.
- Session cleanup: Closes the CQL session after validation.
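The first step above, the host:port format check, can be illustrated with a small standalone sketch. This is not the code in CassandraFactory.java; the class name HostPortCheck, the method parseHostPort, the port-range check, and the exception messages are all hypothetical, and the sketch throws IllegalArgumentException rather than wrapping the failure in a gRPC Status. It uses only java.net.InetSocketAddress, which the factory also imports.

```java
import java.net.InetSocketAddress;

// Hypothetical helper illustrating a host:port format check.
// The names and messages here are assumptions, not taken from CassandraFactory.
class HostPortCheck {

    /**
     * Splits a "host:port" string and returns an unresolved socket address.
     * Throws IllegalArgumentException when the input is not in host:port form.
     */
    static InetSocketAddress parseHostPort(String url) {
        if (url == null) {
            throw new IllegalArgumentException("Cassandra URL is missing");
        }
        String[] parts = url.split(":");
        // Exactly one colon with a non-empty host on the left is required.
        if (parts.length != 2 || parts[0].isEmpty()) {
            throw new IllegalArgumentException(
                    "Invalid Cassandra URL: expected host:port, got " + url);
        }
        int port;
        try {
            port = Integer.parseInt(parts[1]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Invalid port in Cassandra URL: " + url, e);
        }
        // Assumed extra check: keep the port inside the usable TCP range.
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException(
                    "Port out of range in Cassandra URL: " + url);
        }
        // createUnresolved avoids a DNS lookup during the format check.
        return InetSocketAddress.createUnresolved(parts[0], port);
    }

    public static void main(String[] args) {
        InetSocketAddress addr = parseHostPort("127.0.0.1:9042");
        System.out.println(addr.getHostString() + " " + addr.getPort());
    }
}
```

Running the format check before opening a CQL session means a malformed URL is reported as a configuration error rather than surfacing later as a connection failure.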
Usage Examples
// Creating a Cassandra sink writer through the factory
CassandraFactory factory = new CassandraFactory();
SinkWriter writer = factory.createWriter(tableSchema, tableProperties);
// Validating configuration before use
factory.validate(tableSchema, tableProperties, SinkType.SINK_TYPE_UPSERT);
Related Pages
- CassandraConfig - Configuration class deserialized by this factory
- CassandraUtil - Utility methods for schema and primary key validation
- SinkFactory CreateWriter - General sink factory pattern