Implementation:Apache Druid SampleForConnect Streaming
| Property | Value |
|---|---|
| Knowledge Sources | |
| Domains | Streaming_Ingestion, Data_Sampling |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Concrete sampler API client function that establishes connectivity to a streaming message broker and retrieves sample data. It is specialized for Kafka and Kinesis sources; Azure Event Hubs is handled through the Kafka path.
Description
This is the same sampleForConnect function used in batch ingestion, but when called with a streaming spec type ('kafka' or 'kinesis'), it applies streaming-specific behavior:
- Uses KAFKA_SAMPLE_INPUT_FORMAT or KINESIS_SAMPLE_INPUT_FORMAT wrappers around the raw data format
- Supports streaming metadata field configuration (STREAMING_INPUT_FORMAT_FIELDS, KAFKA_METADATA_INPUT_FORMAT_FIELDS, KINESIS_METADATA_INPUT_FORMAT_FIELDS)
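- Samples from the live stream, reading messages from the topic/stream starting at the earliest retained position when sampleStrategy is 'start', or at the latest position when it is 'end'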
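The wrapping described above can be pictured with the following minimal sketch. It assumes a simplified shape modeled on Druid's documented `kafka` and `kinesis` input formats, which delegate payload parsing to a nested `valueFormat`. The helper `wrapStreamingInputFormat` and the `InputFormat` interface are hypothetical. The console's actual KAFKA_SAMPLE_INPUT_FORMAT and KINESIS_SAMPLE_INPUT_FORMAT constants may set additional fields.

```typescript
// Hypothetical sketch: wraps a raw data format in a streaming-aware input format.
// Not the console's real constants; field names follow Druid's documented
// `kafka` / `kinesis` input formats, which parse the payload via `valueFormat`.

type StreamingType = 'kafka' | 'kinesis';

interface InputFormat {
  type: string;
  [key: string]: unknown;
}

function wrapStreamingInputFormat(
  specType: StreamingType,
  rawFormat: InputFormat,
): InputFormat {
  if (specType === 'kafka') {
    // The Kafka input format parses the message value with `valueFormat`;
    // it can also expose the key, headers, and Kafka timestamp as columns.
    return { type: 'kafka', valueFormat: rawFormat };
  }
  // The Kinesis input format likewise delegates record parsing to `valueFormat`.
  return { type: 'kinesis', valueFormat: rawFormat };
}

const wrapped = wrapStreamingInputFormat('kafka', { type: 'json' });
console.log(JSON.stringify(wrapped));
// {"type":"kafka","valueFormat":{"type":"json"}}
```

The raw format stays intact inside the wrapper, so the same user-chosen format (JSON, CSV, and so on) can be reused unchanged once the source type is known.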
Usage
Call this function after the user has configured Kafka bootstrap servers and topic, or Kinesis endpoint and stream name. The function validates connectivity and returns sample streaming data.
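A caller may want to check that these connection fields are present before invoking the sampler. The following is a minimal sketch under that assumption. The helper `isReadyToConnect` and the `StreamingIoConfigSketch` shape are hypothetical simplifications, not the console's `IoConfig` type. The sketch requires exactly the fields named in the usage note above.

```typescript
// Hypothetical pre-flight check before calling sampleForConnect.
// StreamingIoConfigSketch is a simplified assumption, not the console's IoConfig.

interface StreamingIoConfigSketch {
  type?: string;
  consumerProperties?: Record<string, string>;
  topic?: string;
  stream?: string;
  endpoint?: string;
}

function isReadyToConnect(ioConfig: StreamingIoConfigSketch): boolean {
  switch (ioConfig.type) {
    case 'kafka':
      // Kafka: bootstrap servers and a topic must both be set.
      return Boolean(ioConfig.consumerProperties?.['bootstrap.servers'] && ioConfig.topic);
    case 'kinesis':
      // Kinesis: an endpoint and a stream name must both be set.
      return Boolean(ioConfig.endpoint && ioConfig.stream);
    default:
      // Any other (or missing) type is not a streaming connection.
      return false;
  }
}
```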
Code Reference
Source Location
- Repository: Apache Druid
- File: web-console/src/utils/sampler.ts
- Lines: L265-L349 (same function as batch, with streaming-specific branching)
- Related: web-console/src/views/load-data-view/load-data-view.tsx L894-L1155 (streaming welcome and connect steps)
Signature
export async function sampleForConnect(
spec: Partial<IngestionSpec>,
sampleStrategy: SampleStrategy,
): Promise<SampleResponseWithExtraInfo>
// Streaming-specific behavior triggered when:
// getSpecType(spec) === 'kafka' || getSpecType(spec) === 'kinesis'
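The streaming check in the signature comment can be approximated as follows. This is a sketch, not the console's code. `getSpecTypeSketch` is a hypothetical stand-in for `getSpecType`, and it assumes the type is read from the top-level `type` first, with a fallback to `spec.ioConfig.type`.

```typescript
// Hypothetical stand-in for getSpecType plus the streaming branch test.
// Assumed lookup order: top-level `type`, then `spec.ioConfig.type`.

interface SpecSketch {
  type?: string;
  spec?: { ioConfig?: { type?: string } };
}

function getSpecTypeSketch(spec: SpecSketch): string | undefined {
  return spec.type ?? spec.spec?.ioConfig?.type;
}

// True when the spec should take the streaming-specific code path.
function usesStreamingBranch(spec: SpecSketch): boolean {
  const specType = getSpecTypeSketch(spec);
  return specType === 'kafka' || specType === 'kinesis';
}
```

Batch types such as `index_parallel` fall through to the batch path, which matches the "same function as batch, with streaming-specific branching" note above.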
Import
```typescript
import { sampleForConnect } from '../utils/sampler';
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
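| spec | Partial<IngestionSpec> | Yes | Streaming ingestion spec whose ioConfig contains the Kafka/Kinesis connection parameters |
| sampleStrategy | SampleStrategy | Yes | 'start' or 'end'; for streaming specs, selects whether sampling begins at the earliest retained position or at the latest position |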
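One way to picture how sampleStrategy reaches the broker is the following minimal sketch. It assumes that 'start' and 'end' are translated into Druid's documented `useEarliestOffset` (Kafka) and `useEarliestSequenceNumber` (Kinesis) ioConfig flags. The helper `applySampleStrategy` and the `StreamingIoConfig` shape are hypothetical.

```typescript
// Hypothetical mapping of SampleStrategy onto the streaming ioConfig.
// Assumes 'start' => read from earliest, 'end' => read from latest.

type SampleStrategy = 'start' | 'end';

interface StreamingIoConfig {
  type: 'kafka' | 'kinesis';
  [key: string]: unknown;
}

// Returns a copy of ioConfig with the "start from earliest" flag set.
function applySampleStrategy(
  ioConfig: StreamingIoConfig,
  strategy: SampleStrategy,
): StreamingIoConfig {
  const earliest = strategy === 'start';
  if (ioConfig.type === 'kafka') {
    // Kafka supervisors use useEarliestOffset.
    return { ...ioConfig, useEarliestOffset: earliest };
  }
  // Kinesis supervisors use useEarliestSequenceNumber.
  return { ...ioConfig, useEarliestSequenceNumber: earliest };
}
```

Returning a copy leaves the user's spec untouched, so the same spec can be sampled with either strategy.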
Outputs
| Name | Type | Description |
|---|---|---|
| data | SampleEntry[] | Sampled streaming messages as raw data rows |
| cacheKey | string or undefined | Cache key for subsequent sampler calls |
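A caller typically inspects these outputs to decide whether the connection produced usable data. The following is a minimal sketch under stated assumptions. `SampleEntrySketch`, `ConnectResultSketch`, and `summarizeConnectResult` are hypothetical, and the `unparseable` flag is an assumed per-row field, not a confirmed part of `SampleEntry`.

```typescript
// Hypothetical, minimal shapes covering only the fields this sketch reads.
interface SampleEntrySketch {
  input?: Record<string, unknown>;
  unparseable?: boolean; // assumed per-row flag
}

interface ConnectResultSketch {
  data: SampleEntrySketch[];
  cacheKey?: string;
}

interface ConnectSummary {
  rowCount: number;
  unparseableCount: number;
  reusableCacheKey: string | undefined;
}

// Summarizes a connect result: how many rows came back, how many could not
// be parsed, and whether a cache key is available for later sampler calls.
function summarizeConnectResult(result: ConnectResultSketch): ConnectSummary {
  return {
    rowCount: result.data.length,
    unparseableCount: result.data.filter(entry => entry.unparseable === true).length,
    reusableCacheKey: result.cacheKey,
  };
}
```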
Usage Examples
Kafka Connection
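```typescript
import { sampleForConnect } from '../utils/sampler';
// Assumed import path for the IngestionSpec type; adjust to your module layout.
import type { IngestionSpec } from '../druid-models';

// Partial spec holding only the Kafka connection parameters.
const kafkaSpec = {
  type: 'kafka',
  spec: {
    ioConfig: {
      type: 'kafka',
      consumerProperties: {
        'bootstrap.servers': 'kafka-broker:9092',
      },
      topic: 'events-topic',
    },
  },
} as Partial<IngestionSpec>;

// 'start' samples from the earliest offset still retained by the topic.
const result = await sampleForConnect(kafkaSpec, 'start');
// result.data contains messages read from the Kafka topic
```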
Kinesis Connection
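```typescript
import { sampleForConnect } from '../utils/sampler';
// Assumed import path for the IngestionSpec type; adjust to your module layout.
import type { IngestionSpec } from '../druid-models';

const kinesisSpec = {
  type: 'kinesis',
  spec: {
    ioConfig: {
      type: 'kinesis',
      stream: 'my-kinesis-stream',
      endpoint: 'kinesis.us-east-1.amazonaws.com',
      awsAssumedRoleArn: 'arn:aws:iam::123456789012:role/druid-kinesis',
    },
  },
} as Partial<IngestionSpec>;

// 'start' samples from the earliest sequence number still retained by the stream.
const result = await sampleForConnect(kinesisSpec, 'start');
```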
Related Pages
Implements Principle
Requires Environment
Uses Heuristic