Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Langchain ai Langgraph Entrypoint Decorator

From Leeroopedia
Attribute Value
API @entrypoint() class-based decorator
Workflow Functional_API_Workflow
Type API Doc
Repository Langchain_ai_Langgraph
Source File libs/langgraph/langgraph/func/__init__.py
Source Lines L228-563 (class definition), L394-428 (__init__), L471-563 (__call__)

Overview

The entrypoint class is a decorator that converts a plain Python function into a fully compiled Pregel graph. It is the primary mechanism for defining workflows in LangGraph's functional API. The class accepts configuration for checkpointing, store, cache, context schema, and retry/cache policies via its __init__ method, and performs the actual function-to-graph conversion in its __call__ method.

The decorator supports both sync and async functions (though generators are explicitly rejected). The resulting Pregel instance exposes invoke(), stream(), ainvoke(), and astream() methods.

Description

The entrypoint class is implemented as a Generic[ContextT] class with two key methods:

__init__ (lines 394-428): Stores the decorator configuration:

  • Normalizes deprecated config_schema to context_schema.
  • Normalizes deprecated retry to retry_policy.
  • Saves checkpointer, store, cache, cache_policy, retry_policy, and context_schema as instance attributes.

__call__ (lines 471-563): Converts the decorated function into a Pregel graph:

  1. Rejects generator functions with NotImplementedError.
  2. Wraps the function via get_runnable_for_entrypoint(func) to produce a RunnableCallable.
  3. Inspects the function signature to extract the input type from the first parameter's annotation.
  4. Inspects the return annotation to detect entrypoint.final[R, S] usage. If present, separates the output type (R) from the save type (S).
  5. Creates internal mapper functions (_pluck_return_value and _pluck_save_value) that extract the appropriate value from an entrypoint.final return or pass through a plain return.
  6. Constructs a Pregel instance with:
    • A single PregelNode bound to the wrapped function, triggered by START.
    • ChannelWrite entries for END (output) and PREVIOUS (checkpoint state).
    • Channels: EphemeralValue for START, LastValue for END and PREVIOUS.
    • Stream mode set to "updates" with stream_eager=True.
    • All configuration (checkpointer, store, cache, policies, context schema) passed through.

Usage

from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore
from langgraph.types import RetryPolicy, CachePolicy

# Minimal entrypoint (no persistence)
@entrypoint()
def simple_workflow(x: int) -> int:
    return x + 1

# With checkpointer for state persistence
@entrypoint(checkpointer=InMemorySaver())
def persistent_workflow(data: str, *, previous: str | None = None) -> str:
    return data + (previous or "")

# Full configuration
@entrypoint(
    checkpointer=InMemorySaver(),
    store=InMemoryStore(),
    retry_policy=RetryPolicy(max_attempts=3),
)
def robust_workflow(input_data: dict) -> dict:
    return {"result": input_data["value"] * 2}

Code Reference

Source Location

File libs/langgraph/langgraph/func/__init__.py
Class entrypoint (lines 228-563)
__init__ Lines 394-428
__call__ Lines 471-563
final inner class Lines 430-469

Signature

class entrypoint(Generic[ContextT]):
    def __init__(
        self,
        checkpointer: BaseCheckpointSaver | None = None,
        store: BaseStore | None = None,
        cache: BaseCache | None = None,
        context_schema: type[ContextT] | None = None,
        cache_policy: CachePolicy | None = None,
        retry_policy: RetryPolicy | Sequence[RetryPolicy] | None = None,
    ) -> None

    def __call__(self, func: Callable[..., Any]) -> Pregel

Import

from langgraph.func import entrypoint

I/O Contract

Input (Decorator Parameters)

Parameter Type Default Description
checkpointer None None Checkpoint saver for persisting workflow state across invocations. Required for previous parameter and interrupt/resume support.
store None None Generalized key-value store for cross-session persistence. Some implementations support semantic search.
cache None None Cache backend for caching workflow and task results.
context_schema None None Schema for the context object passed to the workflow at runtime.
cache_policy None None Cache policy controlling key generation and TTL for the entire workflow.
retry_policy Sequence[RetryPolicy] | None None Retry policy or sequence of policies for the workflow itself.

Input (Decorated Function Requirements)

Requirement Description
Single positional parameter The function must have at least one parameter; the first parameter receives the workflow input.
No generators Generator functions and async generator functions raise NotImplementedError.
Optional injectable params previous, config, runtime can be declared as keyword-only parameters.

Output

Output Type Description
Decorated function replacement Pregel The original function is replaced by a compiled Pregel graph instance with invoke(), stream(), ainvoke(), and astream() methods.

Internal Pregel Graph Structure

Component Value Description
Node name func.__name__ Single node named after the decorated function.
Trigger START Node triggers on the start channel.
Input channel EphemeralValue(input_type) Single-use channel typed to the first parameter's annotation.
Output channel LastValue(output_type, END) Holds the return value (or entrypoint.final.value).
Previous channel LastValue(save_type, PREVIOUS) Holds the checkpointed state (or entrypoint.final.save).
Stream mode "updates" Default stream mode for functional API workflows.

Usage Examples

Basic Workflow with Tasks

from langgraph.func import entrypoint, task

@task
def double(x: int) -> int:
    return x * 2

@entrypoint()
def workflow(x: int) -> int:
    return double(x).result()

result = workflow.invoke(5)  # Returns 10

Workflow with Human-in-the-Loop Interrupt

from langgraph.func import entrypoint, task
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver

@task
def draft_email(topic: str) -> str:
    return f"Draft email about {topic}"

@entrypoint(checkpointer=InMemorySaver())
def email_workflow(topic: str) -> dict:
    draft = draft_email(topic).result()
    approval = interrupt({"draft": draft, "question": "Approve?"})
    return {"draft": draft, "approved": approval}

config = {"configurable": {"thread_id": "email-1"}}

# First run: generates draft, hits interrupt
for chunk in email_workflow.stream("quarterly report", config):
    print(chunk)

# Resume with approval
for chunk in email_workflow.stream(Command(resume="yes"), config):
    print(chunk)

Async Entrypoint

import asyncio
from langgraph.func import entrypoint, task

@task
async def async_process(data: str) -> str:
    return data.upper()

@entrypoint()
async def async_workflow(items: list[str]) -> list[str]:
    futures = [async_process(item) for item in items]
    return await asyncio.gather(*futures)

result = await async_workflow.ainvoke(["hello", "world"])
# Returns ["HELLO", "WORLD"]

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment