L0 Sequence Stress
Source File: qa/L0_sequence_stress/sequence_stress.py
Language: Python (650 lines)
Domains: Testing, Sequence_Batching
Purpose
This Python module implements a targeted stress test for Triton Inference Server's sequence batcher. It spawns multiple concurrent threads that send various patterns of sequence inference requests (valid, no-start, no-end, valid-valid, valid-no-end) using gRPC streaming to exercise the sequence scheduler's slot management, timeout handling, and error detection under load.
Signature
# Constants:
CORRELATION_ID_BLOCK_SIZE = 100
DEFAULT_TIMEOUT_MS = 5000
SEQUENCE_LENGTH_MEAN = 16
SEQUENCE_LENGTH_STDEV = 8
# Key classes:
class UserData:
    """Container for async streaming request results."""
class TimeoutException(Exception): ...
# Key functions:
def completion_callback(user_data, result, error)
def check_sequence_async(client_metadata, trial, model_name, input_dtype,
                         steps, timeout_ms, sequence_name) -> None
def get_datatype(trial) -> np.dtype
def sequence_valid(client_metadata, rng, trial, model_name, dtype,
                   len_mean, len_stddev, sequence_name)
def sequence_valid_valid(client_metadata, rng, trial, model_name, dtype,
                         len_mean, len_stddev, sequence_name)
def sequence_valid_no_end(client_metadata, rng, trial, model_name, dtype,
                          len_mean, len_stddev, sequence_name)
def sequence_no_start(client_metadata, rng, trial, model_name, dtype, sequence_name)
def sequence_no_end(client_metadata, rng, trial, model_name, dtype,
                    len_mean, len_stddev, sequence_name)
def stress_thread(name, seed, pass_cnt, correlation_id_base,
                  trial, model_name, dtype)
def check_status(model_name)
# Command-line interface:
# -v, --verbose Enable verbose output
# -r, --random-seed Random seed for reproducibility
# -t, --concurrency Number of threads (default: 8)
# -i, --iterations Iterations per thread (default: 200)
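The command-line interface listed above could be declared with argparse roughly as follows. This is a minimal sketch rather than the script's actual parser: the build_parser helper name, the help strings, and the default of None for the seed are assumptions; only the flag names and the defaults of 8 and 200 come from the listing.

```python
import argparse


def build_parser():
    """Build a parser mirroring the four documented options (illustrative)."""
    parser = argparse.ArgumentParser(description="Sequence batcher stress test")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable verbose output")
    # Assumption: when no seed is given, the caller chooses one itself.
    parser.add_argument("-r", "--random-seed", type=int, default=None,
                        help="Random seed for reproducibility")
    parser.add_argument("-t", "--concurrency", type=int, default=8,
                        help="Number of threads")
    parser.add_argument("-i", "--iterations", type=int, default=200,
                        help="Iterations per thread")
    return parser
```

Calling build_parser().parse_args(["-t", "4", "-r", "42"]) would yield a namespace with concurrency 4, random_seed 42, and the remaining options at their defaults.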
Key Components
Core Async Sequence Execution
The check_sequence_async function manages a single sequence of inference requests using gRPC streaming. It handles sequence start/end flags, input data preparation, timeout detection, and result validation.
def check_sequence_async(client_metadata, trial, model_name, input_dtype,
                         steps, timeout_ms=DEFAULT_TIMEOUT_MS, sequence_name="<unknown>"):
    triton_client = client_metadata[0]
    sequence_id = client_metadata[1]
    triton_client.stop_stream()
    triton_client.start_stream(partial(completion_callback, user_data))
    for flag_str, value, expected_result, delay_ms in steps:
        triton_client.async_stream_infer(
            model_name, inputs,
            sequence_id=sequence_id,
            sequence_start=seq_start,
            sequence_end=seq_end,
        )
    # Process results in order, check timeout and expected values
    while processed_count < sent_count:
        (results, error) = user_data._completed_requests.get()
        if timeout_ms != None:
            now_ms = int(round(time.time() * 1000))
            if (now_ms - seq_start_ms) > timeout_ms:
                raise TimeoutException(...)
        result = results.as_numpy("OUTPUT")[0][0]
        assert result == expected
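The callback-and-queue handoff in the excerpt above can be tried without a server. The sketch below is a stand-in, not the test's code: it assumes UserData wraps a queue.Queue, calls the bound callback directly in place of the gRPC stream, and uses a hypothetical drain_results helper for the result loop. Re-raising a reported error is likewise an assumption.

```python
import queue
import time
from functools import partial


class TimeoutException(Exception):
    pass


class UserData:
    """Holds results delivered by the stream callback (assumed to be a queue)."""

    def __init__(self):
        self._completed_requests = queue.Queue()


def completion_callback(user_data, result, error):
    # Invoked once per response; hand the pair to the consumer thread.
    user_data._completed_requests.put((result, error))


def drain_results(user_data, expected_values, seq_start_ms, timeout_ms=None):
    """Check queued responses against expected values, in send order."""
    for expected in expected_values:
        result, error = user_data._completed_requests.get()
        if timeout_ms is not None:
            now_ms = int(round(time.time() * 1000))
            if (now_ms - seq_start_ms) > timeout_ms:
                raise TimeoutException(
                    "sequence exceeded {} ms".format(timeout_ms))
        if error is not None:
            raise error  # assumption: surface server errors to the caller
        assert result == expected, "expected {}, got {}".format(expected, result)
    return len(expected_values)


# Stand-in for the gRPC stream: call the bound callback directly.
user_data = UserData()
callback = partial(completion_callback, user_data)
seq_start_ms = int(round(time.time() * 1000))
for value in (3, 7, 12):
    callback(value, None)
checked = drain_results(user_data, [3, 7, 12], seq_start_ms, timeout_ms=5000)
```

Binding user_data with partial lets a single callback signature serve every stream while keeping each sequence's results in its own queue.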
Sequence Patterns
| Function | Pattern | Purpose |
|---|---|---|
| sequence_valid | Single sequence with start and end flags | Normal sequence lifecycle |
| sequence_valid_valid | Two complete sequences back-to-back on same correlation ID | Slot reuse verification |
| sequence_valid_no_end | First complete, second without end flag | Timeout slot reclamation |
| sequence_no_start | Single request without start flag | Error handling (expects "must specify the START flag") |
| sequence_no_end | Sequence with start but no end | Server-initiated slot abort |
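Except for sequence_no_start, which sends a single request, each pattern draws its sequence length from a normal distribution (mean 16, standard deviation 8), truncates it to an integer, and clamps it to at least 1. Each step carries a random integer value in the range [0, 1024 * 1024), and the expected result at each step is the cumulative sum of the values sent so far.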
def sequence_valid(client_metadata, rng, trial, model_name, dtype, len_mean, len_stddev, ...):
    seqlen = max(1, int(rng.normal(len_mean, len_stddev)))
    values = rng.randint(0, 1024 * 1024, size=seqlen, dtype=dtype)
    steps = []
    expected_result = 0
    for idx in range(seqlen):
        flags = ""
        if idx == 0:
            flags += ",start"
        if idx == (seqlen - 1):
            flags += ",end"
        expected_result += values[idx]
        steps.append((flags, values[idx], expected_result, None))
    check_sequence_async(client_metadata, trial, model_name, dtype, steps, ...)
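The step construction can be exercised in isolation. The sketch below swaps NumPy's generator for the standard library's random module so it has no dependencies; build_steps, decode_flags, and the with_start / with_end parameters are hypothetical names introduced for illustration and are not part of the test module.

```python
import random


def build_steps(rng, len_mean, len_stddev, with_start=True, with_end=True):
    """Return (flags, value, expected_result, delay_ms) tuples for one sequence."""
    # Truncate the drawn length to an integer and clamp it to at least 1.
    seqlen = max(1, int(rng.gauss(len_mean, len_stddev)))
    steps = []
    expected_result = 0
    for idx in range(seqlen):
        flags = ""
        if with_start and idx == 0:
            flags += ",start"
        if with_end and idx == seqlen - 1:
            flags += ",end"
        value = rng.randrange(0, 1024 * 1024)
        expected_result += value  # running sum of the values sent so far
        steps.append((flags, value, expected_result, None))
    return steps


def decode_flags(flag_str):
    """Translate a flag string into (sequence_start, sequence_end) booleans."""
    return ("start" in flag_str, "end" in flag_str)


rng = random.Random(1234)
valid = build_steps(rng, 16, 8)                   # start ... end
no_end = build_steps(rng, 16, 8, with_end=False)  # start, never ended
```

A sequence of length 1 gets the flag string ",start,end", which decodes to both booleans being true.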
Stress Thread
Each thread creates 10 gRPC client contexts (2 common + 8 rare) with unique correlation ID blocks. Over the configured number of iterations, each pass randomly selects between:
- Rare context (10% probability): Sends no-end, valid-no-end, or valid-valid sequences to exercise idle sequence slot paths
- Common context (90% probability): Weighted selection among all patterns with these probabilities:
  - no-start: 1% (only if the previous pattern on that context was not no-end)
  - no-end: 4%
  - valid-no-end: 5%
  - valid-valid: 5%
  - valid: 85%
def stress_thread(name, seed, pass_cnt, correlation_id_base, trial, model_name, dtype):
    for p in range(pass_cnt):
        if rng.rand() < 0.1:
            # Rare context - exercises idle sequence paths
            client_idx = common_cnt + rare_idx
            if choice < 0.33:
                sequence_no_end(...)
            elif choice < 0.66:
                sequence_valid_no_end(...)
            else:
                sequence_valid_valid(...)
        else:
            # Common context - weighted selection
            if (last_choice != "no-end") and (choice < 0.01):
                sequence_no_start(...)
            elif choice < 0.05:
                sequence_no_end(...)
            elif choice < 0.10:
                sequence_valid_no_end(...)
            elif choice < 0.15:
                sequence_valid_valid(...)
            else:
                sequence_valid(...)
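As a sanity check on the thresholds, the selection logic can be pulled into pure functions and sampled. In this sketch pick_rare and pick_common are hypothetical helper names, the standard library's random module stands in for NumPy, and the tally passes last_choice=None so the no-end constraint never applies.

```python
import random
from collections import Counter


def pick_rare(choice):
    """Map a uniform draw in [0, 1) to a rare-context pattern."""
    if choice < 0.33:
        return "no-end"
    if choice < 0.66:
        return "valid-no-end"
    return "valid-valid"


def pick_common(choice, last_choice):
    """Map a uniform draw in [0, 1) to a common-context pattern."""
    if last_choice != "no-end" and choice < 0.01:
        return "no-start"
    if choice < 0.05:
        return "no-end"
    if choice < 0.10:
        return "valid-no-end"
    if choice < 0.15:
        return "valid-valid"
    return "valid"


# Sample the common-context weights, ignoring history.
samples = 100_000
rng = random.Random(0)
counts = Counter(pick_common(rng.random(), None) for _ in range(samples))
shares = {name: count / samples for name, count in counts.items()}
```

Because the branches share one chain of thresholds, a draw below 0.01 that is blocked by the constraint falls through to no-end, so no-end effectively takes 5% in that situation.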
Constraint Tracking
The last_choices array tracks the most recent sequence pattern per context to prevent invalid pattern combinations (e.g., no-start following no-end, which the server would interpret as a continuation rather than an error).
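A small simulation illustrates how per-context tracking enforces this rule. Everything here is illustrative: run_passes and no_start_allowed are hypothetical names, only common contexts are picked, and the thresholds are deliberately exaggerated (they are not the test's real weights) so that the guard triggers often.

```python
import random

COMMON_CNT = 2  # context counts as described for the stress thread
RARE_CNT = 8


def no_start_allowed(last_choice):
    # A no-start request right after no-end would be read as a continuation
    # of the still-open sequence, so the expected error would never occur.
    return last_choice != "no-end"


def run_passes(seed, pass_cnt):
    """Return (client_idx, previous_pattern, pattern) for each pass."""
    rng = random.Random(seed)
    last_choices = [None] * (COMMON_CNT + RARE_CNT)  # one slot per context
    history = []
    for _ in range(pass_cnt):
        client_idx = rng.randrange(COMMON_CNT)
        choice = rng.random()
        # Exaggerated thresholds, chosen only to make the guard visible.
        if no_start_allowed(last_choices[client_idx]) and choice < 0.3:
            pattern = "no-start"
        elif choice < 0.6:
            pattern = "no-end"
        else:
            pattern = "valid"
        history.append((client_idx, last_choices[client_idx], pattern))
        last_choices[client_idx] = pattern
    return history


history = run_passes(seed=7, pass_cnt=1000)
```

Keeping one entry per context matters because the constraint is about the last pattern sent on the same correlation ID, not the last pattern sent by the thread.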
Status Checking
After all threads complete, check_status retrieves and prints inference statistics for the test model to aid in post-test analysis.
def check_status(model_name):
    client = grpcclient.InferenceServerClient("localhost:8001", verbose=FLAGS.verbose)
    stats = client.get_inference_statistics(model_name)
    print(stats)
Test Flow
- Parse command-line arguments (seed, concurrency, iterations)
- Determine trial type and model name (default: "custom" sequence model)
- Launch concurrent stress threads with unique seeds and correlation ID blocks
- Each thread executes weighted random sequence patterns for N iterations
- Collect thread exceptions in a thread-safe list
- Join all threads, check inference statistics, and report errors
- Exit with code 0 on success, 1 on any non-allowed failure
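The launch, collect, and exit steps above can be sketched as follows. This is an assumption-laden outline, not the module's code: run_stress is a hypothetical helper, the per-thread seed and the correlation ID base formula are guesses at a reasonable scheme, and every exception is treated as a failure (the allowed-failure filtering is not reproduced).

```python
import threading

CORRELATION_ID_BLOCK_SIZE = 100


def run_stress(worker, concurrency, iterations, base_seed):
    """Run worker in `concurrency` threads; return (exit_code, errors)."""
    errors = []
    errors_lock = threading.Lock()

    def guarded(name, seed, correlation_id_base):
        try:
            worker(name, seed, iterations, correlation_id_base)
        except Exception as ex:
            # Record failures under a lock so threads never race on the list.
            with errors_lock:
                errors.append((name, ex))

    threads = []
    for idx in range(concurrency):
        name = "thread_{}".format(idx)
        seed = base_seed + idx  # assumption: a distinct seed per thread
        # Assumption: each thread owns a disjoint block of correlation IDs.
        correlation_id_base = 1 + idx * CORRELATION_ID_BLOCK_SIZE
        threads.append(threading.Thread(
            target=guarded, args=(name, seed, correlation_id_base)))

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    return (1 if errors else 0), errors
```

A caller would pass the returned code to sys.exit after printing the collected errors.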
Dependencies
- tritonclient.grpc - gRPC Triton client with streaming support
- numpy - Random number generation and data types
- test_util (tu) - get_sequence_model_name() utility
- tritonclient.utils - np_to_triton_dtype conversion