Implementation:PrefectHQ Prefect Concurrency Context Manager
Appearance
| Metadata | |
|---|---|
| Source | Repo: Prefect |
| Domains | Concurrency, Orchestration |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete context manager for acquiring and releasing Global Concurrency Limit slots provided by the Prefect library.
Description
The concurrency() context manager from prefect.concurrency.sync acquires named GCL slots before executing a code block and releases them after. It takes a limit name (string, typically including worker identity) and occupy count (number of slots to acquire).
Code Reference
- Repository: https://github.com/PrefectHQ/prefect
- File: src/prefect/concurrency/sync.py (L22) for the function, examples/per_worker_task_concurrency.py (L81-98) for usage
- Signature:
from prefect.concurrency.sync import concurrency
# Context manager usage:
with concurrency(names: str, occupy: int = 1):
# Code that uses the limited resource
...
- Import:
from prefect.concurrency.sync import concurrency
I/O Contract
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | names | str (required) | GCL name, e.g., "gpu:worker-1" |
| Input | occupy | int (optional, default 1) | Number of slots to acquire |
| Output | None | context manager | Resource access is scoped to the with block |
Usage Example
import os
from prefect import task, get_run_logger
from prefect.concurrency.sync import concurrency
def get_worker_id() -> str:
return os.getenv("WORKER_ID", "default")
@task
def run_ml_model(data: dict) -> dict:
logger = get_run_logger()
worker_id = get_worker_id()
# Only 1-2 ML tasks run at a time per worker
with concurrency(f"gpu:{worker_id}", occupy=1):
logger.info(f"Running ML inference on GPU...")
# ... model inference code ...
return {**data, "predictions": [0.9, 0.1]}
Related Pages
Page Connections
Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment