Implementation:ArroyoSystems Arroyo Confluent Connector
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
ConfluentConnector is a specialized Kafka connector that wraps KafkaConnector with SASL/SSL authentication and optional Confluent Schema Registry integration for connecting to Confluent Cloud.
Description
The Confluent connector delegates all core logic to the underlying KafkaConnector by converting ConfluentProfile into KafkaConfig with SASL_SSL protocol and PLAIN mechanism authentication using an API key and secret. It supports an optional ConfluentSchemaRegistry with endpoint, API key, and API secret that maps to the Kafka connector's SchemaRegistry::ConfluentSchemaRegistry variant. All operator creation (make_operator), autocomplete (get_autocomplete), metadata definitions (metadata_defs), and profile testing (test_profile) are proxied to KafkaConnector. A Confluent-specific client.id is injected into client configurations for test and table operations. The connector uses from_options to parse connection settings from SQL WITH clauses or stored connection profiles.
Usage
Use ConfluentConnector when connecting Arroyo pipelines to Kafka clusters hosted on Confluent Cloud, especially when using Confluent Schema Registry for Avro, Protobuf, or JSON Schema format resolution.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/confluent/mod.rs
Signature
pub struct ConfluentConnector {}
impl ConfluentConnector {
pub fn connection_from_options(
opts: &mut ConnectorOptions,
) -> anyhow::Result<ConfluentProfile>;
}
impl From<ConfluentProfile> for KafkaConfig { ... }
impl From<Option<ConfluentSchemaRegistry>> for kafka::SchemaRegistry { ... }
impl Connector for ConfluentConnector {
type ProfileT = ConfluentProfile;
type TableT = KafkaTable;
fn name(&self) -> &'static str; // returns "confluent"
fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector;
fn from_config(&self, id: Option<i64>, name: &str, config: ConfluentProfile,
table: KafkaTable, schema: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
fn make_operator(&self, profile: ConfluentProfile, table: KafkaTable,
config: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}
Import
use arroyo_connectors::confluent::ConfluentConnector;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| bootstrap_servers | BootstrapServers | Yes | Confluent Cloud bootstrap server addresses |
| key | VarStr | Yes | Confluent Cloud API key for SASL authentication |
| secret | VarStr | Yes | Confluent Cloud API secret for SASL authentication |
| schema_registry | Option<ConfluentSchemaRegistry> | No | Schema Registry endpoint with API key/secret |
| topic | String | Yes | Kafka topic name (via KafkaTable) |
Outputs
| Name | Type | Description |
|---|---|---|
| Connection | Connection | Kafka connection configured for Confluent Cloud with SASL_SSL |
| ConstructedOperator | ConstructedOperator | Delegates to KafkaConnector's source or sink operator |
Usage Examples
CREATE TABLE confluent_source (
value TEXT
) WITH (
connector = 'confluent',
bootstrap_servers = 'pkc-xxxxx.us-east-1.aws.confluent.cloud:9092',
key = '{{ CONFLUENT_KEY }}',
secret = '{{ CONFLUENT_SECRET }}',
topic = 'my-topic',
type = 'source',
format = 'json'
);