Implementation:Apache Druid Sampler Streaming Schema
| Knowledge Sources | |
|---|---|
| Domains | Streaming_Ingestion, Schema_Design |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
A concrete sampler API pipeline that configures the schema of a streaming ingestion spec by applying the same sampleFor* functions used in batch ingestion to cached streaming data.
Description
The streaming schema configuration uses the same sampler functions as batch ingestion (sampleForParser, sampleForTimestamp, sampleForTransform, sampleForFilter, sampleForSchema) from sampler.ts. When processing streaming specs, these functions:
- Accept the streaming-wrapped input format (kafka/kinesis wrapper around the actual data format)
- Handle streaming metadata columns (KAFKA_METADATA_INPUT_FORMAT_FIELDS, KINESIS_METADATA_INPUT_FORMAT_FIELDS)
- Produce a supervisor-compatible ingestion spec with ioConfig.type set to 'kafka' or 'kinesis'
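The wrapping described in the list above can be sketched as follows. This is a minimal illustration, not the console's code: the `InputFormat` type is a simplified local stand-in, `wrapInputFormat` and `unwrapInputFormat` are hypothetical helper names, and the `valueFormat` field is assumed from the shape of Druid's documented Kafka input format.

```typescript
// Simplified local types; the real console types live in the Druid web-console sources.
type StreamingType = 'kafka' | 'kinesis';

interface InputFormat {
  type: string;
  valueFormat?: InputFormat;
  [key: string]: unknown;
}

// Hypothetical helper: wrap a plain data format in a streaming input format.
function wrapInputFormat(streamingType: StreamingType, valueFormat: InputFormat): InputFormat {
  return { type: streamingType, valueFormat };
}

// Hypothetical helper: find the format that describes the record payload itself.
function unwrapInputFormat(inputFormat: InputFormat): InputFormat {
  const isWrapper = inputFormat.type === 'kafka' || inputFormat.type === 'kinesis';
  return isWrapper && inputFormat.valueFormat ? inputFormat.valueFormat : inputFormat;
}

// A JSON payload carried inside a Kafka wrapper.
const wrapped = wrapInputFormat('kafka', { type: 'json' });
const payloadFormat = unwrapInputFormat(wrapped);
```

Keeping the payload format nested inside the wrapper means the parser step can edit the data format without touching the streaming-specific fields around it.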
The wizard rendering in LoadDataView (load-data-view.tsx, L1514-L2600) takes the same path for batch and streaming specs; the spec type alone triggers the streaming-specific behavior.
Usage
The LoadDataView wizard calls these functions in sequence across steps 3-7, exactly as it does for batch ingestion. The streaming-specific input format fields appear in the parser step.
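The sequential calling pattern can be sketched with stub samplers. Everything here is an assumption made for illustration: `runSchemaSteps`, the `StepName` labels, and the simplified `Spec`, `CacheRows`, and `SampleResponse` types are hypothetical, and the third argument that sampleForTransform takes is left out (it could be bound in a closure).

```typescript
// Simplified stand-ins for the console types; assumptions for illustration only.
type Spec = Record<string, unknown>;
type CacheRows = Record<string, unknown>[];
interface SampleResponse {
  data: unknown[];
}

type StepName = 'timestamp' | 'transform' | 'filter' | 'schema';
type Sampler = (spec: Spec, cacheRows: CacheRows) => Promise<SampleResponse>;

// Hypothetical runner: call each cached-data sampler in wizard order.
async function runSchemaSteps(
  spec: Spec,
  cacheRows: CacheRows,
  samplers: Record<StepName, Sampler>,
): Promise<Record<StepName, SampleResponse>> {
  const order: StepName[] = ['timestamp', 'transform', 'filter', 'schema'];
  const results = {} as Record<StepName, SampleResponse>;
  for (const step of order) {
    // Awaiting inside the loop keeps the steps strictly sequential.
    results[step] = await samplers[step](spec, cacheRows);
  }
  return results;
}

// Stub sampler that echoes the cached rows and records the call order.
const calls: string[] = [];
const stub = (name: StepName): Sampler => async (_spec, rows) => {
  calls.push(name);
  return { data: rows };
};

const done = runSchemaSteps({ type: 'kafka' }, [{ raw: '{"x":1}' }], {
  timestamp: stub('timestamp'),
  transform: stub('transform'),
  filter: stub('filter'),
  schema: stub('schema'),
});
```

Because every step reads from the same cached rows, each step can be re-run without sampling the stream again.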
Code Reference
Source Location
- Repository: Apache Druid
- File: web-console/src/utils/sampler.ts
- Lines: L351-L643 (sampleForParser through sampleForSchema)
- Related: web-console/src/views/load-data-view/load-data-view.tsx L1514-L2600
Signature
// Same functions as batch; streaming behavior is triggered by the spec type
export async function sampleForParser(spec: Partial<IngestionSpec>, sampleStrategy: SampleStrategy): Promise<SampleResponse>
export async function sampleForTimestamp(spec: Partial<IngestionSpec>, cacheRows: CacheRows): Promise<SampleResponse>
export async function sampleForTransform(spec: Partial<IngestionSpec>, cacheRows: CacheRows, forceSegmentSortByTime: boolean): Promise<SampleResponse>
export async function sampleForFilter(spec: Partial<IngestionSpec>, cacheRows: CacheRows): Promise<SampleResponse>
export async function sampleForSchema(spec: Partial<IngestionSpec>, cacheRows: CacheRows): Promise<SampleResponse>
Import
import { sampleForParser, sampleForTimestamp, sampleForTransform, sampleForFilter, sampleForSchema } from '../utils/sampler';
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
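| spec | Partial<IngestionSpec> | Yes | Streaming ingestion spec with kafka/kinesis ioConfig |
| sampleStrategy | SampleStrategy | sampleForParser only | Sampling strategy, for example 'start' |
| cacheRows | CacheRows | All except sampleForParser | Cached streaming sample data from sampleForConnect |
| forceSegmentSortByTime | boolean | sampleForTransform only | Whether segment sorting by time is forced |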
Outputs
| Name | Type | Description |
|---|---|---|
| data | SampleEntry[] | Parsed, timestamped, transformed, filtered, or schema-applied sample data |
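A caller might consume the output along the following lines. The `SampleEntry` fields shown (`input`, `parsed`, `unparseable`, `error`) are assumed from the shape of Druid's documented sampler response, the types are simplified local copies, and `summarize` is a hypothetical helper; the sample rows are made up for the example.

```typescript
// Simplified shapes, assumed from Druid's documented sampler response.
interface SampleEntry {
  input?: Record<string, unknown>;
  parsed?: Record<string, unknown>;
  unparseable?: boolean;
  error?: string;
}

interface SampleResponse {
  data: SampleEntry[];
}

// Hypothetical helper: split a response into usable rows and error messages.
function summarize(response: SampleResponse): {
  rows: Record<string, unknown>[];
  errors: string[];
} {
  const rows: Record<string, unknown>[] = [];
  const errors: string[] = [];
  for (const entry of response.data) {
    if (entry.parsed && !entry.unparseable) {
      rows.push(entry.parsed);
    } else if (entry.error) {
      errors.push(entry.error);
    }
  }
  return { rows, errors };
}

// Made-up sample: one parsed row carrying a streaming metadata column, one failure.
const summary = summarize({
  data: [
    { input: { raw: '{"x":1}' }, parsed: { __time: 0, 'kafka.topic': 'clicks', x: 1 } },
    { input: { raw: 'not json' }, unparseable: true, error: 'Unable to parse row' },
  ],
});
```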
Usage Examples
Streaming Schema Pipeline
// Same API as batch, but with a streaming spec:
const connectResult = await sampleForConnect(kafkaSpec, 'start');
const parseResult = await sampleForParser(kafkaSpec, 'start');
// The remaining samplers take CacheRows, not a cache key. Deriving the rows with
// getCacheRowsFromSampleResponse (assumed to be exported from sampler.ts) is one option.
const cacheRows = getCacheRowsFromSampleResponse(connectResult);
const timeResult = await sampleForTimestamp(kafkaSpec, cacheRows);
const transformResult = await sampleForTransform(kafkaSpec, cacheRows, true);
const filterResult = await sampleForFilter(kafkaSpec, cacheRows);
const schemaResult = await sampleForSchema(kafkaSpec, cacheRows);