Implementation: langchain-ai/langgraph Task Decorator
| Attribute | Value |
|---|---|
| API | `@task` decorator and `_TaskFunction` class |
| Workflow | Functional_API_Workflow |
| Type | API Doc |
| Repository | Langchain_ai_Langgraph |
| Source File | `libs/langgraph/langgraph/func/__init__.py` |
| Source Lines | L46-90 (`_TaskFunction`), L93-112 (overloads), L115-217 (`task` function) |
Overview
The `@task` decorator transforms a plain Python function (sync or async) into a `_TaskFunction` instance that, when called from within an `@entrypoint` or `StateGraph` context, returns a `SyncAsyncFuture[T]` instead of executing immediately. The decorator accepts optional `name`, `retry_policy`, and `cache_policy` parameters, and can be used with or without parentheses.
This page also covers the `_TaskFunction` class (lines 46-90), which wraps the original function, stores policy metadata, and delegates execution to the `call()` runtime function.
Description
The `task` function is implemented with three `@overload` signatures to support multiple calling conventions:
- Bare decorator: `@task` -- the function is passed directly as `__func_or_none__`.
- Parameterized decorator (returning sync): `@task(name="my_task")` -- returns a decorator that wraps the function.
- Parameterized decorator (returning async): same as above, but for async functions.
Internally, the `task` function normalizes the `retry_policy` parameter (which can be `None`, a single `RetryPolicy`, or a sequence of them) into a tuple, then constructs a `_TaskFunction` instance.
The `_TaskFunction` class:
- Stores the original function (`self.func`), the retry policy sequence (`self.retry_policy`), and the cache policy (`self.cache_policy`).
- Uses `functools.update_wrapper` to preserve the original function's metadata (`__name__`, `__doc__`, etc.).
- If a custom `name` is provided, it modifies `func.__name__` on the function (or creates a partial for class methods, to avoid mutating the shared method).
- Implements `__call__`, which delegates to the `call()` function from `langgraph.pregel._call`, passing through the retry and cache policies.
- Provides `clear_cache` and `aclear_cache` methods for programmatic cache invalidation.
Usage
```python
from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy, CachePolicy

# Bare decorator -- no parentheses
@task
def add_one(x: int) -> int:
    return x + 1

# With parameters
@task(name="multiply", retry_policy=RetryPolicy(max_attempts=3))
def multiply(a: int, b: int) -> int:
    return a * b

# With cache policy
@task(cache_policy=CachePolicy(ttl=300))
def expensive_computation(data: str) -> str:
    return data.upper()

# Async task
@task
async def async_fetch(url: str) -> str:
    return f"response from {url}"

# Called from an entrypoint
@entrypoint()
def workflow(inputs: dict) -> dict:
    a = add_one(inputs["x"])      # Returns SyncAsyncFuture[int]
    b = multiply(inputs["x"], 2)  # Returns SyncAsyncFuture[int]
    return {"a": a.result(), "b": b.result()}
```
Code Reference
Source Location
| Component | Location |
|---|---|
| File | `libs/langgraph/langgraph/func/__init__.py` |
| `_TaskFunction` class | Lines 46-90 |
| Overloads | Lines 93-112 |
| `task` function body | Lines 115-217 |
Signature
```python
# The task decorator (main implementation)
def task(
    __func_or_none__: Callable[P, Awaitable[T]] | Callable[P, T] | None = None,
    *,
    name: str | None = None,
    retry_policy: RetryPolicy | Sequence[RetryPolicy] | None = None,
    cache_policy: CachePolicy[Callable[P, str | bytes]] | None = None,
) -> (
    Callable[[Callable[P, Awaitable[T]] | Callable[P, T]], _TaskFunction[P, T]]
    | _TaskFunction[P, T]
)

# The _TaskFunction class
class _TaskFunction(Generic[P, T]):
    def __init__(
        self,
        func: Callable[P, Awaitable[T]] | Callable[P, T],
        *,
        retry_policy: Sequence[RetryPolicy],
        cache_policy: CachePolicy[Callable[P, str | bytes]] | None = None,
        name: str | None = None,
    ) -> None
    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> SyncAsyncFuture[T]
    def clear_cache(self, cache: BaseCache) -> None
    async def aclear_cache(self, cache: BaseCache) -> None
```
Import
```python
from langgraph.func import task
```
I/O Contract
Input (Decorator Parameters)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `__func_or_none__` | `Callable[P, Awaitable[T]] \| Callable[P, T] \| None` | `None` | The function to decorate. When `None`, returns a decorator; when a callable, decorates it directly. |
| `name` | `str \| None` | `None` | Optional custom name for the task. Defaults to the function's `__name__`. |
| `retry_policy` | `RetryPolicy \| Sequence[RetryPolicy] \| None` | `None` | Retry policy or sequence of policies. Normalized to a tuple internally. |
| `cache_policy` | `CachePolicy[Callable[P, str \| bytes]] \| None` | `None` | Cache policy for memoizing task results. |
Output
| Context | Return Type | Description |
|---|---|---|
| Bare decorator (`@task`) | `_TaskFunction[P, T]` | Wraps the function directly. |
| Parameterized (`@task(...)`) | `Callable[[Callable], _TaskFunction[P, T]]` | Returns a decorator that wraps the function. |
| Calling the `_TaskFunction` | `SyncAsyncFuture[T]` | A future representing the pending task result. |
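The deferred-result contract is analogous to the standard library's futures. This is an analogy only -- `SyncAsyncFuture` is LangGraph's own type, resolved by the runtime rather than a thread pool:

```python
from concurrent.futures import ThreadPoolExecutor

def add_one(x: int) -> int:
    return x + 1

with ThreadPoolExecutor(max_workers=1) as pool:
    # Submitting returns immediately with a Future, much like calling a
    # @task inside an entrypoint returns a SyncAsyncFuture without
    # running the function inline.
    future = pool.submit(add_one, 41)
    # .result() blocks until the value is available.
    result = future.result()
```

This is why tasks submitted back-to-back inside an entrypoint can run concurrently: work starts at call time, and `.result()` only synchronizes.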
Internal State (_TaskFunction)
| Attribute | Type | Description |
|---|---|---|
| `func` | `Callable` | The original (possibly name-modified) function. |
| `retry_policy` | `Sequence[RetryPolicy]` | Tuple of retry policies to apply on failure. |
| `cache_policy` | `CachePolicy[Callable[P, str \| bytes]] \| None` | Cache policy for result memoization. |
Usage Examples
Bare Decorator
```python
from langgraph.func import entrypoint, task

@task
def greet(name: str) -> str:
    return f"Hello, {name}!"

@entrypoint()
def workflow(name: str) -> str:
    future = greet(name)
    return future.result()

workflow.invoke("Alice")  # Returns "Hello, Alice!"
```
Task with Retry Policy
```python
from langgraph.func import task
from langgraph.types import RetryPolicy

@task(retry_policy=RetryPolicy(max_attempts=5, initial_interval=1.0, backoff_factor=2.0))
def unreliable_api_call(endpoint: str) -> dict:
    # May raise exceptions; will be retried up to 5 times
    return {"status": "ok"}
```
Clearing Task Cache
```python
from langgraph.func import task
from langgraph.types import CachePolicy

@task(cache_policy=CachePolicy(ttl=600))
def cached_lookup(key: str) -> str:
    return f"value for {key}"

# Later, invalidate the cache
cached_lookup.clear_cache(my_cache_instance)
```