Implementation:Langchain ai Langgraph Task Decorator

From Leeroopedia
Attribute Value
API @task decorator and _TaskFunction class
Workflow Functional_API_Workflow
Type API Doc
Repository Langchain_ai_Langgraph
Source File libs/langgraph/langgraph/func/__init__.py
Source Lines L46-90 (_TaskFunction), L93-112 (overloads), L115-217 (task function)

Overview

The @task decorator transforms a plain Python function (sync or async) into a _TaskFunction instance that, when called from within an @entrypoint or StateGraph context, returns a SyncAsyncFuture[T] instead of executing immediately. The decorator accepts optional name, retry_policy, and cache_policy parameters, and can be used with or without parentheses.

This page also covers the _TaskFunction class (lines 46-90), which wraps the original function, stores policy metadata, and delegates execution to the call() runtime function.

Description

The task function is implemented with three @overload signatures to support multiple calling conventions:

  1. Bare decorator: @task -- the function is passed directly as __func_or_none__.
  2. Parameterized decorator (sync function): @task(name="my_task") -- returns a decorator that wraps the function.
  3. Parameterized decorator (async function): same as above, but the decorated function is a coroutine function.
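The dual calling convention can be illustrated with a minimal, self-contained sketch. MiniTask and mini_task are hypothetical stand-ins invented for this example; they are not the library's _TaskFunction or task, and the sketch calls the function directly rather than returning a future.

```python
from __future__ import annotations

import functools
from typing import Any, Callable


class MiniTask:
    """Hypothetical stand-in for _TaskFunction: wraps a function and records a name."""

    def __init__(self, func: Callable[..., Any], *, name: str | None = None) -> None:
        self.func = func
        self.name = name or func.__name__
        # Copy __name__, __doc__, etc. from the wrapped function onto this instance.
        functools.update_wrapper(self, func)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # The real class returns a future; this sketch simply calls through.
        return self.func(*args, **kwargs)


def mini_task(func_or_none: Callable[..., Any] | None = None, *, name: str | None = None):
    """Hypothetical decorator usable as @mini_task or @mini_task(name=...)."""

    def decorator(func: Callable[..., Any]) -> MiniTask:
        return MiniTask(func, name=name)

    if func_or_none is not None:
        # Bare form: Python passed the decorated function as the first argument.
        return decorator(func_or_none)
    # Parameterized form: return the decorator to be applied next.
    return decorator


@mini_task
def add_one(x: int) -> int:
    return x + 1


@mini_task(name="multiply")
def times_two(x: int) -> int:
    return x * 2
```

In both forms the decorated name ends up bound to a MiniTask instance; only the path taken through mini_task differs.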

Internally, the task function normalizes the retry_policy parameter (which can be None, a single RetryPolicy, or a sequence of them) into a tuple, then constructs a _TaskFunction instance.
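The normalization step might look roughly like the sketch below. FakeRetryPolicy and normalize_retry_policy are hypothetical names used only for illustration; the actual code in langgraph/func/__init__.py may be structured differently.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class FakeRetryPolicy:
    """Hypothetical stand-in for langgraph.types.RetryPolicy."""

    max_attempts: int = 3


def normalize_retry_policy(
    retry_policy: FakeRetryPolicy | Sequence[FakeRetryPolicy] | None,
) -> tuple[FakeRetryPolicy, ...]:
    """Turn None, a single policy, or a sequence of policies into a tuple."""
    if retry_policy is None:
        return ()
    if isinstance(retry_policy, FakeRetryPolicy):
        return (retry_policy,)
    return tuple(retry_policy)


single = FakeRetryPolicy(max_attempts=5)
several = [FakeRetryPolicy(max_attempts=2), FakeRetryPolicy(max_attempts=4)]
```

Normalizing up front means downstream code only ever has to handle one shape: a (possibly empty) tuple of policies.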

The _TaskFunction class:

  • Stores the original function (self.func), the retry policy sequence (self.retry_policy), and the cache policy (self.cache_policy).
  • Uses functools.update_wrapper to preserve the original function's metadata (__name__, __doc__, etc.).
  • If a custom name is provided, it sets func.__name__ to that name. For class methods it instead wraps the method in a functools.partial and names the partial, so the shared method object is not mutated.
  • Implements __call__ which delegates to the call() function from langgraph.pregel._call, passing through the retry and cache policies.
  • Provides clear_cache and aclear_cache methods for programmatic cache invalidation.
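The renaming behavior described in the list above can be sketched with the standard library alone. The rename helper and the Greeter class are hypothetical; the sketch assumes that methods are detected through their __func__ attribute, which is an assumption about the implementation rather than something this page states.

```python
import functools


class Greeter:
    def greet(self, who: str) -> str:
        return f"Hello, {who}!"


def rename(func, name):
    """Hypothetical helper: give a callable a custom __name__."""
    if hasattr(func, "__func__"):
        # Bound method: build a partial over the underlying function and the
        # instance, then name the partial. The function object shared through
        # the class keeps its original __name__.
        renamed = functools.partial(func.__func__, func.__self__)
        renamed.__name__ = name
        return renamed
    # Plain function: rename it in place.
    func.__name__ = name
    return func


def plain(x):
    return x


greeter = Greeter()
renamed_method = rename(greeter.greet, "say_hello")
renamed_plain = rename(plain, "identity")
```

After this runs, renamed_method still behaves like greeter.greet, while Greeter.greet.__name__ is unchanged.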

Usage

from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy, CachePolicy

# Bare decorator -- no parentheses
@task
def add_one(x: int) -> int:
    return x + 1

# With parameters
@task(name="multiply", retry_policy=RetryPolicy(max_attempts=3))
def multiply(a: int, b: int) -> int:
    return a * b

# With cache policy
@task(cache_policy=CachePolicy(ttl=300))
def expensive_computation(data: str) -> str:
    return data.upper()

# Async task
@task
async def async_fetch(url: str) -> str:
    return f"response from {url}"

# Called from an entrypoint
@entrypoint()
def workflow(inputs: dict) -> dict:
    a = add_one(inputs["x"])          # Returns SyncAsyncFuture[int]
    b = multiply(inputs["x"], 2)      # Returns SyncAsyncFuture[int]
    return {"a": a.result(), "b": b.result()}

Code Reference

Source Location

File libs/langgraph/langgraph/func/__init__.py
_TaskFunction class Lines 46-90
Overloads Lines 93-112
task function body Lines 115-217

Signature

# The task decorator (main implementation)
def task(
    __func_or_none__: Callable[P, Awaitable[T]] | Callable[P, T] | None = None,
    *,
    name: str | None = None,
    retry_policy: RetryPolicy | Sequence[RetryPolicy] | None = None,
    cache_policy: CachePolicy[Callable[P, str | bytes]] | None = None,
) -> (
    Callable[[Callable[P, Awaitable[T]] | Callable[P, T]], _TaskFunction[P, T]]
    | _TaskFunction[P, T]
)

# The _TaskFunction class
class _TaskFunction(Generic[P, T]):
    def __init__(
        self,
        func: Callable[P, Awaitable[T]] | Callable[P, T],
        *,
        retry_policy: Sequence[RetryPolicy],
        cache_policy: CachePolicy[Callable[P, str | bytes]] | None = None,
        name: str | None = None,
    ) -> None

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> SyncAsyncFuture[T]

    def clear_cache(self, cache: BaseCache) -> None

    async def aclear_cache(self, cache: BaseCache) -> None

Import

from langgraph.func import task

I/O Contract

Input (Decorator Parameters)

Parameter Type Default Description
__func_or_none__ Callable[P, Awaitable[T]] | Callable[P, T] | None None The function to decorate. When None, returns a decorator; when a callable, decorates it directly.
name str | None None Optional custom name for the task. Defaults to the function's __name__.
retry_policy RetryPolicy | Sequence[RetryPolicy] | None None Retry policy or sequence of policies. Normalized to a tuple internally.
cache_policy CachePolicy[Callable[P, str | bytes]] | None None Cache policy for memoizing task results.

Output

Context Return Type Description
Bare decorator (@task) _TaskFunction[P, T] Wraps the function directly.
Parameterized (@task(...)) Callable[[Callable], _TaskFunction[P, T]] Returns a decorator that wraps the function.
Calling the _TaskFunction SyncAsyncFuture[T] Returns a future representing the pending task result.

Internal State (_TaskFunction)

Attribute Type Description
func Callable The original (possibly name-modified) function.
retry_policy Sequence[RetryPolicy] Tuple of retry policies to apply on failure.
cache_policy CachePolicy[Callable[P, str | bytes]] | None Cache policy for result memoization.

Usage Examples

Bare Decorator

from langgraph.func import entrypoint, task

@task
def greet(name: str) -> str:
    return f"Hello, {name}!"

@entrypoint()
def workflow(name: str) -> str:
    future = greet(name)
    return future.result()

workflow.invoke("Alice")  # Returns "Hello, Alice!"

Task with Retry Policy

from langgraph.func import task
from langgraph.types import RetryPolicy

@task(retry_policy=RetryPolicy(max_attempts=5, initial_interval=1.0, backoff_factor=2.0))
def unreliable_api_call(endpoint: str) -> dict:
    # May raise exceptions; up to 5 attempts are made in total (the first call plus retries)
    return {"status": "ok"}
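To make the retry parameters above concrete, the following sketch computes the waits between attempts. It assumes exponential backoff of the form initial_interval * backoff_factor ** n, capped at max_interval, with jitter ignored; the formula and the max_interval default of 128.0 are assumptions, not taken from this page. backoff_schedule is a hypothetical helper and not part of LangGraph.

```python
def backoff_schedule(
    max_attempts: int,
    initial_interval: float,
    backoff_factor: float,
    max_interval: float = 128.0,  # assumed cap; not stated on this page
) -> list:
    """Return the assumed wait (in seconds) before each retry, ignoring jitter."""
    waits = []
    # max_attempts counts the first call, so there are max_attempts - 1 retries.
    for retry_index in range(max_attempts - 1):
        wait = initial_interval * backoff_factor ** retry_index
        waits.append(min(max_interval, wait))
    return waits


schedule = backoff_schedule(max_attempts=5, initial_interval=1.0, backoff_factor=2.0)
print(schedule)  # [1.0, 2.0, 4.0, 8.0]
```

Under these assumptions, the policy in the example would wait about 15 seconds in total across its four retries before giving up.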

Clearing Task Cache

from langgraph.func import task
from langgraph.types import CachePolicy

@task(cache_policy=CachePolicy(ttl=600))
def cached_lookup(key: str) -> str:
    return f"value for {key}"

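# Later, invalidate the cache. my_cache_instance is a placeholder for the
# BaseCache instance the workflow was configured with (assumed here to be
# passed as @entrypoint(cache=...)); it is not defined in this snippet.
cached_lookup.clear_cache(my_cache_instance)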
