Implementation:Langchain ai Langgraph SyncAsyncFuture Result
| Attribute | Value |
|---|---|
| API | _TaskFunction.__call__ and SyncAsyncFuture.result() |
| Workflow | Functional_API_Workflow |
| Type | API Doc |
| Repository | Langchain_ai_Langgraph |
| Source File | libs/langgraph/langgraph/func/__init__.py (L71-78), libs/langgraph/langgraph/pregel/_call.py (L248-269) |
| Source Lines | L71-78 (_TaskFunction.__call__), L248-251 (SyncAsyncFuture), L253-269 (call) |
Overview
When a @task-decorated function is called, the _TaskFunction.__call__ method delegates to the call() function, which interacts with the LangGraph runtime to schedule task execution and return a SyncAsyncFuture[T]. This future is the primary interface for task composition: callers use .result() in sync contexts or await in async contexts to retrieve the computed value. The SyncAsyncFuture class extends concurrent.futures.Future[T] and implements __await__ for dual sync/async support.
Description
_TaskFunction.__call__
When a _TaskFunction instance is invoked (lines 71-78), it calls the call() function from langgraph.pregel._call, passing:
- The original function (self.func)
- The positional and keyword arguments from the caller
- The retry_policy sequence
- The cache_policy (if any)
The call() function (lines 253-269) then:
- Retrieves the current runtime configuration via get_config().
- Extracts the runtime's CONFIG_KEY_CALL implementation from the configuration -- this is the actual task scheduler provided by the Pregel execution engine.
- Invokes the scheduler with the function, packed arguments (args, kwargs), retry policy, cache policy, and callbacks.
- Returns the SyncAsyncFuture produced by the scheduler.
The scheduler implementation is injected by the Pregel runtime and is responsible for:
- Creating the future object.
- Scheduling the task for execution (potentially in parallel with other tasks).
- Resolving the future when the task completes.
- Handling retry and cache policy enforcement.
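The four responsibilities above can be illustrated with a standalone sketch that uses only the standard library. It is not the Pregel scheduler: toy_schedule, max_attempts, use_cache, and the dict-based cache are hypothetical names chosen for illustration, and a thread pool stands in for the runtime.

```python
import concurrent.futures
from typing import Any, Callable

# Hypothetical stand-ins for the runtime's executor and cache.
_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=4)
_CACHE: dict = {}


def toy_schedule(
    func: Callable[..., Any],
    args: tuple,
    *,
    max_attempts: int = 1,
    use_cache: bool = False,
) -> concurrent.futures.Future:
    """Create a future, run func on a worker thread, and resolve the future."""
    fut: concurrent.futures.Future = concurrent.futures.Future()
    key = (func.__name__, args)

    if use_cache and key in _CACHE:
        fut.set_result(_CACHE[key])  # cache hit: resolve without running func
        return fut

    def run() -> None:
        for attempt in range(1, max_attempts + 1):
            try:
                value = func(*args)
            except Exception as exc:
                if attempt == max_attempts:
                    fut.set_exception(exc)  # retries exhausted
                    return
            else:
                if use_cache:
                    _CACHE[key] = value
                fut.set_result(value)
                return

    _POOL.submit(run)  # schedule; other tasks may run concurrently
    return fut


attempts = {"count": 0}


def flaky(x: int) -> int:
    """Fail on the first call, succeed afterwards."""
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise RuntimeError("transient failure")
    return x * 2


retried = toy_schedule(flaky, (21,), max_attempts=2).result(timeout=5)
```

The caller only ever sees the future; whether the value came from a first attempt, a retry, or the cache is hidden behind it.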
SyncAsyncFuture
The SyncAsyncFuture[T] class (lines 248-251) is defined as:
class SyncAsyncFuture(Generic[T], concurrent.futures.Future[T]):
    def __await__(self) -> Generator[T, None, T]:
        yield cast(T, ...)
It inherits all methods from concurrent.futures.Future, including:
- .result(timeout=None) -- blocks until the task completes and returns the value, or raises the exception if the task failed.
- .done() -- returns True if the task has completed.
- .exception(timeout=None) -- returns the exception raised by the task, if any.
The __await__ method enables the future to be used with await in async code, making it compatible with asyncio.gather() for parallel async task resolution.
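To make the dual interface concrete, here is a minimal standalone sketch of a concurrent.futures.Future subclass that can also be awaited. It is not LangGraph's implementation: the names DualFuture and resolve_later are hypothetical, and the sketch assumes that delegating to asyncio.wrap_future() is an acceptable way to bridge into the running event loop.

```python
import asyncio
import concurrent.futures
import threading


class DualFuture(concurrent.futures.Future):
    """Hypothetical future usable with .result() and with await."""

    def __await__(self):
        # Bridge to the running event loop; only valid inside a coroutine.
        return asyncio.wrap_future(self).__await__()


def resolve_later(fut: DualFuture, value: int) -> None:
    """Resolve the future from another thread, as a scheduler might."""
    threading.Timer(0.01, fut.set_result, args=(value,)).start()


# Sync context: block on .result().
sync_fut = DualFuture()
resolve_later(sync_fut, 4)
sync_value = sync_fut.result(timeout=5)


async def main() -> list:
    futures = [DualFuture() for _ in range(3)]
    for i, fut in enumerate(futures):
        resolve_later(fut, i * i)
    # Any object with __await__ can be passed to asyncio.gather().
    return await asyncio.gather(*futures)


async_values = asyncio.run(main())
```

The same object serves both callers: blocking code waits on the thread-level future, while async code awaits a loop-level wrapper around it.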
The call() Function
The call() function (lines 253-269) is the bridge between the task decorator layer and the Pregel execution engine:
def call(
    func: Callable[P, Awaitable[T]] | Callable[P, T],
    *args: Any,
    retry_policy: Sequence[RetryPolicy] | None = None,
    cache_policy: CachePolicy | None = None,
    **kwargs: Any,
) -> SyncAsyncFuture[T]:
    config = get_config()
    impl = config[CONF][CONFIG_KEY_CALL]
    fut = impl(
        func,
        (args, kwargs),
        retry_policy=retry_policy,
        cache_policy=cache_policy,
        callbacks=config["callbacks"],
    )
    return fut
The CONFIG_KEY_CALL is a runtime-injected callable that the Pregel engine places in the configuration. This indirection allows the same @task decorator to work across different execution contexts (local, distributed, testing) without modification.
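The indirection can be sketched without LangGraph as a callable looked up from a configuration mapping at call time. The key name SCHEDULER_KEY, the helper toy_call, and the two toy schedulers are assumptions made for illustration; the real runtime injects its own implementation under CONFIG_KEY_CALL.

```python
import concurrent.futures

SCHEDULER_KEY = "scheduler"  # hypothetical stand-in for CONFIG_KEY_CALL

_pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)


def inline_scheduler(func, packed):
    """Run immediately in the caller's thread (convenient in tests)."""
    args, kwargs = packed
    fut = concurrent.futures.Future()
    try:
        fut.set_result(func(*args, **kwargs))
    except Exception as exc:
        fut.set_exception(exc)
    return fut


def threaded_scheduler(func, packed):
    """Run on a worker thread; the executor creates and resolves the future."""
    args, kwargs = packed
    return _pool.submit(func, *args, **kwargs)


def toy_call(config, func, *args, **kwargs):
    """Look up whichever scheduler the context injected and delegate to it."""
    impl = config[SCHEDULER_KEY]
    return impl(func, (args, kwargs))


def add(a: int, b: int = 0) -> int:
    return a + b


# The calling code is identical; only the injected scheduler differs.
inline_result = toy_call({SCHEDULER_KEY: inline_scheduler}, add, 1, b=2).result()
threaded_result = toy_call({SCHEDULER_KEY: threaded_scheduler}, add, 1, b=2).result(timeout=5)
```

Because the lookup happens on every call, the function being scheduled never needs to know which execution context it is running in.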
Usage
from langgraph.func import entrypoint, task

@task
def compute(x: int) -> int:
    return x * x

@entrypoint()
def workflow(numbers: list[int]) -> list[int]:
    # Dispatch tasks -- returns futures, not values
    futures = [compute(n) for n in numbers]
    # Resolve futures -- blocks until each task completes
    results = [f.result() for f in futures]
    return results

workflow.invoke([1, 2, 3, 4])  # Returns [1, 4, 9, 16]
Code Reference
Source Location
| File | Symbol | Lines |
|---|---|---|
| libs/langgraph/langgraph/func/__init__.py | _TaskFunction.__call__ | Lines 71-78 |
| libs/langgraph/langgraph/pregel/_call.py | SyncAsyncFuture class | Lines 248-251 |
| libs/langgraph/langgraph/pregel/_call.py | call() function | Lines 253-269 |
Signature
# _TaskFunction.__call__
class _TaskFunction(Generic[P, T]):
    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> SyncAsyncFuture[T]: ...

# SyncAsyncFuture
class SyncAsyncFuture(Generic[T], concurrent.futures.Future[T]):
    def __await__(self) -> Generator[T, None, T]: ...
    # Inherited from concurrent.futures.Future:
    def result(self, timeout: float | None = None) -> T: ...
    def done(self) -> bool: ...
    def exception(self, timeout: float | None = None) -> BaseException | None: ...

# call() function
def call(
    func: Callable[P, Awaitable[T]] | Callable[P, T],
    *args: Any,
    retry_policy: Sequence[RetryPolicy] | None = None,
    cache_policy: CachePolicy | None = None,
    **kwargs: Any,
) -> SyncAsyncFuture[T]: ...
Import
# SyncAsyncFuture and call are internal; users interact through @task-decorated functions
from langgraph.func import task # @task-decorated functions return SyncAsyncFuture when called
# For type annotations (advanced usage)
from langgraph.pregel._call import SyncAsyncFuture
I/O Contract
Input (_TaskFunction.__call__)
| Parameter | Type | Description |
|---|---|---|
| *args | P.args | Positional arguments matching the original function's signature. |
| **kwargs | P.kwargs | Keyword arguments matching the original function's signature. |
Output (_TaskFunction.__call__)
| Return Type | Description |
|---|---|
| SyncAsyncFuture[T] | A future representing the pending task result. Call .result() to block and get the value, or await in async contexts. |
Input (call function)
| Parameter | Type | Default | Description |
|---|---|---|---|
| func | Callable[P, Awaitable[T]] \| Callable[P, T] | (required) | The task function to schedule for execution. |
| *args | Any | | Positional arguments to pass to the function. |
| retry_policy | Sequence[RetryPolicy] \| None | None | Retry policies from the task decorator. |
| cache_policy | CachePolicy \| None | None | Cache policy from the task decorator. |
| **kwargs | Any | | Keyword arguments to pass to the function. |
SyncAsyncFuture Methods
| Method | Return Type | Description |
|---|---|---|
| .result(timeout=None) | T | Blocks until the task completes, then returns the value. Raises the task's exception if it failed. Optional timeout in seconds. |
| .done() | bool | Returns True if the task has completed (successfully or with error). |
| .exception(timeout=None) | BaseException \| None | Returns the exception if the task failed, or None if it succeeded. |
| await future | T | Async version of .result(), for use in async entrypoints and with asyncio.gather(). |
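Because these methods are inherited unchanged, their behaviour can be checked against a plain concurrent.futures.Future. The sketch below resolves the futures by hand with set_result() and set_exception(), which in LangGraph would be done by the runtime rather than by user code; the variable names are illustrative only.

```python
import concurrent.futures

succeeded = concurrent.futures.Future()
succeeded.set_result(16)

failed = concurrent.futures.Future()
failed.set_exception(ValueError("task failed"))

pending = concurrent.futures.Future()  # never resolved in this sketch

value = succeeded.result()        # returns the stored value
no_error = succeeded.exception()  # None, because the task succeeded
error = failed.exception()        # the ValueError instance, not raised

try:
    failed.result()               # .result() re-raises the task's exception
    reraised = False
except ValueError:
    reraised = True

try:
    pending.result(timeout=0.01)  # gives up after the timeout
    timed_out = False
except concurrent.futures.TimeoutError:
    timed_out = True

states = (succeeded.done(), failed.done(), pending.done())
```

Note that .done() is True for a failed future as well as a successful one, so it cannot be used on its own to detect success.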
Usage Examples
Sequential Task Execution
from langgraph.func import entrypoint, task

@task
def step_one(x: int) -> int:
    return x + 1

@task
def step_two(x: int) -> int:
    return x * 2

@entrypoint()
def sequential(x: int) -> int:
    a = step_one(x).result()  # Wait for step_one
    b = step_two(a).result()  # step_two depends on step_one's result
    return b

sequential.invoke(3)  # Returns (3 + 1) * 2 = 8
Parallel Task Execution
from langgraph.func import entrypoint, task

@task
def process(item: str) -> str:
    return item.upper()

@entrypoint()
def parallel_workflow(items: list[str]) -> list[str]:
    # Dispatch all tasks before resolving any
    futures = [process(item) for item in items]
    # Resolve all -- tasks may have run in parallel
    return [f.result() for f in futures]

parallel_workflow.invoke(["a", "b", "c"])  # Returns ["A", "B", "C"]
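Why the dispatch-then-resolve ordering matters can be shown with a standard-library sketch in which a thread pool stands in for the runtime. Each worker here waits at a threading.Barrier, a device added purely for illustration, so the batch can only finish if all three calls are in flight at once. Resolving each future immediately after dispatching it would leave the first worker waiting until the barrier timed out.

```python
import concurrent.futures
import threading

N = 3
barrier = threading.Barrier(N, timeout=5)


def process(item: str) -> str:
    barrier.wait()  # proceeds only once N workers have arrived
    return item.upper()


with concurrent.futures.ThreadPoolExecutor(max_workers=N) as pool:
    # Dispatch everything first ...
    futures = [pool.submit(process, item) for item in ["a", "b", "c"]]
    # ... then resolve, which preserves the input order.
    results = [f.result(timeout=10) for f in futures]
```

Results come back in dispatch order because the list of futures is iterated in order, regardless of which worker finished first.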
Async with asyncio.gather
import asyncio
from langgraph.func import entrypoint, task

@task
async def async_process(x: int) -> int:
    return x ** 2

@entrypoint()
async def async_workflow(numbers: list[int]) -> list[int]:
    futures = [async_process(n) for n in numbers]
    return await asyncio.gather(*futures)

# Async entrypoints must be run with ainvoke; asyncio.run drives it from sync code
result = asyncio.run(async_workflow.ainvoke([1, 2, 3]))  # Returns [1, 4, 9]
Conditional Task Selection
from langgraph.func import entrypoint, task

@task
def fast_path(x: int) -> int:
    return x

@task
def slow_path(x: int) -> int:
    return x * x * x

@entrypoint()
def conditional(x: int) -> int:
    if x > 10:
        return fast_path(x).result()
    else:
        return slow_path(x).result()