Implementation:Triton inference server Server L0 Long Running Stress Scenarios

From Leeroopedia


L0 Long Running Stress Scenarios

Source File: qa/L0_long_running_stress/scenarios.py
Language: Python (1013 lines)
Domains: Testing, Stress_Testing

Purpose

This Python module defines the full set of scenarios used by the long-running stress test framework. Each scenario class encapsulates a specific pattern of inference requests that exercises one aspect of Triton Inference Server under sustained load: sequence batching edge cases, timeout behavior, resilience to client crashes, image classification, and performance analyzer integration.

Signature

# Constants:
DEFAULT_TIMEOUT_MS = 25000
SEQUENCE_LENGTH_MEAN = 16
SEQUENCE_LENGTH_STDEV = 8

# Exception classes:
class TimeoutException(Exception): ...

# Callback:
def completion_callback(user_data, result, error)

# Base class:
class Scenario(metaclass=abc.ABCMeta):
    def __init__(self, name, trials, verbose=False, out_stream=sys.stdout)
    def scenario_name(self) -> str
    def get_trial(self) -> str
    def get_datatype(self, trial) -> np.dtype
    @abc.abstractmethod
    def run(self, client_metadata): ...

# Scenario implementations:
class PerfAnalyzerScenario(Scenario): ...
class ResNetScenario(Scenario): ...
class TimeoutScenario(Scenario): ...
class CrashingScenario(Scenario): ...
class SequenceScenario(Scenario): ...      # Abstract base for sequence scenarios
class SequenceNoEndScenario(SequenceScenario): ...
class SequenceValidNoEndScenario(SequenceScenario): ...
class SequenceValidValidScenario(SequenceScenario): ...
class SequenceNoStartScenario(SequenceScenario): ...
class SequenceValidScenario(SequenceScenario): ...

Key Components

Scenario Base Class

All scenarios inherit from Scenario, which provides trial selection and datatype mapping. Each scenario implements run(client_metadata), which has three possible outcomes: it returns the number of requests sent on success, raises an exception on failure, or returns None if constraints prevent the scenario from executing.

class Scenario(metaclass=abc.ABCMeta):
    def get_trial(self):
        return np.random.choice(self.trials_)

    def get_datatype(self, trial):
        if "plan" in trial:
            return np.float32
        return np.int32

    @abc.abstractmethod
    def run(self, client_metadata):
        pass
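
The run() contract described above can be illustrated with a minimal sketch. The Scenario class here is a simplified stand-in, not the one in scenarios.py, and EchoScenario, its trial names, and the dictionary form of client_metadata with a "ready" key are all hypothetical.

```python
import abc
import random


class Scenario(metaclass=abc.ABCMeta):
    """Simplified stand-in for the base class (not the real implementation)."""

    def __init__(self, name, trials):
        self.name_ = name
        self.trials_ = trials

    def get_trial(self):
        # Pick one of the configured trials at random.
        return random.choice(self.trials_)

    @abc.abstractmethod
    def run(self, client_metadata):
        pass


class EchoScenario(Scenario):
    """Hypothetical scenario that pretends to send a fixed number of requests."""

    def run(self, client_metadata):
        # Outcome 1: return None when constraints prevent execution.
        if not client_metadata.get("ready", False):
            return None
        trial = self.get_trial()
        # Outcome 2: raise an exception on failure.
        if trial not in self.trials_:
            raise RuntimeError("unexpected trial: {}".format(trial))
        # Outcome 3: report how many requests were sent.
        return 3


scenario = EchoScenario("echo", ["graphdef", "plan"])
skipped = scenario.run({"ready": False})
sent = scenario.run({"ready": True})
```

A caller can therefore treat None as "skipped", an integer as a request count to add to its totals, and any exception as a test failure.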

PerfAnalyzerScenario

Wraps the perf_analyzer command-line tool to generate sustained load with configurable concurrency. Contains a nested ModelOption class that manages per-model concurrency ranges and auto-adjusts based on server queue latency feedback.

class PerfAnalyzerScenario(Scenario):
    class ModelOption:
        def __init__(self, model_name, batch_size, concurrency_range,
                     queue_latency_range_us, input_shapes=[], input_file=None):
            self.concurrency_range_ = list(concurrency_range)  # [min, max, current]
            self.queue_latency_range_us_ = queue_latency_range_us

        def run(self, name, sequence_id_range, out_stream):
            # Runs perf_analyzer with gRPC streaming
            # Reads CSV output to adjust concurrency based on queue latency
            # Returns request count from output
            ...  # body elided in this excerpt

Models include resnet_v1_50, sequence models, and identity models. Both raw and validation-data variants are generated for output correctness checking.
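
One way to picture the queue-latency feedback is the sketch below. The adjustment policy (step the current concurrency by one, clamped to the configured bounds) is an assumption made for illustration and may differ from what ModelOption.run actually does. The function name adjust_concurrency is hypothetical.

```python
def adjust_concurrency(concurrency_range, queue_latency_range_us, measured_queue_us):
    """Return an updated [min, max, current] list (hypothetical policy)."""
    lo, hi, current = concurrency_range
    min_latency_us, max_latency_us = queue_latency_range_us
    if measured_queue_us < min_latency_us:
        # Queue is nearly empty: add load, but never exceed the maximum.
        current = min(hi, current + 1)
    elif measured_queue_us > max_latency_us:
        # Queue is backing up: shed load, but never drop below the minimum.
        current = max(lo, current - 1)
    return [lo, hi, current]


raised = adjust_concurrency([1, 8, 4], (1000, 5000), 200)
lowered = adjust_concurrency([1, 8, 4], (1000, 5000), 9000)
capped = adjust_concurrency([1, 8, 8], (1000, 5000), 200)
```

Keeping the current value inside the same list as the bounds mirrors the [min, max, current] layout noted in the excerpt above.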

ResNetScenario

Sends batched image classification requests using the resnet_v1_50 model. The vulture test image is converted to RGB, resized to 224x224, and has the per-channel mean (123, 117, 104) subtracted. The scenario then verifies that the result is classified as VULTURE across the batch.

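import numpy as np
from PIL import Image

class ResNetScenario(Scenario):
    def preprocess(self, filename):
        img = Image.open(filename)
        resized_img = img.convert("RGB").resize((224, 224), Image.BILINEAR)
        # Convert to a float32 array before subtracting the per-channel mean
        np_img = np.array(resized_img).astype(np.float32)
        scaled = np_img - np.asarray((123, 117, 104), dtype=np.float32)
        return scaled

    def postprocess(self, results):
        # Validates VULTURE classification across batch
        ...  # body elided in this excerpt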
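
The batch validation step might look like the following sketch. It assumes each classification entry is a string of the form "<score>:<index>:<label>", which is the format Triton's classification extension produces. The function name check_all_vulture and the sample scores and indices are made up for illustration.

```python
def check_all_vulture(batch_results):
    """Raise if any classification entry in the batch is not labeled VULTURE."""
    for per_image in batch_results:
        for entry in per_image:
            # Entries may arrive as bytes; normalize to str first.
            text = entry.decode("utf-8") if isinstance(entry, bytes) else entry
            # Classification strings are "<score>:<index>:<label>".
            score, index, label = text.split(":", 2)
            if label != "VULTURE":
                raise ValueError("unexpected label {!r}".format(label))
    # Number of batch items that were checked.
    return len(batch_results)


checked = check_all_vulture([[b"0.91:23:VULTURE"], [b"0.88:23:VULTURE"]])
```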

TimeoutScenario

Sends a 1 GB tensor with a 0.1-second client timeout so that the request fails with a gRPC deadline exceeded error. The scenario treats an exception whose message contains "Deadline Exceeded" as success and counts it as one request.

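class TimeoutScenario(Scenario):
    def run(self, client_metadata):
        # Excerpt: triton_client, model_name, and inputs are set up in the elided part of run()
        # Number of elements of the input dtype that add up to 1 GB
        tensor_shape = (math.trunc(1 * (1024 * 1024 * 1024) // np.dtype(self.input_dtype_).itemsize),)
        try:
            triton_client.infer(model_name, inputs, client_timeout=0.1)
            assert False, "expected inference failure from deadline exceeded"
        except Exception as ex:
            if "Deadline Exceeded" not in ex.message():
                assert False
            # The timeout error is the expected outcome; count it as one request
            return 1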
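
The tensor_shape expression above divides 1 GB by the element size of the input datatype. A small worked sketch of that arithmetic follows. The helper name element_count is hypothetical, and the item sizes are passed in directly rather than taken from np.dtype.

```python
ONE_GIB = 1024 * 1024 * 1024  # bytes, matching the expression in run()


def element_count(itemsize_bytes, total_bytes=ONE_GIB):
    """Number of elements of the given size that fit in total_bytes."""
    return total_bytes // itemsize_bytes


# 4-byte elements (for example int32 or float32)
four_byte_elements = element_count(4)
# 1-byte elements (for example int8)
one_byte_elements = element_count(1)
```

For the two datatypes returned by get_datatype (np.float32 and np.int32, both 4 bytes), the tensor has 268,435,456 elements.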

CrashingScenario

Launches a separate crashing_client.py subprocess that exits mid-inference to test server resilience. Verifies the server remains live after the client crash.
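
The crash-then-check pattern can be sketched as follows. This is an illustration under stated assumptions, not the code in scenarios.py: the child process here is a one-line Python command that exits abruptly in place of crashing_client.py, and is_server_live is a hypothetical callable standing in for a liveness query against the server.

```python
import subprocess
import sys


def run_crashing_child(is_server_live):
    """Run a child that exits abruptly, then confirm the server is still live."""
    # Stand-in for crashing_client.py: exit immediately with a nonzero code.
    proc = subprocess.run(
        [sys.executable, "-c", "import os; os._exit(1)"],
        timeout=30,
    )
    # The scenario only passes if the server survived the client crash.
    if not is_server_live():
        raise RuntimeError("server not live after client crash")
    return proc.returncode


# Hypothetical liveness check that always reports a healthy server.
returncode = run_crashing_child(lambda: True)
```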

SequenceScenario (Abstract Base)

Provides the core async streaming inference logic for all sequence-based scenarios. Manages sequence flags (start/end), expected result accumulation, timeout detection, and result validation.

class SequenceScenario(Scenario):
    @abc.abstractmethod
    def check_constraints(self, model_name, sequence_id):
        """Check if this scenario can run given previous sequence state."""
        pass

    def check_sequence_async(self, client_metadata, trial, model_name,
                              input_dtype, steps, timeout_ms, ...):
        """Execute a sequence of async streaming inferences and validate results."""
        triton_client.start_stream(partial(completion_callback, user_data))
        for flag_str, value, _, delay_ms in steps:
            triton_client.async_stream_infer(model_name, inputs,
                sequence_id=sequence_id, sequence_start=seq_start, sequence_end=seq_end)
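
The excerpt above uses seq_start and seq_end without showing how they are derived from flag_str. A minimal sketch, assuming the flag string contains the words "start" and/or "end" (or is None for a middle step), might look like this. The helper name parse_flags and the sample steps are hypothetical.

```python
def parse_flags(flag_str):
    """Map a step's flag string to (sequence_start, sequence_end) booleans."""
    seq_start = False
    seq_end = False
    if flag_str is not None:
        seq_start = "start" in flag_str
        seq_end = "end" in flag_str
    return seq_start, seq_end


# Each step is (flag_str, value, unused, delay_ms), as in the loop above.
steps = [
    ("start", 1, None, None),
    (None, 2, None, None),
    ("end", 3, None, None),
]
flags = [parse_flags(flag_str) for flag_str, value, _, delay_ms in steps]
single_step = parse_flags("start,end")
```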

Concrete Sequence Scenarios

Scenario | Behavior | Constraint Check
SequenceValidScenario | Complete sequence with start and end flags | Always runs
SequenceNoEndScenario | Sequence with start flag only; server should abort and reclaim slot | Always runs; tracks no-end state
SequenceValidNoEndScenario | Two back-to-back sequences: first complete, second without end | Always runs; tracks no-end state
SequenceValidValidScenario | Two complete sequences back-to-back on same correlation ID | Always runs
SequenceNoStartScenario | Single request without start flag; expects server error | Cannot follow a no-end sequence
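
The "tracks no-end state" and "cannot follow a no-end sequence" constraints can be sketched with a small tracker. The reasoning is that a request without a start flag is only an error when the server holds no open sequence for that ID. The class NoEndTracker and its method names are hypothetical and are not taken from scenarios.py.

```python
class NoEndTracker:
    """Remember which (model, sequence ID) pairs were left without an end flag."""

    def __init__(self):
        self.open_sequences_ = set()

    def mark_no_end(self, model_name, sequence_id):
        # Called after a scenario leaves a sequence open.
        self.open_sequences_.add((model_name, sequence_id))

    def mark_ended(self, model_name, sequence_id):
        # Called after a scenario completes a sequence with an end flag.
        self.open_sequences_.discard((model_name, sequence_id))

    def can_run_no_start(self, model_name, sequence_id):
        # A no-start request is only expected to fail if no sequence is open.
        return (model_name, sequence_id) not in self.open_sequences_


tracker = NoEndTracker()
allowed_before = tracker.can_run_no_start("model_a", 7)
tracker.mark_no_end("model_a", 7)
allowed_after_no_end = tracker.can_run_no_start("model_a", 7)
tracker.mark_ended("model_a", 7)
allowed_after_end = tracker.can_run_no_start("model_a", 7)
```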

All sequence scenarios use variable-length sequences whose length is drawn from a normal distribution (mean SEQUENCE_LENGTH_MEAN = 16, standard deviation SEQUENCE_LENGTH_STDEV = 8), with random integer input values up to 1M.
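
A sketch of this sampling using only the standard library is shown below. Two details are assumptions made for illustration: the length is clamped to at least one step, and "1M" is read as 1024 * 1024. The helper name sample_sequence is hypothetical, and the module itself lists numpy for random data generation rather than the random module.

```python
import random

SEQUENCE_LENGTH_MEAN = 16
SEQUENCE_LENGTH_STDEV = 8
MAX_VALUE = 1024 * 1024  # assumed meaning of "1M"


def sample_sequence(rng):
    """Draw one sequence of random integer values with a random length."""
    # A normal draw can be zero or negative, so clamp to one step (assumption).
    length = max(1, int(rng.gauss(SEQUENCE_LENGTH_MEAN, SEQUENCE_LENGTH_STDEV)))
    # randint is inclusive on both ends, so subtract one from the upper bound.
    return [rng.randint(0, MAX_VALUE - 1) for _ in range(length)]


rng = random.Random(0)
sequences = [sample_sequence(rng) for _ in range(1000)]
mean_length = sum(len(s) for s in sequences) / len(sequences)
```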

Dependencies

  • tritonclient.grpc - gRPC Triton client for streaming inference
  • numpy - Random data generation and type handling
  • PIL (Pillow) - Image preprocessing for ResNet
  • test_util (tu) - Model name utilities
  • perf_analyzer - External performance analysis tool
  • crashing_client.py - External crashing client subprocess
