Implementation: TensorFlow Serving StreamingBatchScheduler
| Knowledge Sources | |
|---|---|
| Domains | Performance, Scheduling |
| Last Updated | 2026-02-13 17:00 GMT |
Overview
A low-latency batch scheduler with streaming dispatch and optional automatic retry, provided by the TensorFlow Serving batching module.
Description
StreamingBatchScheduler<TaskType> is a template class implementing BatchScheduler<TaskType>:
- Create() (L303-336) initializes the batch thread pool and batch management state.
- Schedule() (L337-381) adds the task to the current open batch or starts a new one; it returns UNAVAILABLE if all batch threads are busy.
- Each batch is handed to the process-batch callback on a thread pool thread as soon as the batch is started; the batch is closed once it is full or its timeout expires (see the callback sketch below).
CreateRetryingStreamingBatchScheduler() (L454-469) wraps the scheduler in a BatchSchedulerRetrier for automatic retry when Schedule() returns UNAVAILABLE (see Retry Wrapper Usage below).
The internal SingleTaskScheduler helper (streaming_batch_scheduler.cc L30-89) defers a single pending closure to a future time, with each newly scheduled closure superseding the previous one; StreamingBatchScheduler uses it to close batches when their timeout expires.
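As a consumer-side sketch (assuming the Batch API from tensorflow_serving/batching/batch_scheduler.h; MyTask, ProcessOne, and ProcessMyBatch are hypothetical names reused in the usage examples below):
#include "tensorflow_serving/batching/batch_scheduler.h"
// Hypothetical per-task handler.
void ProcessOne(MyTask* task);
// Batch-processing callback. With streaming dispatch the batch may still be
// accepting tasks when the callback fires, so wait for the scheduler to close
// it (on max_batch_size or batch_timeout_micros) before walking the tasks.
void ProcessMyBatch(std::unique_ptr<tensorflow::serving::Batch<MyTask>> batch) {
  batch->WaitUntilClosed();
  for (int i = 0; i < batch->num_tasks(); ++i) {
    ProcessOne(batch->mutable_task(i));
  }
}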
Usage
Used as an alternative to BasicBatchScheduler when lower latency or streaming processing is needed: rather than enqueueing completed batches, it hands each batch to the processing callback as the batch forms. Created by the serving infrastructure based on configuration.
Code Reference
Source Location
- Repository: tensorflow/serving
- File: tensorflow_serving/batching/streaming_batch_scheduler.h (template class, L114-469)
- Helper: tensorflow_serving/batching/streaming_batch_scheduler.cc (L30-89)
Signature
template <typename TaskType>
class StreamingBatchScheduler : public BatchScheduler<TaskType> {
 public:
  struct Options {
    size_t max_batch_size = 1000;
    int64_t batch_timeout_micros = 0;
    int num_batch_threads = port::MaxParallelism();
    string thread_pool_name = "batch_threads";
    uint64_t no_tasks_wait_time_micros = 1000;
  };
  static Status Create(
      const Options& options,
      std::function<void(std::unique_ptr<Batch<TaskType>>)> process_batch_callback,
      std::unique_ptr<StreamingBatchScheduler<TaskType>>* scheduler);
  Status Schedule(std::unique_ptr<TaskType>* task) override;
};
// With retry wrapper
template <typename TaskType>
Status CreateRetryingStreamingBatchScheduler(
    const typename StreamingBatchScheduler<TaskType>::Options& options,
    const typename BatchSchedulerRetrier<TaskType>::Options& retry_options,
    std::function<void(std::unique_ptr<Batch<TaskType>>)> process_batch_callback,
    std::unique_ptr<BatchScheduler<TaskType>>* scheduler);
Import
#include "tensorflow_serving/batching/streaming_batch_scheduler.h"
#include "tensorflow_serving/batching/batch_scheduler_retrier.h"
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| options | Options | Yes | Scheduler configuration, passed to Create() |
| process_batch_callback | function | Yes | Invoked on a batch thread when a batch is dispatched for processing |
| task | unique_ptr<TaskType> | Yes | Passed to Schedule(); TaskType must conform to the BatchTask interface, i.e. implement size() (sketch below) |
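A minimal sketch of a conforming task type (this MyTask is the hypothetical type used in the examples; BatchTask comes from tensorflow_serving/batching/batch_scheduler.h):
#include "tensorflow_serving/batching/batch_scheduler.h"
// BatchTask requires only size(), reported in the units counted against
// Options::max_batch_size.
struct MyTask : public tensorflow::serving::BatchTask {
  explicit MyTask(size_t num_elements) : num_elements_(num_elements) {}
  size_t size() const override { return num_elements_; }
 private:
  const size_t num_elements_;
};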
Outputs
| Name | Type | Description |
|---|---|---|
| scheduler | unique_ptr | Created scheduler instance, set by Create() |
| Status | Status | OK on success; Schedule() returns UNAVAILABLE when all batch threads are busy |
Usage Examples
C++ Usage
#include "tensorflow_serving/batching/streaming_batch_scheduler.h"
StreamingBatchScheduler<MyTask>::Options options;
options.max_batch_size = 64;
options.batch_timeout_micros = 5000; // 5ms
options.num_batch_threads = 4;
std::unique_ptr<StreamingBatchScheduler<MyTask>> scheduler;
TF_CHECK_OK(StreamingBatchScheduler<MyTask>::Create(
options,
[](std::unique_ptr<Batch<MyTask>> batch) {
// Process the batch
ProcessMyBatch(std::move(batch));
},
&scheduler));
// Schedule individual tasks
auto task = std::make_unique<MyTask>(...);
Status s = scheduler->Schedule(&task);
if (s.code() == error::UNAVAILABLE) {
// All threads busy, retry later
}
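Retry Wrapper Usage
A minimal sketch of the retrying variant, using default-constructed BatchSchedulerRetrier options (specific retry knobs are omitted rather than guessed); the returned scheduler re-attempts Schedule() calls that hit UNAVAILABLE before surfacing the error:
#include "tensorflow_serving/batching/batch_scheduler_retrier.h"
#include "tensorflow_serving/batching/streaming_batch_scheduler.h"
using tensorflow::serving::Batch;
using tensorflow::serving::BatchScheduler;
using tensorflow::serving::BatchSchedulerRetrier;
using tensorflow::serving::StreamingBatchScheduler;
StreamingBatchScheduler<MyTask>::Options options;
options.max_batch_size = 64;
options.batch_timeout_micros = 5000;  // 5ms
BatchSchedulerRetrier<MyTask>::Options retry_options;  // library defaults
std::unique_ptr<BatchScheduler<MyTask>> scheduler;
TF_CHECK_OK(tensorflow::serving::CreateRetryingStreamingBatchScheduler<MyTask>(
    options, retry_options,
    [](std::unique_ptr<Batch<MyTask>> batch) { ProcessMyBatch(std::move(batch)); },
    &scheduler));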