Implementation:Risingwavelabs Risingwave Handle Create Source
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Data_Ingestion, SQL_DDL |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Concrete tool for registering streaming data sources in RisingWave provided by the frontend SQL handler.
Description
The handle_create_source function is the SQL DDL handler for the CREATE SOURCE statement in RisingWave. It processes the parsed SQL statement, validates connector properties, resolves the source schema from the external system, and registers the source in the catalog. For shared sources, it also generates and deploys a streaming fragment graph.
Usage
This handler is invoked when a user executes a CREATE SOURCE SQL statement through the PostgreSQL wire protocol. It supports all connector types (Kafka, Pulsar, Kinesis, etc.) and data formats (JSON, Avro, Protobuf).
Code Reference
Source Location
- Repository: risingwave
- File: src/frontend/src/handler/create_source.rs
- Lines: L1096-1198
Signature
pub async fn handle_create_source(
mut handler_args: HandlerArgs,
stmt: CreateSourceStatement,
) -> Result<RwPgResponse>
Import
// Internal to the frontend crate
use crate::handler::create_source::handle_create_source;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| handler_args | HandlerArgs | Yes | Session context, WITH options, and handler metadata |
| stmt | CreateSourceStatement | Yes | Parsed CREATE SOURCE SQL statement containing source_name, columns, constraints, format_encode, with_options, watermarks, include_column_options |
Outputs
| Name | Type | Description |
|---|---|---|
| Result<RwPgResponse> | RwPgResponse | Success response (StatementType::CREATE_SOURCE) or error |
| Catalog entry | Side effect | Source registered in system catalog |
| Streaming job | Side effect (shared sources) | Fragment graph deployed to compute nodes |
Usage Examples
Create Kafka Source
CREATE SOURCE kafka_events (
user_id INT,
event_type VARCHAR,
event_timestamp TIMESTAMPTZ,
payload JSONB
) WITH (
connector = 'kafka',
topic = 'user_events',
properties.bootstrap.server = 'broker:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
Create Pulsar Source with Avro
CREATE SOURCE pulsar_orders WITH (
connector = 'pulsar',
topic = 'persistent://public/default/orders',
service.url = 'pulsar://pulsar:6650',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE AVRO (
schema.registry = 'http://schema-registry:8081'
);