Implementation:Tensorflow Serving StreamingBatchScheduler

From Leeroopedia
Knowledge Sources
Domains: Performance, Scheduling
Last Updated: 2026-02-13 17:00 GMT

Overview

A concrete BatchScheduler implementation for low-latency batch scheduling with streaming dispatch and optional automatic retry, provided by TensorFlow Serving's batching module.

Description

StreamingBatchScheduler<TaskType> is a class template that implements BatchScheduler<TaskType>. Its main parts are:

  • Create() (L303-336): Initializes the thread pool and the batch-management state.
  • Schedule() (L337-381): Adds the task to the current open batch or starts a new batch; returns UNAVAILABLE if all threads are busy.
  • Batch dispatch: Hands each batch to the callback function on a thread-pool thread; a batch is closed once it is full or its timeout expires.
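The Schedule() decision described above can be sketched with a small, single-threaded toy model. This is not the TensorFlow Serving implementation: ToyStatus, ToyStreamingScheduler, and FinishOneBatch() are hypothetical names, thread-pool threads are modeled as a plain counter, a batch is assumed to claim a thread when it is opened, and the batch timeout is left out entirely.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical status codes; the real scheduler returns a Status object.
enum class ToyStatus { kOk, kUnavailable };

// Toy model of the "add to open batch or start a new one" logic.
// Assumption: opening a batch claims one thread until FinishOneBatch().
class ToyStreamingScheduler {
 public:
  ToyStreamingScheduler(std::size_t max_batch_size, int num_batch_threads)
      : max_batch_size_(max_batch_size), free_threads_(num_batch_threads) {
    assert(max_batch_size > 0 && num_batch_threads > 0);
  }

  ToyStatus Schedule(std::size_t task_size) {
    assert(task_size > 0 && task_size <= max_batch_size_);
    if (has_open_batch_ && open_batch_size_ + task_size > max_batch_size_) {
      CloseOpenBatch();  // The task does not fit, so the open batch is done.
    }
    if (!has_open_batch_) {
      // A new batch needs a thread; with none free, the task is rejected.
      if (free_threads_ == 0) return ToyStatus::kUnavailable;
      --free_threads_;
      has_open_batch_ = true;
      open_batch_size_ = 0;
    }
    open_batch_size_ += task_size;
    if (open_batch_size_ == max_batch_size_) CloseOpenBatch();  // Batch full.
    return ToyStatus::kOk;
  }

  // Models a thread finishing a closed batch and becoming free again.
  void FinishOneBatch() {
    assert(closed_in_flight_ > 0);
    --closed_in_flight_;
    ++free_threads_;
  }

  const std::vector<std::size_t>& closed_batch_sizes() const {
    return closed_batch_sizes_;
  }

 private:
  void CloseOpenBatch() {
    closed_batch_sizes_.push_back(open_batch_size_);
    has_open_batch_ = false;
    ++closed_in_flight_;
  }

  const std::size_t max_batch_size_;
  int free_threads_;
  int closed_in_flight_ = 0;
  bool has_open_batch_ = false;
  std::size_t open_batch_size_ = 0;
  std::vector<std::size_t> closed_batch_sizes_;
};
```

With one thread and max_batch_size 4, two tasks of size 2 fill and close the first batch; the next task is rejected until FinishOneBatch() frees the thread.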

CreateRetryingStreamingBatchScheduler() (L454-469) wraps the scheduler with BatchSchedulerRetrier for automatic retry on UNAVAILABLE status.
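The retry-on-UNAVAILABLE idea can be illustrated with a standard-library-only sketch. It is not the BatchSchedulerRetrier code: RetryStatus, ToyRetryOptions, ScheduleWithRetry, and the option names and default values are assumptions chosen for illustration. The loop simply re-invokes a scheduling attempt until it stops returning "unavailable" or a time budget runs out.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical status codes standing in for the real Status type.
enum class RetryStatus { kOk, kUnavailable };

// Hypothetical retry settings: a total time budget and a pause between tries.
struct ToyRetryOptions {
  std::chrono::microseconds max_time{10000};
  std::chrono::microseconds retry_delay{100};
};

// Calls try_schedule until it returns something other than kUnavailable,
// or until another delay would exceed the time budget. The last status
// seen is returned either way.
inline RetryStatus ScheduleWithRetry(
    const std::function<RetryStatus()>& try_schedule,
    const ToyRetryOptions& options) {
  const auto deadline = std::chrono::steady_clock::now() + options.max_time;
  while (true) {
    const RetryStatus status = try_schedule();
    if (status != RetryStatus::kUnavailable) return status;
    if (std::chrono::steady_clock::now() + options.retry_delay > deadline) {
      return status;  // Budget exhausted; surface kUnavailable to the caller.
    }
    std::this_thread::sleep_for(options.retry_delay);
  }
}
```

Bounding the retries by elapsed time rather than by attempt count keeps the worst-case added latency predictable, which matters for a scheduler aimed at low latency.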

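The SingleTaskScheduler helper (streaming_batch_scheduler.cc L30-89) is an internal utility that runs a single deferred closure on a background thread at a scheduled time; the scheduler appears to use it to close an open batch when its timeout expires.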

Usage

Used as an alternative to BasicBatchScheduler when lower latency or streaming behavior is needed. Created by the serving infrastructure based on configuration.

Code Reference

Source Location

  • Repository: tensorflow/serving
  • File: tensorflow_serving/batching/streaming_batch_scheduler.h (L114-469 template)
  • Helper: tensorflow_serving/batching/streaming_batch_scheduler.cc (L30-89)

Signature

template <typename TaskType>
class StreamingBatchScheduler : public BatchScheduler<TaskType> {
 public:
  struct Options {
    size_t max_batch_size = 1000;
    int64_t batch_timeout_micros = 0;
    int num_batch_threads = port::MaxParallelism();
    string thread_pool_name = "batch_threads";
    uint64_t no_tasks_wait_time_micros = 1000;
  };

  static Status Create(
      const Options& options,
      std::function<void(std::unique_ptr<Batch<TaskType>>)> process_batch_callback,
      std::unique_ptr<StreamingBatchScheduler<TaskType>>* scheduler
  );

  Status Schedule(std::unique_ptr<TaskType>* task) override;
};

// With retry wrapper
template <typename TaskType>
Status CreateRetryingStreamingBatchScheduler(
    const typename StreamingBatchScheduler<TaskType>::Options& options,
    const typename BatchSchedulerRetrier<TaskType>::Options& retry_options,
    std::function<void(std::unique_ptr<Batch<TaskType>>)> process_batch_callback,
    std::unique_ptr<BatchScheduler<TaskType>>* scheduler
);

Import

#include "tensorflow_serving/batching/streaming_batch_scheduler.h"
#include "tensorflow_serving/batching/batch_scheduler_retrier.h"

I/O Contract

Inputs

Name | Type | Required | Description
options | Options | Yes | Scheduler configuration
process_batch_callback | function | Yes | Called when a batch is ready for processing
task | unique_ptr<TaskType> | Yes | Task conforming to the BatchTask interface (must implement size())
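As a rough illustration of the size() requirement, the sketch below defines a stand-in interface and one task type. ToyBatchTask, RowsTask, and TotalSize are hypothetical names, not TensorFlow Serving types; the point is only that each task reports a size and that a batch's size is the sum over its tasks, which is what a limit such as max_batch_size is compared against.

```cpp
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Stand-in for the task interface: only size() is modeled here.
class ToyBatchTask {
 public:
  virtual ~ToyBatchTask() = default;
  virtual std::size_t size() const = 0;
};

// Hypothetical task that carries several input rows; its size is the
// number of rows it contributes to a batch.
class RowsTask : public ToyBatchTask {
 public:
  explicit RowsTask(std::vector<float> rows) : rows_(std::move(rows)) {}
  std::size_t size() const override { return rows_.size(); }

 private:
  std::vector<float> rows_;
};

// Sums the sizes of all tasks, i.e. the size of the batch they would form.
inline std::size_t TotalSize(
    const std::vector<std::unique_ptr<ToyBatchTask>>& tasks) {
  std::size_t total = 0;
  for (const auto& task : tasks) total += task->size();
  return total;
}
```

A task holding two rows and a task holding one row would together form a batch of size 3, regardless of the fact that only two tasks were scheduled.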

Outputs

Name | Type | Description
scheduler | unique_ptr | Created scheduler instance (output parameter of Create())
Status | Status | Return value of Schedule(): OK, or UNAVAILABLE if all threads are busy

Usage Examples

C++ Usage

#include "tensorflow_serving/batching/streaming_batch_scheduler.h"

StreamingBatchScheduler<MyTask>::Options options;
options.max_batch_size = 64;
options.batch_timeout_micros = 5000;  // 5ms
options.num_batch_threads = 4;

std::unique_ptr<StreamingBatchScheduler<MyTask>> scheduler;
TF_CHECK_OK(StreamingBatchScheduler<MyTask>::Create(
    options,
    [](std::unique_ptr<Batch<MyTask>> batch) {
        // Process the batch
        ProcessMyBatch(std::move(batch));
    },
    &scheduler));

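// Schedule individual tasks. The "..." stands for MyTask's constructor arguments.
auto task = std::make_unique<MyTask>(...);
Status s = scheduler->Schedule(&task);
if (s.code() == error::UNAVAILABLE) {
    // All batch threads are busy. Ownership is transferred only when
    // Schedule() returns OK, so `task` is still valid and can be retried later.
}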

Related Pages

Implements Principle

Uses Heuristic
