| Property |
Value
|
| Implementation Name |
WorkflowContext Request Info
|
| SDK |
Microsoft Agent Framework
|
| Repository |
Microsoft Agent Framework
|
| Source File |
python/packages/core/agent_framework/_workflows/_workflow_context.py
|
| Line Range |
L367-398
|
| Import |
Access via WorkflowContext parameter in @handler methods
|
| Type |
Async instance method
|
| Domains |
Workflow_Engine, Human_in_the_Loop
|
Overview
The WorkflowContext.request_info() method allows an executor to pause workflow execution and request information from an external system or human operator. It constructs a WorkflowEvent of type "request_info", registers it as a pending request in the RunnerContext, and emits it into the event stream. The workflow subsequently transitions to IDLE_WITH_PENDING_REQUESTS once all active executors complete, remaining in that state until the response is supplied externally.
Code Reference
Source Location
| Property |
Value
|
| File |
python/packages/core/agent_framework/_workflows/_workflow_context.py
|
| Class |
WorkflowContext
|
| Member |
request_info (async method)
|
| Lines |
367-398
|
Signature
async def request_info(
self,
request_data: object,
response_type: type,
*,
request_id: str | None = None,
) -> None:
Import Statement
# WorkflowContext is not imported directly; it is injected as a parameter
# into @handler methods on Executor subclasses.
from agent_framework import Executor, handler, WorkflowContext
class MyExecutor(Executor):
@handler
async def handle(self, msg: str, ctx: WorkflowContext) -> None:
await ctx.request_info(
request_data="Need user confirmation",
response_type=bool,
)
I/O Contract
Inputs
| Parameter |
Type |
Default |
Description
|
request_data |
object |
(required) |
The payload describing what information is being requested. This value is embedded in the emitted WorkflowEvent as event.data. Its type() is also recorded as request_type on the event for handler matching.
|
response_type |
type |
(required) |
The expected Python type of the response. Used to validate responses and to match against @response_handler signatures defined on the executor.
|
request_id |
None |
None |
Optional unique identifier for the request. If None, a UUID is auto-generated via str(uuid.uuid4()). Used for correlating external responses back to this specific request.
|
Output
| Return Type |
Description
|
None |
The method returns None. Its primary effects are side effects: emitting a request_info event and registering the request as pending.
|
Side Effects
| Effect |
Description
|
| Event emission |
A WorkflowEvent with type="request_info" is created via WorkflowEvent.request_info() and added to the event stream through RunnerContext.add_request_info_event(). The event carries request_id, source_executor_id, request_data (as data), request_type, and response_type.
|
| Pending request registration |
The RunnerContext stores the event in its _pending_request_info_events dictionary keyed by request_id. This enables response correlation when the workflow is resumed.
|
| State transition |
After all active executors complete, the workflow transitions to IDLE_WITH_PENDING_REQUESTS if unresolved requests remain.
|
Internal Behavior
The method proceeds through the following steps:
- Derive request type: Computes
request_type = type(request_data) from the runtime type of the provided data.
- Validate handler existence: Calls
self._executor.is_request_supported(request_type, response_type) to check whether the executor has a @response_handler defined for this (request_type, response_type) pair. If no matching handler is found, a warning is logged but the request is not suppressed.
- Construct the event: Calls the
WorkflowEvent.request_info() class method, passing the request_id (or a newly generated UUID), the source_executor_id, the request_data, and the response_type.
- Register and emit: Delegates to
RunnerContext.add_request_info_event(), which stores the event in the pending requests dictionary and emits it into the event stream.
Validation Warning
When no matching @response_handler is found, the following warning is logged:
logger.warning(
f"Executor '{self._executor_id}' requested info of type {request_type.__name__} "
f"with expected response type {response_type.__name__}, but no matching "
"response handler is defined. The request will not be ignored but responses will "
"not be processed. Please define a response handler using the @response_handler decorator."
)
This follows a fail-open with diagnostics approach: the request is still emitted so that external systems see it, but the developer is warned that responses will have nowhere to go.
Usage Examples
Basic Information Request
from agent_framework import Executor, handler, WorkflowContext
class NameCollector(Executor):
@handler
async def need_input(self, msg: str, ctx: WorkflowContext) -> None:
await ctx.request_info(
request_data="Please provide your name",
response_type=str,
request_id="user_name_request",
)
Request With Auto-Generated ID
from agent_framework import Executor, handler, WorkflowContext
class ConfirmationExecutor(Executor):
@handler
async def confirm(self, msg: str, ctx: WorkflowContext) -> None:
# request_id is omitted; a UUID will be generated automatically
await ctx.request_info(
request_data={"action": "delete", "target": "all_records"},
response_type=bool,
)
Detecting Request Events in a Stream
from agent_framework import Workflow
workflow = Workflow(...)
async for event in workflow.run("begin", stream=True):
if event.type == "request_info":
print(f"Executor '{event.source_executor_id}' requests info")
print(f" Request ID: {event.request_id}")
print(f" Data: {event.data}")
# Store request_id to supply a response later
Supplying Responses to Resume
# After collecting the response externally, resume the workflow
result = await workflow.run(
responses={"user_name_request": "Alice"},
)
Full Round-Trip Pattern
from agent_framework import Workflow
workflow = Workflow(...)
# Step 1: Start the workflow; it will pause when request_info is called
pending_requests = {}
async for event in workflow.run("start", stream=True):
if event.type == "request_info":
pending_requests[event.request_id] = event.data
# Step 2: The workflow is now IDLE_WITH_PENDING_REQUESTS
# Collect responses (e.g., from a UI or external system)
responses = {}
for req_id, req_data in pending_requests.items():
responses[req_id] = input(f"{req_data}: ")
# Step 3: Resume with the collected responses
result = await workflow.run(responses=responses)
Related Pages
Template:Sources