| Attribute | Value |
| --- | --- |
| API | `@entrypoint()` class-based decorator |
| Workflow | Functional_API_Workflow |
| Type | API Doc |
| Repository | Langchain_ai_Langgraph |
| Source File | `libs/langgraph/langgraph/func/__init__.py` |
| Source Lines | L228-563 (class definition), L394-428 (`__init__`), L471-563 (`__call__`) |
## Overview

The `entrypoint` class is a decorator that converts a plain Python function into a fully compiled `Pregel` graph. It is the primary mechanism for defining workflows in LangGraph's functional API. The class accepts configuration for checkpointing, store, cache, context schema, and retry/cache policies via its `__init__` method, and performs the actual function-to-graph conversion in its `__call__` method.

The decorator supports both sync and async functions (though generators are explicitly rejected). The resulting `Pregel` instance exposes `invoke()`, `stream()`, `ainvoke()`, and `astream()` methods.
## Description

The `entrypoint` class is implemented as a `Generic[ContextT]` class with two key methods:

**`__init__` (lines 394-428)** — stores the decorator configuration:

- Normalizes deprecated `config_schema` to `context_schema`.
- Normalizes deprecated `retry` to `retry_policy`.
- Saves `checkpointer`, `store`, `cache`, `cache_policy`, `retry_policy`, and `context_schema` as instance attributes.

**`__call__` (lines 471-563)** — converts the decorated function into a `Pregel` graph:

- Rejects generator functions with `NotImplementedError`.
- Wraps the function via `get_runnable_for_entrypoint(func)` to produce a `RunnableCallable`.
- Inspects the function signature to extract the input type from the first parameter's annotation.
- Inspects the return annotation to detect `entrypoint.final[R, S]` usage; if present, separates the output type (`R`) from the save type (`S`).
- Creates internal mapper functions (`_pluck_return_value` and `_pluck_save_value`) that extract the appropriate value from an `entrypoint.final` return or pass a plain return through unchanged.
- Constructs a `Pregel` instance with:
  - A single `PregelNode` bound to the wrapped function, triggered by `START`.
  - `ChannelWrite` entries for `END` (output) and `PREVIOUS` (checkpoint state).
  - Channels: `EphemeralValue` for `START`, `LastValue` for `END` and `PREVIOUS`.
  - Stream mode set to `"updates"` with `stream_eager=True`.
  - All configuration (checkpointer, store, cache, policies, context schema) passed through.
## Usage

```python
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore
from langgraph.types import RetryPolicy

# Minimal entrypoint (no persistence)
@entrypoint()
def simple_workflow(x: int) -> int:
    return x + 1

# With checkpointer for state persistence
@entrypoint(checkpointer=InMemorySaver())
def persistent_workflow(data: str, *, previous: str | None = None) -> str:
    return data + (previous or "")

# Full configuration
@entrypoint(
    checkpointer=InMemorySaver(),
    store=InMemoryStore(),
    retry_policy=RetryPolicy(max_attempts=3),
)
def robust_workflow(input_data: dict) -> dict:
    return {"result": input_data["value"] * 2}
```
## Code Reference

### Source Location

| Item | Location |
| --- | --- |
| File | `libs/langgraph/langgraph/func/__init__.py` |
| Class | `entrypoint` (lines 228-563) |
| `__init__` | Lines 394-428 |
| `__call__` | Lines 471-563 |
| `final` inner class | Lines 430-469 |
### Signature

```python
class entrypoint(Generic[ContextT]):
    def __init__(
        self,
        checkpointer: BaseCheckpointSaver | None = None,
        store: BaseStore | None = None,
        cache: BaseCache | None = None,
        context_schema: type[ContextT] | None = None,
        cache_policy: CachePolicy | None = None,
        retry_policy: RetryPolicy | Sequence[RetryPolicy] | None = None,
    ) -> None: ...

    def __call__(self, func: Callable[..., Any]) -> Pregel: ...
```

### Import

```python
from langgraph.func import entrypoint
```
## I/O Contract

### Input (Decorator Parameters)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `checkpointer` | `BaseCheckpointSaver \| None` | `None` | Checkpoint saver for persisting workflow state across invocations. Required for the `previous` parameter and interrupt/resume support. |
| `store` | `BaseStore \| None` | `None` | Generalized key-value store for cross-session persistence. Some implementations support semantic search. |
| `cache` | `BaseCache \| None` | `None` | Cache backend for caching workflow and task results. |
| `context_schema` | `type[ContextT] \| None` | `None` | Schema for the context object passed to the workflow at runtime. |
| `cache_policy` | `CachePolicy \| None` | `None` | Cache policy controlling key generation and TTL for the entire workflow. |
| `retry_policy` | `RetryPolicy \| Sequence[RetryPolicy] \| None` | `None` | Retry policy or sequence of policies applied to the workflow itself. |
### Input (Decorated Function Requirements)

| Requirement | Description |
| --- | --- |
| Single positional parameter | The function must have at least one parameter; the first parameter receives the workflow input. |
| No generators | Generator functions and async generator functions raise `NotImplementedError`. |
| Optional injectable params | `previous`, `config`, and `runtime` can be declared as keyword-only parameters. |
### Output

| Output | Type | Description |
| --- | --- | --- |
| Decorated function replacement | `Pregel` | The original function is replaced by a compiled `Pregel` graph instance with `invoke()`, `stream()`, `ainvoke()`, and `astream()` methods. |
## Internal Pregel Graph Structure

| Component | Value | Description |
| --- | --- | --- |
| Node name | `func.__name__` | Single node named after the decorated function. |
| Trigger | `START` | Node triggers on the start channel. |
| Input channel | `EphemeralValue(input_type)` | Single-use channel typed to the first parameter's annotation. |
| Output channel | `LastValue(output_type, END)` | Holds the return value (or `entrypoint.final.value`). |
| Previous channel | `LastValue(save_type, PREVIOUS)` | Holds the checkpointed state (or `entrypoint.final.save`). |
| Stream mode | `"updates"` | Default stream mode for functional API workflows. |
## Usage Examples

### Basic Workflow with Tasks

```python
from langgraph.func import entrypoint, task

@task
def double(x: int) -> int:
    return x * 2

@entrypoint()
def workflow(x: int) -> int:
    return double(x).result()

result = workflow.invoke(5)  # Returns 10
```

### Workflow with Human-in-the-Loop Interrupt

```python
from langgraph.func import entrypoint, task
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver

@task
def draft_email(topic: str) -> str:
    return f"Draft email about {topic}"

@entrypoint(checkpointer=InMemorySaver())
def email_workflow(topic: str) -> dict:
    draft = draft_email(topic).result()
    approval = interrupt({"draft": draft, "question": "Approve?"})
    return {"draft": draft, "approved": approval}

config = {"configurable": {"thread_id": "email-1"}}

# First run: generates draft, hits interrupt
for chunk in email_workflow.stream("quarterly report", config):
    print(chunk)

# Resume with approval
for chunk in email_workflow.stream(Command(resume="yes"), config):
    print(chunk)
```

### Async Entrypoint

```python
import asyncio
from langgraph.func import entrypoint, task

@task
async def async_process(data: str) -> str:
    return data.upper()

@entrypoint()
async def async_workflow(items: list[str]) -> list[str]:
    futures = [async_process(item) for item in items]
    return await asyncio.gather(*futures)

result = await async_workflow.ainvoke(["hello", "world"])
# Returns ["HELLO", "WORLD"]
```
## Related Pages