Implementation:Risingwavelabs Risingwave Handle Create Source

From Leeroopedia


Knowledge Sources
Domains: Streaming, Data_Ingestion, SQL_DDL
Last Updated: 2026-02-09 07:00 GMT

Overview

A concrete tool for registering streaming data sources in RisingWave, provided by the frontend SQL handler.

Description

The handle_create_source function is the SQL DDL handler for the CREATE SOURCE statement in RisingWave. It processes the parsed SQL statement, validates connector properties, resolves the source schema from the external system, and registers the source in the catalog. For shared sources, it also generates and deploys a streaming fragment graph.

Usage

This handler is invoked when a user executes a CREATE SOURCE SQL statement through the PostgreSQL wire protocol. It supports all connector types (Kafka, Pulsar, Kinesis, etc.) and data formats (JSON, Avro, Protobuf).
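As an illustration of the end-to-end flow, the statements below register a source and then confirm the catalog entry that handle_create_source created. This is a hedged sketch: the connector properties (broker address, topic) are placeholders for your deployment, and SHOW SOURCES is the standard RisingWave command for listing registered sources.

```sql
-- Issued over the PostgreSQL wire protocol (e.g. via psql).
-- Broker address and topic are illustrative; adjust to your cluster.
CREATE SOURCE demo_src (
    id INT,
    v VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'demo',
    properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE JSON;

-- Verify the registration performed by handle_create_source.
SHOW SOURCES;
```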

Code Reference

Source Location

  • Repository: risingwave
  • File: src/frontend/src/handler/create_source.rs
  • Lines: L1096-1198

Signature

pub async fn handle_create_source(
    mut handler_args: HandlerArgs,
    stmt: CreateSourceStatement,
) -> Result<RwPgResponse>

Import

// Internal to the frontend crate
use crate::handler::create_source::handle_create_source;

I/O Contract

Inputs

  • handler_args (HandlerArgs, required): Session context, WITH options, and handler metadata
  • stmt (CreateSourceStatement, required): Parsed CREATE SOURCE SQL statement containing source_name, columns, constraints, format_encode, with_options, watermarks, and include_column_options

Outputs

  • Result<RwPgResponse> (return value): Success response with StatementType::CREATE_SOURCE, or an error
  • Catalog entry (side effect): Source registered in the system catalog
  • Streaming job (side effect, shared sources only): Fragment graph deployed to the compute nodes

Usage Examples

Create Kafka Source

CREATE SOURCE kafka_events (
    user_id INT,
    event_type VARCHAR,
    event_timestamp TIMESTAMPTZ,
    payload JSONB
) WITH (
    connector = 'kafka',
    topic = 'user_events',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

Create Pulsar Source with Avro

CREATE SOURCE pulsar_orders WITH (
    connector = 'pulsar',
    topic = 'persistent://public/default/orders',
    service.url = 'pulsar://pulsar:6650',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE AVRO (
    schema.registry = 'http://schema-registry:8081'
);
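Once registered, a source is typically consumed through a materialized view, and dropping the source removes its catalog entry. A minimal sketch, assuming the kafka_events source from the first example above:

```sql
-- Continuously aggregate events flowing in from the registered source.
CREATE MATERIALIZED VIEW event_counts AS
SELECT event_type, COUNT(*) AS cnt
FROM kafka_events
GROUP BY event_type;

-- Clean up: dependent materialized views must be dropped before the source.
DROP MATERIALIZED VIEW event_counts;
DROP SOURCE kafka_events;
```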

Related Pages

Implements Principle

Requires Environment

Uses Heuristic
