Principle:Langchain ai Langgraph Task Composition
| Attribute | Value |
|---|---|
| Concept | Composing multiple tasks using standard Python control flow and futures |
| Workflow | Functional_API_Workflow |
| Type | Principle |
| Repository | Langchain_ai_Langgraph |
| Source | libs/langgraph/langgraph/func/__init__.py, libs/langgraph/langgraph/pregel/_call.py |
Overview
Task composition in LangGraph's functional API is the mechanism by which multiple @task-decorated functions are combined within an @entrypoint to form a complete workflow. Rather than defining a static graph of nodes and edges, the developer uses ordinary Python constructs -- loops, conditionals, list comprehensions, and function calls -- to orchestrate tasks. The runtime infers the dependency structure from the call pattern and the points where futures are resolved.
This approach turns implicit data dependencies into an execution DAG (directed acyclic graph). When a task is called, it returns a SyncAsyncFuture. When .result() is called on that future, it creates a synchronization point. Tasks whose futures have not yet been resolved can execute in parallel, while .result() calls enforce ordering.
Description
Future-Based Concurrency
The SyncAsyncFuture[T] class extends Python's concurrent.futures.Future[T] and also implements __await__, making it usable in both sync and async contexts. When a @task-decorated function is called:
- The call is registered with the LangGraph runtime via the call() function.
- A SyncAsyncFuture[T] is returned immediately to the caller.
- The runtime schedules the task for execution.
- The caller can continue dispatching additional tasks before blocking.
- Calling .result() (sync) or await-ing (async) blocks until the task completes.
This means that the following pattern naturally creates parallel execution:
```python
# These three tasks can run in parallel
f1 = task_a(x)
f2 = task_b(y)
f3 = task_c(z)
# This blocks until all three are done
results = [f1.result(), f2.result(), f3.result()]
```
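The dual sync/async behaviour described above can be pictured with a small standard-library sketch. It is not LangGraph's implementation: DualFuture and dispatch are hypothetical names, and a ThreadPoolExecutor stands in for the runtime. The sketch only shows how a subclass of concurrent.futures.Future can implement __await__ so that the same object can be resolved with .result() or with await.

```python
import asyncio
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor


class DualFuture(concurrent.futures.Future):
    """Hypothetical stand-in: a thread-style future that can also be awaited."""

    def __await__(self):
        # Bridge to the running event loop; only valid inside a coroutine.
        return asyncio.wrap_future(self).__await__()


def dispatch(pool, fn, *args):
    """Start fn(*args) on the pool and return a DualFuture immediately."""
    fut = DualFuture()

    def run():
        try:
            fut.set_result(fn(*args))
        except BaseException as exc:
            fut.set_exception(exc)

    pool.submit(run)
    return fut


def square(n):
    return n * n


def run_sync(pool):
    # Dispatch everything first, then block on each future.
    futures = [dispatch(pool, square, n) for n in (1, 2, 3)]
    return [f.result() for f in futures]


async def run_async(pool):
    # Same dispatch pattern, but the futures are awaited instead.
    futures = [dispatch(pool, square, n) for n in (1, 2, 3)]
    return [await f for f in futures]


with ThreadPoolExecutor(max_workers=3) as pool:
    sync_results = run_sync(pool)                  # [1, 4, 9]
    async_results = asyncio.run(run_async(pool))   # [1, 4, 9]
```

In both functions all three calls are dispatched before any result is requested, which is the property that lets them overlap.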
Task DAG Construction
The dependency graph between tasks is determined implicitly by when futures are resolved:
- Independent tasks: Tasks whose inputs do not depend on other tasks' outputs can be dispatched simultaneously. Their futures exist concurrently and are resolved together at a later synchronization point.
- Sequential tasks: When one task's input is the .result() of another, the second task cannot be dispatched until the first completes. This creates an ordering constraint.
- Fan-out / fan-in: A common pattern is to dispatch many tasks in a loop (fan-out), then collect all results (fan-in). The runtime can execute all fanned-out tasks concurrently.
```python
from langgraph.func import entrypoint

# preprocess and transform are assumed to be @task-decorated functions
# defined elsewhere.
@entrypoint()
def pipeline(data: list[str]) -> list[str]:
    # Fan-out: dispatch tasks in parallel
    step1_futures = [preprocess(d) for d in data]
    # Fan-in: wait for all preprocessing to complete
    step1_results = [f.result() for f in step1_futures]
    # Sequential dependency: step 2 depends on step 1
    step2_futures = [transform(r) for r in step1_results]
    return [f.result() for f in step2_futures]
```
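The ordering that the fan-in step imposes can be checked with a standard-library analogue of the pipeline. This is a sketch under stated assumptions: ThreadPoolExecutor.submit stands in for calling a task, the events log and the log helper are hypothetical, and preprocess and transform are given trivial bodies for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

events = []                      # (phase, task_name, item) in the order observed
events_lock = threading.Lock()


def log(phase, name, item):
    with events_lock:
        events.append((phase, name, item))


def preprocess(item):
    log("start", "preprocess", item)
    result = item.strip().lower()
    log("end", "preprocess", item)
    return result


def transform(item):
    log("start", "transform", item)
    result = item.upper()
    log("end", "transform", item)
    return result


def pipeline(pool, data):
    step1_futures = [pool.submit(preprocess, d) for d in data]   # fan-out
    step1_results = [f.result() for f in step1_futures]          # fan-in
    step2_futures = [pool.submit(transform, r) for r in step1_results]
    return [f.result() for f in step2_futures]


with ThreadPoolExecutor(max_workers=4) as pool:
    output = pipeline(pool, [" Alpha ", " Beta ", " Gamma "])

# Positions in the log of the last stage-1 finish and the first stage-2 start.
last_preprocess_end = max(
    i for i, e in enumerate(events) if e[:2] == ("end", "preprocess")
)
first_transform_start = min(
    i for i, e in enumerate(events) if e[:2] == ("start", "transform")
)
```

Because every stage-1 future is resolved before any stage-2 call is made, last_preprocess_end is always smaller than first_transform_start, while the order of events within a stage can vary from run to run.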
Parallel Execution
Parallel execution in the functional API is implicit rather than explicit. For sync tasks, the developer does not need to use threading, asyncio.gather, or any other concurrency primitive: each task is handed to the runtime when it is called, so futures that are outstanding at the same time can execute concurrently. For async tasks, asyncio.gather(*futures) can be used to resolve multiple futures concurrently in the async event loop.
The key insight is that parallelism is a property of the call pattern, not of explicit concurrency annotations. If three tasks are dispatched before any results are consumed, the runtime can execute all three in parallel.
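The effect of the call pattern can be measured with a standard-library sketch. It assumes a ThreadPoolExecutor as a stand-in for the runtime; ConcurrencyProbe, work, and the two driver functions are hypothetical names. The probe records how many calls were in flight at the same moment under each pattern.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyProbe:
    """Tracks the peak number of calls running at the same time."""

    def __init__(self):
        self._lock = threading.Lock()
        self._active = 0
        self.peak = 0

    def __enter__(self):
        with self._lock:
            self._active += 1
            self.peak = max(self.peak, self._active)

    def __exit__(self, *exc_info):
        with self._lock:
            self._active -= 1


def work(probe, x):
    with probe:
        time.sleep(0.1)          # simulate a slow call
        return x * 2


def dispatch_then_resolve(pool, items):
    probe = ConcurrencyProbe()
    futures = [pool.submit(work, probe, x) for x in items]   # all dispatched first
    return [f.result() for f in futures], probe.peak


def resolve_immediately(pool, items):
    probe = ConcurrencyProbe()
    # Each call is awaited before the next one is dispatched.
    return [pool.submit(work, probe, x).result() for x in items], probe.peak


with ThreadPoolExecutor(max_workers=3) as pool:
    parallel_results, parallel_peak = dispatch_then_resolve(pool, [1, 2, 3])
    serial_results, serial_peak = resolve_immediately(pool, [1, 2, 3])
```

Both patterns return the same values, but serial_peak is 1 because each call finishes before the next starts, whereas parallel_peak is greater than 1. The same distinction applies to writing task(x).result() inside a loop versus collecting futures first.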
Result Gathering
Results are gathered by calling .result() on each future. This is a blocking operation in sync contexts. Common gathering patterns include:
- List comprehension: [f.result() for f in futures] -- resolves futures in order.
- Direct resolution: value = my_task(input).result() -- dispatches and immediately waits.
- Conditional gathering: Resolving different futures based on runtime conditions.
Task results are also the unit of persistence: if a checkpointer is active, the result of a completed task is saved, and on resume that task is not re-executed; its saved result is returned instead.
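The resume behaviour can be pictured with a toy model. This is only an illustration and makes no claim about how LangGraph's checkpointer stores or identifies task results: ReplayLog, call_task, and the positional lookup by call order are all assumptions made for the sketch.

```python
class ReplayLog:
    """Toy stand-in for a checkpointer: saved task results, in call order."""

    def __init__(self):
        self.saved = []
        self.executions = 0      # how many task bodies actually ran


def double(n):
    return n * 2


def increment(n):
    return n + 1


def run_workflow(log, crash_after_first=False):
    cursor = 0                   # position of the next task call in this run

    def call_task(fn, arg):
        nonlocal cursor
        if cursor < len(log.saved):
            value = log.saved[cursor]        # replay: reuse the saved result
        else:
            value = fn(arg)                  # first time: run and persist
            log.executions += 1
            log.saved.append(value)
        cursor += 1
        return value

    a = call_task(double, 5)
    if crash_after_first:
        raise RuntimeError("simulated interruption")
    return call_task(increment, a)


log = ReplayLog()
try:
    run_workflow(log, crash_after_first=True)
except RuntimeError:
    pass
executions_before_resume = log.executions    # 1: only double ran

final = run_workflow(log)                    # 11; double is replayed, not re-run
```

After the second run, log.executions is 2 rather than 3, because the first task's result was taken from the log. The model also hints at why replayed workflow code should make the same task calls in the same order on every run.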
Standard Python Control Flow
Because task composition uses ordinary Python, developers can employ the full language:
- Conditionals: if condition: result = task_a(x).result() else: result = task_b(x).result()
- Loops: for item in collection: futures.append(process(item))
- Exception handling: try: result = risky_task(x).result() except: ...
- Early return: if cached: return cached_value
This is a significant ergonomic advantage over the graph-based API, where conditional routing requires explicit add_conditional_edges calls.
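The constructs listed above combine naturally in one function. The following sketch uses concurrent.futures as a stand-in for task futures, so it demonstrates standard-library behaviour only; risky_task, fallback_task, and process_all are hypothetical names. In the standard library, an exception raised inside the worker is re-raised when .result() is called, which is what makes the try/except placement work.

```python
from concurrent.futures import ThreadPoolExecutor


def risky_task(x):
    if x < 0:
        raise ValueError(f"negative input: {x}")
    return x ** 0.5


def fallback_task(x):
    return 0.0


def process_all(pool, values):
    if not values:                       # early return: nothing is dispatched
        return []
    # Loop: fan out one call per value before resolving anything.
    futures = [pool.submit(risky_task, v) for v in values]
    results = []
    for v, fut in zip(values, futures):
        try:
            results.append(fut.result())             # worker errors surface here
        except ValueError:
            # Conditional recovery: dispatch a fallback only for failed items.
            results.append(pool.submit(fallback_task, v).result())
    return results


with ThreadPoolExecutor(max_workers=3) as pool:
    results = process_all(pool, [4, -1, 9])   # [2.0, 0.0, 3.0]
    empty = process_all(pool, [])             # []
```

The failure of one item does not prevent the others from being collected, because each future is resolved inside its own try block.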
Usage
```python
from langgraph.func import entrypoint, task


@task
def fetch_data(source: str) -> dict:
    return {"source": source, "data": f"data from {source}"}


@task
def process_data(data: dict) -> str:
    return f"processed: {data['data']}"


@task
def summarize(results: list[str]) -> str:
    return f"Summary of {len(results)} results"


@entrypoint()
def data_pipeline(sources: list[str]) -> str:
    # Fan-out: fetch from all sources in parallel
    fetch_futures = [fetch_data(s) for s in sources]
    raw_data = [f.result() for f in fetch_futures]
    # Fan-out again: process all data in parallel
    process_futures = [process_data(d) for d in raw_data]
    processed = [f.result() for f in process_futures]
    # Sequential: summarize depends on all processing
    return summarize(processed).result()


result = data_pipeline.invoke(["api", "database", "file"])
```
Theoretical Basis
Task composition in the functional API draws from several established paradigms:
- Dataflow Programming: In dataflow models, computations are expressed as a graph of operations connected by data channels. The functional API achieves this implicitly: each task call creates a node, and .result() calls create data edges. This is similar to how systems like Dask, Ray, and Apache Beam allow Python functions to be composed into parallel execution graphs.
- Futures and Promises: The future-based composition model originates from Baker and Hewitt (1977). By returning futures instead of values, the API enables eager dispatch -- tasks are started as soon as they are called, and synchronization is deferred until results are actually needed. This maximizes the window for parallel execution.
- Fork-Join Parallelism: The fan-out/fan-in pattern is a classic instance of fork-join parallelism. The "fork" phase dispatches independent tasks, and the "join" phase collects their results. This pattern is the foundation of frameworks like Java's ForkJoinPool and Cilk's spawn/sync.
- Structured Concurrency: The functional API provides a form of structured concurrency where the lifetime of concurrent tasks is bounded by the entrypoint function's scope. All tasks dispatched within an entrypoint are guaranteed to complete (or fail) before the entrypoint returns, preventing orphaned background work.
- Implicit Parallelism: Unlike explicit parallelism (where the developer manages threads or coroutines), the functional API's parallelism is implicit -- it emerges from the data dependency structure. This is similar to how functional programming languages can automatically parallelize pure functions whose results do not depend on each other.