Implementation:Langchain ai Langgraph SyncAsyncFuture Result

From Leeroopedia
Attribute     Value
API           _TaskFunction.__call__ and SyncAsyncFuture.result()
Workflow      Functional_API_Workflow
Type          API Doc
Repository    Langchain_ai_Langgraph
Source File   libs/langgraph/langgraph/func/__init__.py (L71-78), libs/langgraph/langgraph/pregel/_call.py (L248-269)
Source Lines  L71-78 (_TaskFunction.__call__), L248-251 (SyncAsyncFuture), L253-269 (call)

Overview

When a @task-decorated function is called, the _TaskFunction.__call__ method delegates to the call() function, which interacts with the LangGraph runtime to schedule task execution and return a SyncAsyncFuture[T]. This future is the primary interface for task composition: callers use .result() in sync contexts or await in async contexts to retrieve the computed value. The SyncAsyncFuture class extends concurrent.futures.Future[T] and implements __await__ for dual sync/async support.

Description

_TaskFunction.__call__

When a _TaskFunction instance is invoked (lines 71-78), it calls the call() function from langgraph.pregel._call, passing:

  • The original function (self.func)
  • The positional and keyword arguments from the caller
  • The retry_policy sequence
  • The cache_policy (if any)

The call() function (lines 253-269) then:

  1. Retrieves the current runtime configuration via get_config().
  2. Extracts the runtime's CONFIG_KEY_CALL implementation from the configuration -- this is the actual task scheduler provided by the Pregel execution engine.
  3. Invokes the scheduler with the function, packed arguments (args, kwargs), retry policy, cache policy, and callbacks.
  4. Returns the SyncAsyncFuture produced by the scheduler.

The scheduler implementation is injected by the Pregel runtime and is responsible for:

  • Creating the future object.
  • Scheduling the task for execution (potentially in parallel with other tasks).
  • Resolving the future when the task completes.
  • Handling retry and cache policy enforcement.
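
The responsibilities listed above can be illustrated with the standard library alone. The following is a minimal sketch, not the Pregel scheduler: toy_schedule, max_attempts, and the shared thread pool are hypothetical names chosen for illustration, a simple attempt counter stands in for RetryPolicy, and caching is left out.

```python
import concurrent.futures
from typing import Any, Callable

# Hypothetical stand-in for the runtime's worker pool.
_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def toy_schedule(
    func: Callable[..., Any],
    packed: tuple[tuple[Any, ...], dict[str, Any]],
    *,
    max_attempts: int = 1,
) -> concurrent.futures.Future:
    """Create a future, run func in the pool, and resolve the future."""
    args, kwargs = packed  # same (args, kwargs) packing that call() uses
    fut: concurrent.futures.Future = concurrent.futures.Future()

    def run() -> None:
        for attempt in range(1, max_attempts + 1):
            try:
                value = func(*args, **kwargs)
            except Exception as exc:
                if attempt == max_attempts:
                    fut.set_exception(exc)  # out of attempts: fail the future
            else:
                fut.set_result(value)  # success: resolve the future
                return

    _POOL.submit(run)  # schedule; may run alongside other tasks
    return fut


attempts = {"n": 0}


def flaky(x: int) -> int:
    """Fails on the first two attempts, then succeeds."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ValueError("transient failure")
    return x * 2


future = toy_schedule(flaky, ((21,), {}), max_attempts=3)
print(future.result(timeout=5))  # 42, after two failed attempts
```

The caller only ever sees the future: retries happen behind it, and an exception surfaces through .result() or .exception() only once the attempts are exhausted.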

SyncAsyncFuture

The SyncAsyncFuture[T] class (lines 248-251) is defined as:

class SyncAsyncFuture(Generic[T], concurrent.futures.Future[T]):
    def __await__(self) -> Generator[T, None, T]:
        yield cast(T, ...)

It inherits all methods from concurrent.futures.Future, including:

  • .result(timeout=None) -- blocks until the task completes and returns the value, or raises the exception if the task failed.
  • .done() -- returns True if the task has completed.
  • .exception(timeout=None) -- returns the exception raised by the task, if any.

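The __await__ method is what allows the future to be used with await in async code, and it makes the future acceptable to asyncio.gather() for resolving several async tasks concurrently. Note that the body shown above only yields a placeholder value (cast(T, ...)), so the definition reads as a typing declaration of the dual sync/async interface rather than as a working awaitable implementation.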
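
To make the dual sync/async idea concrete, here is a minimal standard-library sketch of a future that supports both .result() and await. It is an illustration under stated assumptions, not LangGraph's implementation: DualFuture and run_in_thread are hypothetical names, and bridging through asyncio.wrap_future is one possible technique among several.

```python
import asyncio
import concurrent.futures
import threading
from typing import Any, Callable, Generator


class DualFuture(concurrent.futures.Future):
    """Hypothetical future usable with both .result() and await."""

    def __await__(self) -> Generator[Any, None, Any]:
        # Bridge to the running event loop; only valid inside a coroutine.
        return asyncio.wrap_future(self).__await__()


def run_in_thread(func: Callable[..., Any], *args: Any) -> DualFuture:
    """Run func in a background thread and resolve a DualFuture with its outcome."""
    fut = DualFuture()

    def worker() -> None:
        try:
            fut.set_result(func(*args))
        except Exception as exc:
            fut.set_exception(exc)

    threading.Thread(target=worker, daemon=True).start()
    return fut


def square(x: int) -> int:
    return x * x


# Sync context: block on the result.
sync_value = run_in_thread(square, 4).result(timeout=5)


async def main() -> list[int]:
    # Async context: the same kind of object can be awaited or gathered.
    single = await run_in_thread(square, 5)
    many = await asyncio.gather(*(run_in_thread(square, n) for n in (1, 2, 3)))
    return [single, *many]


async_values = asyncio.run(main())
print(sync_value, async_values)  # 16 [25, 1, 4, 9]
```

asyncio.gather() accepts these objects because anything that defines __await__ counts as an awaitable, which is the property the paragraph above relies on.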

The call() Function

The call() function (lines 253-269) is the bridge between the task decorator layer and the Pregel execution engine:

def call(
    func: Callable[P, Awaitable[T]] | Callable[P, T],
    *args: Any,
    retry_policy: Sequence[RetryPolicy] | None = None,
    cache_policy: CachePolicy | None = None,
    **kwargs: Any,
) -> SyncAsyncFuture[T]:
    config = get_config()
    impl = config[CONF][CONFIG_KEY_CALL]
    fut = impl(
        func,
        (args, kwargs),
        retry_policy=retry_policy,
        cache_policy=cache_policy,
        callbacks=config["callbacks"],
    )
    return fut

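CONFIG_KEY_CALL names the configuration entry in which the Pregel engine places a runtime-injected callable. Because call() looks this callable up at call time instead of binding to a fixed scheduler, the same @task decorator works unmodified across different execution contexts (local, distributed, testing). It also means call() depends on get_config() returning an active runtime configuration, which the usage examples on this page obtain by calling tasks from inside an @entrypoint function.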
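
The indirection can be sketched with a context variable and two interchangeable schedulers. Everything named here is hypothetical and standard-library only: _CONFIG stands in for the configuration that get_config() reads, TOY_KEY_CALL plays the role of CONFIG_KEY_CALL, and raising RuntimeError outside a context is a choice made by this sketch.

```python
import concurrent.futures
import contextvars
from typing import Any, Callable

# Hypothetical stand-in for the runtime configuration.
_CONFIG: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar("config")
TOY_KEY_CALL = "toy_call"  # plays the role of CONFIG_KEY_CALL


def toy_call(func: Callable[..., Any], *args: Any, **kwargs: Any) -> concurrent.futures.Future:
    """Look up whichever scheduler the active runtime injected and delegate to it."""
    try:
        config = _CONFIG.get()
    except LookupError:
        raise RuntimeError("toy_call used outside of a runtime context") from None
    impl = config[TOY_KEY_CALL]
    return impl(func, (args, kwargs))  # arguments travel as one packed tuple


def inline_scheduler(func, packed) -> concurrent.futures.Future:
    """Test-style scheduler: run immediately in the calling thread."""
    args, kwargs = packed
    fut: concurrent.futures.Future = concurrent.futures.Future()
    try:
        fut.set_result(func(*args, **kwargs))
    except Exception as exc:
        fut.set_exception(exc)
    return fut


def pooled_scheduler(pool: concurrent.futures.Executor):
    """Build a scheduler that submits work to the given executor."""
    def schedule(func, packed) -> concurrent.futures.Future:
        args, kwargs = packed
        return pool.submit(func, *args, **kwargs)
    return schedule


def add(a: int, b: int = 0) -> int:
    return a + b


# Same call site, two different injected schedulers.
token = _CONFIG.set({TOY_KEY_CALL: inline_scheduler})
inline_result = toy_call(add, 1, b=2).result()
_CONFIG.reset(token)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    token = _CONFIG.set({TOY_KEY_CALL: pooled_scheduler(pool)})
    pooled_result = toy_call(add, 3, b=4).result(timeout=5)
    _CONFIG.reset(token)

print(inline_result, pooled_result)  # 3 7
```

The function being scheduled and the code that calls toy_call never change; only the entry stored in the configuration does.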

Usage

from langgraph.func import entrypoint, task

@task
def compute(x: int) -> int:
    return x * x

@entrypoint()
def workflow(numbers: list[int]) -> list[int]:
    # Dispatch tasks -- returns futures, not values
    futures = [compute(n) for n in numbers]

    # Resolve futures -- blocks until each task completes
    results = [f.result() for f in futures]
    return results

workflow.invoke([1, 2, 3, 4])  # Returns [1, 4, 9, 16]

Code Reference

Source Location

File                    libs/langgraph/langgraph/func/__init__.py
_TaskFunction.__call__  Lines 71-78
File                    libs/langgraph/langgraph/pregel/_call.py
SyncAsyncFuture class   Lines 248-251
call() function         Lines 253-269

Signature

# _TaskFunction.__call__
class _TaskFunction(Generic[P, T]):
    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> SyncAsyncFuture[T]: ...

# SyncAsyncFuture
class SyncAsyncFuture(Generic[T], concurrent.futures.Future[T]):
    def __await__(self) -> Generator[T, None, T]: ...

    # Inherited from concurrent.futures.Future:
    def result(self, timeout: float | None = None) -> T: ...
    def done(self) -> bool: ...
    def exception(self, timeout: float | None = None) -> BaseException | None: ...

# call() function
def call(
    func: Callable[P, Awaitable[T]] | Callable[P, T],
    *args: Any,
    retry_policy: Sequence[RetryPolicy] | None = None,
    cache_policy: CachePolicy | None = None,
    **kwargs: Any,
) -> SyncAsyncFuture[T]: ...

Import

# SyncAsyncFuture and call are internal; users interact through @task-decorated functions
from langgraph.func import task  # @task-decorated functions return SyncAsyncFuture when called

# For type annotations (advanced usage)
from langgraph.pregel._call import SyncAsyncFuture

I/O Contract

Input (_TaskFunction.__call__)

Parameter  Type      Description
*args      P.args    Positional arguments matching the original function's signature.
**kwargs   P.kwargs  Keyword arguments matching the original function's signature.

Output (_TaskFunction.__call__)

Return Type         Description
SyncAsyncFuture[T]  A future representing the pending task result. Call .result() to block and get the value, or await in async contexts.

Input (call function)

Parameter     Type                                        Default     Description
func          Callable[P, Awaitable[T]] | Callable[P, T]  (required)  The task function to schedule for execution.
*args         Any                                         (variadic)  Positional arguments to pass to the function.
retry_policy  Sequence[RetryPolicy] | None                None        Retry policies from the task decorator.
cache_policy  CachePolicy | None                          None        Cache policy from the task decorator.
**kwargs      Any                                         (variadic)  Keyword arguments to pass to the function.

SyncAsyncFuture Methods

Method                    Return Type           Description
.result(timeout=None)     T                     Blocks until the task completes, then returns the value. Raises the task's exception if it failed. Optional timeout in seconds.
.done()                   bool                  Returns True if the task has completed (successfully or with error).
.exception(timeout=None)  BaseException | None  Returns the exception if the task failed, or None if it succeeded.
await future              T                     Async version of .result(), for use in async entrypoints and with asyncio.gather().
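
Since .result(), .done(), and .exception() are inherited from concurrent.futures.Future, their behavior can be tried out on plain standard-library futures. The sketch below uses a ThreadPoolExecutor as a stand-in for the task runtime; gated_square, fails, and the gate event are hypothetical helpers introduced only for this demonstration.

```python
import concurrent.futures
import threading

gate = threading.Event()  # keeps the first task unfinished until we allow it


def gated_square(x: int) -> int:
    gate.wait(timeout=5)
    return x * x


def fails() -> None:
    raise ValueError("boom")


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    ok = pool.submit(gated_square, 3)
    bad = pool.submit(fails)

    # result(timeout=...) raises TimeoutError when the value is not ready in time.
    try:
        ok.result(timeout=0.05)
        timed_out = False
    except concurrent.futures.TimeoutError:
        timed_out = True

    gate.set()                      # let the task finish
    value = ok.result(timeout=5)    # blocks until the value is available
    finished = ok.done()            # True once a result or exception is set
    error = bad.exception(timeout=5)  # returns the exception instead of raising it
    no_error = ok.exception()       # None for a successful future

    try:
        bad.result()                # re-raises the task's exception
        reraised = False
    except ValueError:
        reraised = True

print(timed_out, value, finished, repr(error), no_error, reraised)
# True 9 True ValueError('boom') None True
```

Checking .exception() first is a way to inspect a failure without a try/except around .result().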

Usage Examples

Sequential Task Execution

from langgraph.func import entrypoint, task

@task
def step_one(x: int) -> int:
    return x + 1

@task
def step_two(x: int) -> int:
    return x * 2

@entrypoint()
def sequential(x: int) -> int:
    a = step_one(x).result()   # Wait for step_one
    b = step_two(a).result()   # step_two depends on step_one's result
    return b

sequential.invoke(3)  # Returns (3 + 1) * 2 = 8

Parallel Task Execution

from langgraph.func import entrypoint, task

@task
def process(item: str) -> str:
    return item.upper()

@entrypoint()
def parallel_workflow(items: list[str]) -> list[str]:
    # Dispatch all tasks before resolving any
    futures = [process(item) for item in items]
    # Resolve all -- tasks may have run in parallel
    return [f.result() for f in futures]

parallel_workflow.invoke(["a", "b", "c"])  # Returns ["A", "B", "C"]
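
Why dispatching before resolving matters can be shown with a standard-library stand-in. This sketch assumes a thread pool in place of the LangGraph runtime (the source only says tasks "may" run in parallel) and uses a hypothetical barrier so that the tasks can finish only if all of them are in flight at the same time.

```python
import concurrent.futures
import threading

ITEMS = ["a", "b", "c"]
# Every worker must reach the barrier before any of them may continue.
barrier = threading.Barrier(len(ITEMS))


def process(item: str) -> str:
    barrier.wait(timeout=5)  # raises BrokenBarrierError if the others never arrive
    return item.upper()


with concurrent.futures.ThreadPoolExecutor(max_workers=len(ITEMS)) as pool:
    # Dispatch everything first, then resolve.
    futures = [pool.submit(process, item) for item in ITEMS]
    results = [f.result(timeout=10) for f in futures]

print(results)  # ['A', 'B', 'C']
```

Had the loop called .result() immediately after each submit, the first task would have waited at the barrier alone and failed with BrokenBarrierError, which is the sequential behavior the dispatch-then-resolve pattern avoids.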

Async with asyncio.gather

import asyncio
from langgraph.func import entrypoint, task

@task
async def async_process(x: int) -> int:
    return x ** 2

@entrypoint()
async def async_workflow(numbers: list[int]) -> list[int]:
    futures = [async_process(n) for n in numbers]
    return await asyncio.gather(*futures)

# Must use ainvoke for async entrypoints; await it from inside a coroutine
async def main() -> None:
    result = await async_workflow.ainvoke([1, 2, 3])  # Returns [1, 4, 9]
    print(result)

asyncio.run(main())

Conditional Task Selection

from langgraph.func import entrypoint, task

@task
def fast_path(x: int) -> int:
    return x

@task
def slow_path(x: int) -> int:
    return x * x * x

@entrypoint()
def conditional(x: int) -> int:
    if x > 10:
        return fast_path(x).result()
    else:
        return slow_path(x).result()
