Implementation:Apache Druid SampleForConnect Streaming

From Leeroopedia


Knowledge Sources
Domains Streaming_Ingestion, Data_Sampling
Last Updated 2026-02-10 00:00 GMT

Overview

Concrete sampler API client function for establishing connectivity to streaming message brokers and retrieving sample data, specialized for Kafka/Kinesis/Azure Event Hubs sources.

Description

This is the same sampleForConnect function used in batch ingestion, but when called with a streaming spec type ('kafka' or 'kinesis'), it applies streaming-specific behavior:

  • Uses KAFKA_SAMPLE_INPUT_FORMAT or KINESIS_SAMPLE_INPUT_FORMAT wrappers around the raw data format
  • Supports streaming metadata field configuration (STREAMING_INPUT_FORMAT_FIELDS, KAFKA_METADATA_INPUT_FORMAT_FIELDS, KINESIS_METADATA_INPUT_FORMAT_FIELDS)
  • Samples from the live stream by reading messages from the topic or stream, at the position selected by sampleStrategy
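The branching described above can be illustrated with a minimal sketch. It is not the code in sampler.ts: the helper names isStreamingSpecType and wrapForStreamingSample are hypothetical, and the wrapper shape (the raw format nested under valueFormat) is an assumption standing in for the real KAFKA_SAMPLE_INPUT_FORMAT and KINESIS_SAMPLE_INPUT_FORMAT constants.

```typescript
// Hypothetical sketch of streaming-specific branching; not Druid's actual code.
interface InputFormat {
  type: string;
  [key: string]: unknown;
}

// True only for the two spec types the page names as streaming.
function isStreamingSpecType(specType: string): specType is 'kafka' | 'kinesis' {
  return specType === 'kafka' || specType === 'kinesis';
}

// Assumed wrapper shape: the raw data format is nested inside a
// streaming input format so metadata fields can sit alongside it.
function wrapForStreamingSample(specType: string, rawFormat: InputFormat): InputFormat {
  if (!isStreamingSpecType(specType)) {
    // Batch spec types pass through unchanged.
    return rawFormat;
  }
  return { type: specType, valueFormat: rawFormat };
}
```

Under these assumptions a batch spec type returns the raw format untouched, while 'kafka' and 'kinesis' receive a wrapper whose type matches the spec type.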

Usage

Call this function after the user has configured Kafka bootstrap servers and topic, or Kinesis endpoint and stream name. The function validates connectivity and returns sample streaming data.
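As an illustration of the precondition above, a caller might check that the connection fields are filled in before sampling. The helper missingConnectionFields and the StreamingIoConfig interface below are hypothetical. The field names are taken from the usage examples on this page, and the real console may validate differently.

```typescript
// Hypothetical pre-flight check; field names follow this page's examples.
interface StreamingIoConfig {
  type: string;
  topic?: string;
  stream?: string;
  endpoint?: string;
  consumerProperties?: Record<string, unknown>;
}

// Returns the names of connection fields that are still empty.
function missingConnectionFields(ioConfig: StreamingIoConfig): string[] {
  const missing: string[] = [];
  if (ioConfig.type === 'kafka') {
    const servers = ioConfig.consumerProperties?.['bootstrap.servers'];
    if (!servers) missing.push('consumerProperties.bootstrap.servers');
    if (!ioConfig.topic) missing.push('topic');
  } else if (ioConfig.type === 'kinesis') {
    if (!ioConfig.endpoint) missing.push('endpoint');
    if (!ioConfig.stream) missing.push('stream');
  }
  return missing;
}
```

An empty result suggests the spec is ready to pass to sampleForConnect. A non-empty result lists what the user still needs to fill in.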

Code Reference

Source Location

  • Repository: Apache Druid
  • File: web-console/src/utils/sampler.ts
  • Lines: L265-L349 (same function as batch, with streaming-specific branching)
  • Related: web-console/src/views/load-data-view/load-data-view.tsx L894-L1155 (streaming welcome and connect steps)

Signature

export async function sampleForConnect(
  spec: Partial<IngestionSpec>,
  sampleStrategy: SampleStrategy,
): Promise<SampleResponseWithExtraInfo>

// Streaming-specific behavior triggered when:
// getSpecType(spec) === 'kafka' || getSpecType(spec) === 'kinesis'

Import

import { sampleForConnect } from '../utils/sampler';

I/O Contract

Inputs

Name | Type | Required | Description
spec | Partial<IngestionSpec> | Yes | Streaming ingestion spec whose ioConfig contains the Kafka or Kinesis connection parameters
sampleStrategy | SampleStrategy | Yes | Where in the stream to sample from (the examples below use 'start')

Outputs

Name | Type | Description
data | SampleEntry[] | Sampled streaming messages as raw data rows
cacheKey | string or undefined | Cache key for subsequent sampler calls
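A caller could read the output contract roughly as follows. The types here are local stand-ins written for this sketch (SampleEntry is reduced to a generic record and SampleResponseLike models only the two documented fields), and summarizeSample is a hypothetical helper, not part of sampler.ts.

```typescript
// Local stand-ins for the documented output fields; not Druid's real types.
type SampleEntry = Record<string, unknown>;

interface SampleResponseLike {
  data: SampleEntry[];
  cacheKey?: string;
}

// Hypothetical helper: reports how many rows came back and whether a
// cache key is available for later sampler calls.
function summarizeSample(response: SampleResponseLike): { rowCount: number; reusable: boolean } {
  return {
    rowCount: response.data.length,
    reusable: response.cacheKey !== undefined,
  };
}
```

Because cacheKey may be undefined, later steps should check for it before trying to reuse cached sample data.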

Usage Examples

Kafka Connection

import { sampleForConnect } from '../utils/sampler';

const kafkaSpec = {
  type: 'kafka',
  spec: {
    ioConfig: {
      type: 'kafka',
      consumerProperties: {
        'bootstrap.servers': 'kafka-broker:9092',
      },
      topic: 'events-topic',
    },
  },
};

// Validates connectivity to the broker and samples the topic
const result = await sampleForConnect(kafkaSpec, 'start');
// result.data contains the raw messages sampled from the Kafka topic

Kinesis Connection

const kinesisSpec = {
  type: 'kinesis',
  spec: {
    ioConfig: {
      type: 'kinesis',
      stream: 'my-kinesis-stream',
      endpoint: 'kinesis.us-east-1.amazonaws.com',
      awsAssumedRoleArn: 'arn:aws:iam::123456789012:role/druid-kinesis',
    },
  },
};

const result = await sampleForConnect(kinesisSpec, 'start');
