Implementation:AUTOMATIC1111 Stable diffusion webui Shared State
| Knowledge Sources | |
|---|---|
| Domains | Job Management, Progress Tracking |
| Last Updated | 2025-05-15 00:00 GMT |
Overview
Implements the State class that tracks the lifecycle of image generation jobs, including progress, interruption, live preview updates, and server command signaling.
Description
The State class is the central mechanism for managing generation job state across the web UI. It tracks whether a job has been skipped, interrupted, or stopped via boolean flags. It maintains job metadata such as job name, number, count, timestamp, and sampling step progress. The class provides begin() and end() methods to bracket job execution, resetting counters and timing. The skip(), interrupt(), and stop_generating() methods allow the UI or API to signal cancellation. The set_current_image() and do_set_current_image() methods manage live preview generation from current latents at configurable intervals. Server lifecycle commands (restart, stop) are handled through a thread-safe signaling mechanism using threading.Event, with server_command as a property that sets a signal when assigned, and wait_for_server_command() blocking until a command is issued. The dict() method serializes the current state for API responses. A backward-compatible need_restart property maps to the server_command system.
Usage
Use this class through shared.state to check or control the progress of image generation, to implement skip/interrupt behavior in samplers, or to manage server restart/stop commands from the UI.
Code Reference
Source Location
- Repository: AUTOMATIC1111_Stable_diffusion_webui
- File: modules/shared_state.py
- Lines: 1-168
Signature
class State:
skipped: bool
interrupted: bool
stopping_generation: bool
job: str
job_no: int
job_count: int
sampling_step: int
sampling_steps: int
current_latent: Tensor
current_image: Image
server_command: Optional[str]
def __init__(self)
def need_restart(self) -> bool # property
def wait_for_server_command(self, timeout: Optional[float] = None) -> Optional[str]
def request_restart(self) -> None
def skip(self) -> None
def interrupt(self) -> None
def stop_generating(self) -> None
def nextjob(self) -> None
def dict(self) -> dict
def begin(self, job: str = "(unknown)") -> None
def end(self) -> None
def set_current_image(self) -> None
def do_set_current_image(self) -> None
def assign_current_image(self, image) -> None
Import
from modules.shared_state import State
# Typically accessed via:
from modules import shared
state = shared.state
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| job | str | No | Job name passed to begin(), defaults to "(unknown)". |
| timeout | float or None | No | Timeout in seconds for wait_for_server_command(). |
| image | PIL.Image | Yes | Image to assign as the current live preview in assign_current_image(). |
Outputs
| Name | Type | Description |
|---|---|---|
| dict() | dict | Serialized state containing skipped, interrupted, stopping_generation, job, job_count, job_timestamp, job_no, sampling_step, sampling_steps. |
| server_command | str or None | The server command received (e.g., "restart", "stop") from wait_for_server_command(). |
Usage Examples
from modules import shared
# Start a new generation job
shared.state.begin("txt2img")
shared.state.job_count = 4
for i in range(4):
if shared.state.interrupted:
break
if shared.state.skipped:
shared.state.skipped = False
continue
# ... perform generation ...
shared.state.nextjob()
shared.state.end()
# Check state from API
status = shared.state.dict()
print(f"Progress: step {status['sampling_step']}/{status['sampling_steps']}")