Implementation:Risingwavelabs Risingwave SinkFactory CreateWriter
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Software_Architecture, Java |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Concrete tool for creating and validating sink connector instances provided by the RisingWave Java connector API.
Description
SinkFactory is the Java interface that every sink connector must implement. It defines three methods: createWriter, which instantiates a writer for a specific table schema and set of properties; validate, which checks the configuration before the sink is created; and createCoordinator, which supplies a coordinator for distributed commits. Implementations include JDBCSinkFactory, EsSinkFactory, CassandraFactory, and others.
The SinkWriter interface defines the data writing lifecycle: beginEpoch, write, barrier, and drop. The SinkCoordinator interface handles distributed commit and drop operations.
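The writer lifecycle described above can be sketched as follows. This is a minimal, self-contained illustration, not RisingWave code: `SketchWriter` and `BufferingWriter` are hypothetical stand-ins that use plain strings instead of SinkRow and return nothing from barrier. The choice to flush buffered rows only on a checkpoint barrier, and the meaning given to the boolean returned by write, are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SinkWriter: same method names, simplified types.
interface SketchWriter {
    void beginEpoch(long epoch);
    boolean write(Iterable<String> rows);
    void barrier(boolean isCheckpoint);
    void drop();
}

// Buffers rows per epoch and moves them to an in-memory "flushed" list on checkpoint.
class BufferingWriter implements SketchWriter {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> flushed = new ArrayList<>();
    private long currentEpoch = -1;
    private boolean dropped = false;

    @Override
    public void beginEpoch(long epoch) {
        currentEpoch = epoch;
    }

    @Override
    public boolean write(Iterable<String> rows) {
        if (dropped) {
            throw new IllegalStateException("writer already dropped");
        }
        for (String row : rows) {
            // Tag each row with the epoch it arrived in.
            buffer.add(currentEpoch + ":" + row);
        }
        // Assumption for this sketch: true means the rows were accepted.
        return true;
    }

    @Override
    public void barrier(boolean isCheckpoint) {
        // Assumption for this sketch: only checkpoint barriers flush the buffer.
        if (isCheckpoint) {
            flushed.addAll(buffer);
            buffer.clear();
        }
    }

    @Override
    public void drop() {
        // Discard anything not yet flushed and refuse further writes.
        buffer.clear();
        dropped = true;
    }

    List<String> flushedRows() {
        return flushed;
    }

    int pendingRows() {
        return buffer.size();
    }
}
```

A caller would invoke beginEpoch, then write one or more times, then barrier, repeating per epoch, and call drop once when the sink is torn down.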
Usage
These interfaces are implemented by each sink connector type. They are discovered via Java ServiceLoader and instantiated by SinkUtils.getSinkFactory() when processing CREATE SINK statements.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkFactory.java (L23-32)
- File: java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkWriter.java (L22-50)
- File: java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkCoordinator.java (L22-26)
Signature
public interface SinkFactory {
    SinkWriter createWriter(TableSchema tableSchema, Map<String, String> tableProperties);

    void validate(
            TableSchema tableSchema,
            Map<String, String> tableProperties,
            SinkType sinkType);

    SinkCoordinator createCoordinator(Map<String, String> tableProperties);
}

public interface SinkWriter {
    void beginEpoch(long epoch);

    boolean write(Iterable<SinkRow> rows);

    Optional<ConnectorServiceProto.SinkMetadata> barrier(boolean isCheckpoint);

    void drop();
}

public interface SinkCoordinator {
    void commit(long epoch, List<ConnectorServiceProto.SinkMetadata> metadataList);

    void drop();
}
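One way to read these signatures together: a writer may hand back metadata from barrier, and the coordinator receives a list of such metadata in commit. The sketch below illustrates that flow with hypothetical stand-ins (`Meta`, `CountingWriter`, `SketchCoordinator`). How RisingWave actually gathers and routes metadata between writers and the coordinator is not described on this page and is assumed here purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for ConnectorServiceProto.SinkMetadata.
final class Meta {
    final String writerId;
    final int rowCount;

    Meta(String writerId, int rowCount) {
        this.writerId = writerId;
        this.rowCount = rowCount;
    }
}

// Hypothetical writer side: reports metadata only on checkpoint barriers.
class CountingWriter {
    private final String id;
    private int pending = 0;

    CountingWriter(String id) {
        this.id = id;
    }

    void write(List<String> rows) {
        pending += rows.size();
    }

    Optional<Meta> barrier(boolean isCheckpoint) {
        if (!isCheckpoint) {
            return Optional.empty();
        }
        Meta meta = new Meta(id, pending);
        pending = 0;
        return Optional.of(meta);
    }
}

// Hypothetical coordinator: records one commit entry per epoch.
class SketchCoordinator {
    private final List<String> commits = new ArrayList<>();

    void commit(long epoch, List<Meta> metadataList) {
        int total = 0;
        for (Meta m : metadataList) {
            total += m.rowCount;
        }
        commits.add("epoch=" + epoch + " rows=" + total);
    }

    List<String> commits() {
        return commits;
    }
}
```

Keeping the commit decision in a single coordinator means the target system sees one commit per epoch regardless of how many writers contributed to it.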
Import
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.connector.api.sink.SinkWriter;
import com.risingwave.connector.api.sink.SinkCoordinator;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| tableSchema | TableSchema | Yes | Column names and types from the source relation |
| tableProperties | Map<String, String> | Yes | WITH clause properties (connector type, connection URL, etc.) |
| sinkType | SinkType | Yes (for validate) | APPEND_ONLY, FORCE_APPEND_ONLY, or UPSERT |
Outputs
| Method | Return type | Description |
|---|---|---|
| createWriter | SinkWriter | Writer configured for the target system |
| createCoordinator | SinkCoordinator | Coordinator for distributed commit |
| validate | void | Throws an exception if the configuration is invalid |
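The contract above amounts to: validate signals bad configuration by throwing, and a writer should only be created from properties that passed validation. A minimal sketch follows, assuming a hypothetical required key `my.custom.url` (the same key used in the custom sink example on this page) and a hypothetical rule that upsert sinks need a `primary_key` property. Neither rule is taken from RisingWave itself, and `SketchSinkType` and `SketchValidator` are stand-in names.

```java
import java.util.Map;

// Hypothetical stand-in for the SinkType values listed in the Inputs table.
enum SketchSinkType { APPEND_ONLY, FORCE_APPEND_ONLY, UPSERT }

class SketchValidator {
    // Throws IllegalArgumentException when the configuration is unusable.
    static void validate(Map<String, String> props, SketchSinkType type) {
        String url = props.get("my.custom.url");
        if (url == null || url.isEmpty()) {
            throw new IllegalArgumentException("my.custom.url is required");
        }
        // Assumed rule for illustration: upsert needs a key to address rows.
        if (type == SketchSinkType.UPSERT && !props.containsKey("primary_key")) {
            throw new IllegalArgumentException("primary_key is required for upsert sinks");
        }
    }

    // Returns null when the configuration is valid, otherwise the error message.
    static String check(Map<String, String> props, SketchSinkType type) {
        try {
            validate(props, type);
            return null;
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }
}
```

Failing early in validate keeps configuration errors out of the write path, where they would otherwise surface only after data has started flowing.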
Usage Examples
Implementing a Custom Sink
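import com.risingwave.connector.api.sink.SinkCoordinator;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.connector.api.sink.SinkWriter;
import java.util.Map;
// TableSchema and SinkType must also be imported from the connector API.

public class MyCustomSinkFactory implements SinkFactory {
    @Override
    public SinkWriter createWriter(TableSchema schema, Map<String, String> props) {
        // MyCustomSinkWriter is a user-defined SinkWriter implementation (not shown).
        String url = props.get("my.custom.url");
        return new MyCustomSinkWriter(schema, url);
    }

    @Override
    public void validate(TableSchema schema, Map<String, String> props, SinkType type) {
        // Reject the configuration early if the required property is missing.
        if (!props.containsKey("my.custom.url")) {
            throw new IllegalArgumentException("my.custom.url is required");
        }
    }

    @Override
    public SinkCoordinator createCoordinator(Map<String, String> props) {
        return null; // No coordination needed
    }
}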
SQL to Create Sink
CREATE SINK es_sink FROM user_events
WITH (
    connector = 'elasticsearch',
    url = 'http://elasticsearch:9200',
    index = 'user_events',
    type = 'append-only'
);