Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:ArroyoSystems Arroyo Confluent Connector

From Leeroopedia


Knowledge Sources
Domains Streaming, Connectors
Last Updated 2026-02-08 08:00 GMT

Overview

ConfluentConnector is a specialized Kafka connector that wraps KafkaConnector with SASL/SSL authentication and optional Confluent Schema Registry integration for connecting to Confluent Cloud.

Description

The Confluent connector delegates all core logic to the underlying KafkaConnector by converting ConfluentProfile into KafkaConfig with SASL_SSL protocol and PLAIN mechanism authentication using an API key and secret. It supports an optional ConfluentSchemaRegistry with endpoint, API key, and API secret that maps to the Kafka connector's SchemaRegistry::ConfluentSchemaRegistry variant. All operator creation (make_operator), autocomplete (get_autocomplete), metadata definitions (metadata_defs), and profile testing (test_profile) are proxied to KafkaConnector. A Confluent-specific client.id is injected into client configurations for test and table operations. The connector uses from_options to parse connection settings from SQL WITH clauses or stored connection profiles.

Usage

Use ConfluentConnector when connecting Arroyo pipelines to Kafka clusters hosted on Confluent Cloud, especially when using Confluent Schema Registry for Avro, Protobuf, or JSON Schema format resolution.

Code Reference

Source Location

Signature

pub struct ConfluentConnector {}

impl ConfluentConnector {
    pub fn connection_from_options(
        opts: &mut ConnectorOptions,
    ) -> anyhow::Result<ConfluentProfile>;
}

impl From<ConfluentProfile> for KafkaConfig { ... }
impl From<Option<ConfluentSchemaRegistry>> for kafka::SchemaRegistry { ... }

impl Connector for ConfluentConnector {
    type ProfileT = ConfluentProfile;
    type TableT = KafkaTable;

    fn name(&self) -> &'static str; // returns "confluent"
    fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector;
    fn from_config(&self, id: Option<i64>, name: &str, config: ConfluentProfile,
        table: KafkaTable, schema: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
    fn make_operator(&self, profile: ConfluentProfile, table: KafkaTable,
        config: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}

Import

use arroyo_connectors::confluent::ConfluentConnector;

I/O Contract

Inputs

Name Type Required Description
bootstrap_servers BootstrapServers Yes Confluent Cloud bootstrap server addresses
key VarStr Yes Confluent Cloud API key for SASL authentication
secret VarStr Yes Confluent Cloud API secret for SASL authentication
schema_registry Option<ConfluentSchemaRegistry> No Schema Registry endpoint with API key/secret
topic String Yes Kafka topic name (via KafkaTable)

Outputs

Name Type Description
Connection Connection Kafka connection configured for Confluent Cloud with SASL_SSL
ConstructedOperator ConstructedOperator Delegates to KafkaConnector's source or sink operator

Usage Examples

CREATE TABLE confluent_source (
    value TEXT
) WITH (
    connector = 'confluent',
    bootstrap_servers = 'pkc-xxxxx.us-east-1.aws.confluent.cloud:9092',
    key = '{{ CONFLUENT_KEY }}',
    secret = '{{ CONFLUENT_SECRET }}',
    topic = 'my-topic',
    type = 'source',
    format = 'json'
);

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment