Implementation:Triton inference server Server L0 Sequence Stress

From Leeroopedia


L0 Sequence Stress

Source File: qa/L0_sequence_stress/sequence_stress.py
Language: Python (650 lines)
Domains: Testing, Sequence_Batching

Purpose

This Python module implements a targeted stress test for Triton Inference Server's sequence batcher. It spawns multiple concurrent threads that send various patterns of sequence inference requests (valid, no-start, no-end, valid-valid, valid-no-end) using gRPC streaming to exercise the sequence scheduler's slot management, timeout handling, and error detection under load.

Signature

# Constants:
CORRELATION_ID_BLOCK_SIZE = 100
DEFAULT_TIMEOUT_MS = 5000
SEQUENCE_LENGTH_MEAN = 16
SEQUENCE_LENGTH_STDEV = 8

# Key classes:
class UserData:
    """Container for async streaming request results."""

class TimeoutException(Exception): ...

# Key functions:
def completion_callback(user_data, result, error)
def check_sequence_async(client_metadata, trial, model_name, input_dtype,
                         steps, timeout_ms, sequence_name) -> None
def get_datatype(trial) -> np.dtype
def sequence_valid(client_metadata, rng, trial, model_name, dtype,
                   len_mean, len_stddev, sequence_name)
def sequence_valid_valid(client_metadata, rng, trial, model_name, dtype,
                         len_mean, len_stddev, sequence_name)
def sequence_valid_no_end(client_metadata, rng, trial, model_name, dtype,
                          len_mean, len_stddev, sequence_name)
def sequence_no_start(client_metadata, rng, trial, model_name, dtype, sequence_name)
def sequence_no_end(client_metadata, rng, trial, model_name, dtype,
                    len_mean, len_stddev, sequence_name)
def stress_thread(name, seed, pass_cnt, correlation_id_base,
                  trial, model_name, dtype)
def check_status(model_name)

# Command-line interface:
#   -v, --verbose       Enable verbose output
#   -r, --random-seed   Random seed for reproducibility
#   -t, --concurrency   Number of threads (default: 8)
#   -i, --iterations    Iterations per thread (default: 200)
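The four options listed above could be declared with argparse roughly as follows. This is a minimal sketch, not the module's actual parser: the integer types, the optional seed, and the helper name build_parser are assumptions; only the flag names and the two defaults come from the listing.

```python
import argparse


def build_parser():
    """Build a parser mirroring the four documented options (hypothetical helper)."""
    parser = argparse.ArgumentParser(description="Sequence batcher stress test (sketch)")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable verbose output")
    # Assumption: the seed is an optional integer.
    parser.add_argument("-r", "--random-seed", type=int, default=None,
                        help="Random seed for reproducibility")
    parser.add_argument("-t", "--concurrency", type=int, default=8,
                        help="Number of threads")
    parser.add_argument("-i", "--iterations", type=int, default=200,
                        help="Iterations per thread")
    return parser


# Parse an explicit argument list so the example does not depend on sys.argv.
FLAGS = build_parser().parse_args(["-r", "42", "-t", "4"])
print(FLAGS.random_seed, FLAGS.concurrency, FLAGS.iterations, FLAGS.verbose)
```

Passing a fixed seed with -r is what makes a failing run repeatable, since every thread's random choices derive from it.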

Key Components

Core Async Sequence Execution

The check_sequence_async function manages a single sequence of inference requests using gRPC streaming. It handles sequence start/end flags, input data preparation, timeout detection, and result validation.

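def check_sequence_async(client_metadata, trial, model_name, input_dtype,
                         steps, timeout_ms=DEFAULT_TIMEOUT_MS, sequence_name="<unknown>"):
    # Abridged excerpt; the setup lines are reconstructed from the names used below.
    triton_client = client_metadata[0]
    sequence_id = client_metadata[1]

    # Record the start time and create the container the callback fills.
    seq_start_ms = int(round(time.time() * 1000))
    user_data = UserData()

    # Make sure no stream is already running, then open a fresh one.
    triton_client.stop_stream()
    triton_client.start_stream(partial(completion_callback, user_data))

    sent_count = 0
    for flag_str, value, expected_result, delay_ms in steps:
        # seq_start / seq_end are derived from flag_str and `inputs` is
        # built from `value`; both steps are elided here.
        triton_client.async_stream_infer(
            model_name, inputs,
            sequence_id=sequence_id,
            sequence_start=seq_start,
            sequence_end=seq_end,
        )
        sent_count += 1

    # Process results in the order sent; check the timeout and expected values.
    processed_count = 0
    while processed_count < sent_count:
        (results, error) = user_data._completed_requests.get()
        (_, _, expected, _) = steps[processed_count]
        processed_count += 1
        if timeout_ms is not None:
            now_ms = int(round(time.time() * 1000))
            if (now_ms - seq_start_ms) > timeout_ms:
                raise TimeoutException(...)
        result = results.as_numpy("OUTPUT")[0][0]
        assert result == expected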
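The callback-plus-queue pattern used above can be tried without a running server. The sketch below replaces the Triton client with a hypothetical FakeStream class that accumulates values on a worker thread and reports each running total through the callback; UserData and completion_callback follow the shapes named in the signature list, while FakeStream and run_sequence are illustration-only names.

```python
import queue
import threading
import time
from functools import partial


class UserData:
    """Container for results delivered by the stream callback."""
    def __init__(self):
        self._completed_requests = queue.Queue()


def completion_callback(user_data, result, error):
    # Runs on the stream's thread; hand the pair to the consumer.
    user_data._completed_requests.put((result, error))


class FakeStream:
    """Hypothetical stand-in for a gRPC stream to an accumulator model."""
    def __init__(self, callback):
        self._callback = callback
        self._pending = queue.Queue()
        self._total = 0
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def async_stream_infer(self, value):
        self._pending.put(value)  # returns immediately, like an async send

    def stop_stream(self):
        self._pending.put(None)  # sentinel tells the worker to exit
        self._worker.join()

    def _run(self):
        while True:
            value = self._pending.get()
            if value is None:
                return
            self._total += value
            self._callback(self._total, None)


def run_sequence(values, timeout_ms=5000):
    user_data = UserData()
    stream = FakeStream(partial(completion_callback, user_data))
    start_ms = int(round(time.time() * 1000))
    for value in values:
        stream.async_stream_infer(value)

    outputs = []
    while len(outputs) < len(values):
        result, error = user_data._completed_requests.get()
        if error is not None:
            raise error
        now_ms = int(round(time.time() * 1000))
        if (now_ms - start_ms) > timeout_ms:
            raise TimeoutError("sequence exceeded {} ms".format(timeout_ms))
        outputs.append(result)
    stream.stop_stream()
    return outputs


outputs = run_sequence([3, 4, 5])
print(outputs)  # [3, 7, 12]
```

Because all requests are sent before any result is read, the queue is what preserves response order for the consumer; the timeout is measured from the start of the sequence, not per request.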

Sequence Patterns

| Function | Pattern | Purpose |
|---|---|---|
| sequence_valid | Single sequence with start and end flags | Normal sequence lifecycle |
| sequence_valid_valid | Two complete sequences back-to-back on same correlation ID | Slot reuse verification |
| sequence_valid_no_end | First complete, second without end flag | Timeout slot reclamation |
| sequence_no_start | Single request without start flag | Error handling (expects "must specify the START flag") |
| sequence_no_end | Sequence with start but no end | Server-initiated slot abort |

Each sequence has a length drawn from a normal distribution (mean 16, standard deviation 8, clamped to a minimum of 1) and sends random integer values. The expected result at each step is the cumulative sum of the values sent so far in that sequence.

def sequence_valid(client_metadata, rng, trial, model_name, dtype,
                   len_mean, len_stddev, sequence_name):
    # Draw a sequence length, clamped to at least one request.
    seqlen = max(1, int(rng.normal(len_mean, len_stddev)))
    values = rng.randint(0, 1024 * 1024, size=seqlen, dtype=dtype)
    steps = []
    expected_result = 0
    for idx in range(seqlen):
        flags = ""
        if idx == 0:
            flags += ",start"
        if idx == (seqlen - 1):
            flags += ",end"
        # Each step expects the running sum of the values sent so far.
        expected_result += values[idx]
        steps.append((flags, values[idx], expected_result, None))
    check_sequence_async(client_metadata, trial, model_name, dtype, steps, ...)
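The step construction above can be checked in isolation. The sketch below swaps NumPy's generator for the standard library's random.Random so it runs without extra packages; build_steps and its include_end parameter are hypothetical names, the latter added only to show how a no-end pattern differs from a valid one.

```python
import random


def build_steps(rng, len_mean=16, len_stddev=8, include_end=True):
    """Return a list of (flags, value, expected_result, delay_ms) tuples."""
    seqlen = max(1, int(rng.gauss(len_mean, len_stddev)))
    values = [rng.randrange(0, 1024 * 1024) for _ in range(seqlen)]
    steps = []
    expected_result = 0
    for idx, value in enumerate(values):
        flags = ""
        if idx == 0:
            flags += ",start"
        if include_end and idx == seqlen - 1:
            flags += ",end"
        expected_result += value  # running sum of everything sent so far
        steps.append((flags, value, expected_result, None))
    return steps


valid_steps = build_steps(random.Random(0))
no_end_steps = build_steps(random.Random(0), include_end=False)

print(len(valid_steps), valid_steps[0][0], valid_steps[-1][0])
print(any("end" in flags for flags, _, _, _ in no_end_steps))
```

A sequence of length one carries both flags on its single request (",start,end"), which is why the two checks are independent if statements rather than an if/elif pair.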

Stress Thread

Each thread creates 10 gRPC client contexts (2 common + 8 rare) with unique correlation ID blocks. Over the configured number of iterations, each pass randomly selects between:

  • Rare context (10% probability): Sends a no-end, valid-no-end, or valid-valid sequence (roughly one third each) to exercise idle sequence slot paths
  • Common context (90% probability): Weighted selection among all patterns with the following probabilities:
    • no-start: 1% (only if the previous pattern on that context was not no-end; otherwise this 1% falls through to no-end)
    • no-end: 4%
    • valid-no-end: 5%
    • valid-valid: 5%
    • valid: 85%

def stress_thread(name, seed, pass_cnt, correlation_id_base, trial, model_name, dtype):
    # Abridged: client setup and the random draws for `choice` and `rare_idx` are elided.
    for p in range(pass_cnt):
        if rng.rand() < 0.1:
            # Rare context - exercises idle sequence paths
            client_idx = common_cnt + rare_idx
            if choice < 0.33:
                sequence_no_end(...)
            elif choice < 0.66:
                sequence_valid_no_end(...)
            else:
                sequence_valid_valid(...)
        else:
            # Common context - weighted selection
            if (last_choice != "no-end") and (choice < 0.01):
                sequence_no_start(...)
            elif choice < 0.05:
                sequence_no_end(...)
            elif choice < 0.10:
                sequence_valid_no_end(...)
            elif choice < 0.15:
                sequence_valid_valid(...)
            else:
                sequence_valid(...)
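The threshold chain above maps a single uniform draw onto the listed probabilities. The following sketch isolates that mapping as pure functions and runs a small simulation; the function names, the use of random.Random in place of NumPy, and the way client_idx is drawn are assumptions made for illustration.

```python
import random
from collections import Counter


def choose_rare(choice):
    """Map a uniform draw in [0, 1) to a rare-context pattern."""
    if choice < 0.33:
        return "no-end"
    elif choice < 0.66:
        return "valid-no-end"
    return "valid-valid"


def choose_common(choice, last_choice):
    """Map a uniform draw in [0, 1) to a common-context pattern."""
    if last_choice != "no-end" and choice < 0.01:
        return "no-start"
    elif choice < 0.05:
        return "no-end"
    elif choice < 0.10:
        return "valid-no-end"
    elif choice < 0.15:
        return "valid-valid"
    return "valid"


def simulate(passes, seed, common_cnt=2, rare_cnt=8):
    """Count the patterns picked on common contexts over many passes."""
    rng = random.Random(seed)
    last_choices = [None] * (common_cnt + rare_cnt)  # one entry per context
    counts = Counter()
    for _ in range(passes):
        if rng.random() < 0.1:
            client_idx = common_cnt + rng.randrange(rare_cnt)
            pattern = choose_rare(rng.random())
        else:
            client_idx = rng.randrange(common_cnt)
            pattern = choose_common(rng.random(), last_choices[client_idx])
            counts[pattern] += 1
        last_choices[client_idx] = pattern
    return counts


counts = simulate(20000, seed=1)
total = sum(counts.values())
for pattern in ("no-start", "no-end", "valid-no-end", "valid-valid", "valid"):
    print(pattern, round(counts[pattern] / total, 3))
```

Because the branches are cumulative thresholds, a draw below 0.01 that is barred from no-start lands in the no-end branch, so the observed no-start share sits slightly under 1%.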

Constraint Tracking

The last_choices array tracks the most recent sequence pattern per context to prevent invalid pattern combinations (e.g., no-start following no-end, which the server would interpret as a continuation rather than an error).
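A toy model makes the reason for this constraint concrete. The class below is not Triton's scheduler: it is a hypothetical accumulator keyed by correlation ID, with no idle timeout, and its error text is only assumed to contain the phrase the test expects.

```python
class ToySequenceBatcher:
    """Hypothetical accumulator with one slot per active correlation ID."""

    def __init__(self):
        self._active = {}  # correlation ID -> running total

    def infer(self, corr_id, value, start=False, end=False):
        if start:
            self._active[corr_id] = 0  # allocate (or reset) the slot
        elif corr_id not in self._active:
            raise ValueError(
                "request for sequence {} must specify the START flag".format(corr_id))
        self._active[corr_id] += value
        result = self._active[corr_id]
        if end:
            del self._active[corr_id]  # release the slot
        return result


batcher = ToySequenceBatcher()

# Case 1: no-start on an idle correlation ID is rejected.
try:
    batcher.infer(7, 5)
    rejected = False
except ValueError as exc:
    rejected = "must specify the START flag" in str(exc)

# Case 2: a no-end sequence leaves its slot active, so a later request
# without the start flag is accepted as a continuation, not an error.
batcher.infer(8, 1, start=True)
batcher.infer(8, 2)
continued = batcher.infer(8, 10)

print(rejected, continued)  # True 13
```

In the second case a no-start test would see a successful response with an unexpected value instead of the error it checks for, which is the false failure the per-context tracking avoids.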

Status Checking

After all threads complete, check_status retrieves and prints inference statistics for the test model to aid in post-test analysis.

def check_status(model_name):
    client = grpcclient.InferenceServerClient("localhost:8001", verbose=FLAGS.verbose)
    stats = client.get_inference_statistics(model_name)
    print(stats)

Test Flow

  1. Parse command-line arguments (seed, concurrency, iterations)
  2. Determine trial type and model name (default: "custom" sequence model)
  3. Launch concurrent stress threads with unique seeds and correlation ID blocks
  4. Each thread executes weighted random sequence patterns for N iterations
  5. Collect thread exceptions in a thread-safe list
  6. Join all threads, check inference statistics, and report errors
  7. Exit with code 0 on success, 1 on any non-allowed failure
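Steps 3 to 7 can be sketched with the standard library alone. In the example below the worker body is a placeholder, the seed and correlation ID formulas are assumptions (only the block size of 100 comes from the constants listed earlier), and the failing parameter is a hypothetical hook that forces a thread to raise so the error path is visible.

```python
import random
import threading
import traceback

CORRELATION_ID_BLOCK_SIZE = 100  # value from the constants listed earlier


def run_stress(concurrency, base_seed, failing=()):
    """Launch workers and return (exit_code, assignments, errors)."""
    assignments = {}
    errors = []
    lock = threading.Lock()  # guards both shared containers

    def worker(name, seed, correlation_id_base):
        try:
            rng = random.Random(seed)  # per-thread generator
            rng.random()               # placeholder for the real iterations
            if name in failing:
                raise RuntimeError("simulated failure in " + name)
            with lock:
                assignments[name] = (seed, correlation_id_base)
        except Exception:
            # Record the failure instead of letting the thread die silently.
            with lock:
                errors.append((name, traceback.format_exc()))

    threads = []
    for idx in range(concurrency):
        name = "thread_{}".format(idx)
        seed = base_seed + idx                                      # assumption
        correlation_id_base = 1 + idx * CORRELATION_ID_BLOCK_SIZE   # assumption
        threads.append(threading.Thread(
            target=worker, args=(name, seed, correlation_id_base)))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    return (1 if errors else 0), assignments, errors


exit_code, assignments, errors = run_stress(4, base_seed=100, failing={"thread_2"})
print(exit_code, sorted(assignments), [name for name, _ in errors])
```

Giving each thread its own block of correlation IDs keeps sequences from different threads from colliding on the server, and collecting exceptions in a shared list lets the main thread decide the exit code after every worker has been joined.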

Dependencies

  • tritonclient.grpc - gRPC Triton client with streaming support
  • numpy - Random number generation and data types
  • test_util (tu) - get_sequence_model_name() utility
  • tritonclient.utils - np_to_triton_dtype conversion
