Implementation: NeuML txtai Workflow Call
| Knowledge Sources | |
|---|---|
| Domains | Data_Processing, Workflow |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete tool for executing a composed workflow on input data provided by the txtai library. The Workflow.__call__ method creates an execution context, runs task initializers, processes batched data through all tasks sequentially, and yields results as a generator.
Description
Workflow.__call__ is the entry point for running a workflow. It orchestrates the full execution lifecycle:
- Creates an `Execute` context manager with the workflow's configured worker count, providing a thread/process pool for concurrent task actions.
- Calls `self.initialize()`, which iterates over all tasks and invokes their `initialize` hooks.
- Applies the `stream` callable to the input elements if one was configured.
- Chunks the elements into batches using `self.chunk(elements)`, which handles both fixed-size inputs (via slicing) and generators (via iterative accumulation).
- For each batch, calls `self.process(batch, executor)`, which runs the batch through every task in sequence and yields the transformed results.
- After all batches are processed, calls `self.finalize()` to invoke each task's `finalize` hooks.
The method is a generator function (it uses `yield from`), meaning results are produced lazily as each batch completes processing.
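The lifecycle described above can be sketched with a small, dependency-free stand-in. Everything here (the `execute` context manager, the `MiniWorkflow` class, plain callables as tasks) is illustrative and simplified, not txtai's actual implementation:

```python
from contextlib import contextmanager

@contextmanager
def execute(workers):
    # Stand-in for txtai's Execute context manager (illustrative only):
    # a real implementation would open a worker pool here and close it on exit
    try:
        yield "executor"
    finally:
        pass

class MiniWorkflow:
    """Dependency-free sketch of the Workflow.__call__ lifecycle."""

    def __init__(self, tasks, batch=100, workers=None):
        self.tasks, self.batch, self.workers = tasks, batch, workers

    def __call__(self, elements):
        # Generator function: batches are processed lazily as results are consumed
        with execute(self.workers) as executor:
            self.initialize()
            for batch in self.chunk(list(elements)):
                yield from self.process(batch, executor)
            self.finalize()

    def initialize(self):
        for task in self.tasks:
            if getattr(task, "initialize", None):
                task.initialize()

    def chunk(self, elements):
        for i in range(0, len(elements), self.batch):
            yield elements[i:i + self.batch]

    def process(self, elements, executor):
        for task in self.tasks:
            elements = task(elements, executor)
        yield from elements

    def finalize(self):
        for task in self.tasks:
            if getattr(task, "finalize", None):
                task.finalize()

# Tasks are plain callables here; real txtai Tasks wrap actions and hooks
upper = lambda batch, executor: [x.upper() for x in batch]
results = list(MiniWorkflow([upper], batch=2)(["a", "b", "c"]))
```

Note that `finalize()` only runs once the generator is fully consumed, which is why materializing with `list()` (or iterating to exhaustion) matters when finalizer hooks carry cleanup logic.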
Usage
Invoke the workflow by calling it with an iterable of data elements. Since the result is a generator, use list() to materialize all results, or iterate directly for memory-efficient streaming.
Code Reference
Source Location
- Repository: txtai
- File:
src/python/txtai/workflow/base.py (lines 51-76)
Signature
def __call__(self, elements):
The full execution flow involves these supporting methods:
def initialize(self):
    """Runs task initializer methods before processing."""
    for task in self.tasks:
        if task.initialize:
            task.initialize()
def chunk(self, elements):
    """Splits elements into batches of size self.batch."""
    # Handles both sized inputs (slicing) and generators (accumulation)
    ...
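The chunk body is elided above. A sketch of the described behavior (slice sized inputs, accumulate from generators) might look like the following; this is an illustrative reconstruction, not txtai's exact source:

```python
def chunk_elements(elements, batch=100):
    """Yield batches: slice sized inputs, accumulate from iterators."""
    if hasattr(elements, "__len__"):
        # Sized input (e.g. a list): slice directly
        for i in range(0, len(elements), batch):
            yield elements[i:i + batch]
    else:
        # Generator/iterator: accumulate elements until a batch is full
        chunk = []
        for element in elements:
            chunk.append(element)
            if len(chunk) == batch:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

batches = list(chunk_elements(iter(range(5)), batch=2))
# batches -> [[0, 1], [2, 3], [4]]
```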
def process(self, elements, executor):
    """Runs a batch through each task sequentially, yields results."""
    for task in self.tasks:
        elements = task(elements, executor)
    yield from elements
def finalize(self):
    """Runs task finalizer methods after all data is processed."""
    for task in self.tasks:
        if task.finalize:
            task.finalize()
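The sequential chaining inside process can be seen with plain callables standing in for tasks (illustrative only; real txtai Tasks wrap actions and also receive an executor):

```python
def process(elements, tasks):
    # Each task consumes the previous task's output, forming a pipeline
    for task in tasks:
        elements = task(elements)
    # Yield individual results from the final task's output
    yield from elements

doubled = lambda batch: [x * 2 for x in batch]
plus_one = lambda batch: [x + 1 for x in batch]

result = list(process([1, 2, 3], [doubled, plus_one]))
# result -> [3, 5, 7]
```

Because each task receives the entire output of the previous one, task ordering in the workflow definition determines the transformation pipeline.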
Import
from txtai.workflow import Workflow
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| elements | iterable | Yes | Input data to process. Can be a list of strings, file paths, dictionaries, tuples, or any iterable. Supports both fixed-size collections (lists) and generators for streaming input. |
Outputs
| Name | Type | Description |
|---|---|---|
| results | generator | A generator that yields transformed data elements. Each element has been processed through all tasks in the workflow sequentially. Materialize with list() or iterate directly. |
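Since the return value is a generator, no processing occurs until it is iterated. A generic Python illustration of this lazy behavior (not txtai-specific code):

```python
def run():
    # Work happens only when the generator is consumed
    print("processing batch")
    yield "result"

output = run()        # no work done yet; just a generator object
first = next(output)  # processing happens here, on demand
```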
Execution Flow
The following diagram illustrates the execution lifecycle:
workflow(elements)
|
+-- Execute(workers) context created
|
+-- initialize()
| +-- task[0].initialize()
| +-- task[1].initialize()
| +-- ...
|
+-- stream(elements) [if stream configured]
|
+-- chunk(elements) -> batches
| +-- batch_0: elements[0:100]
| +-- batch_1: elements[100:200]
| +-- ...
|
+-- for each batch:
| +-- process(batch, executor)
| +-- task[0](batch, executor) -> result_0
| +-- task[1](result_0, executor) -> result_1
| +-- ...
| +-- yield from final_result
|
+-- finalize()
| +-- task[0].finalize()
| +-- task[1].finalize()
| +-- ...
|
+-- Execute context closed (cleanup)
Usage Examples
Basic Example
from txtai.pipeline import Textractor, Summary
from txtai.workflow import Task, Workflow
# Build a two-stage workflow
textractor = Textractor(sections=True)
summary = Summary()
workflow = Workflow([
    Task(action=textractor),
    Task(action=summary),
])
# Execute and collect results
results = list(workflow(["report.pdf", "paper.pdf"]))
print(results)
Generator-Based Streaming
from txtai.pipeline import Summary
from txtai.workflow import Task, Workflow
summary = Summary()
workflow = Workflow([Task(action=summary)], batch=10)
# Process a large dataset lazily
with open("large_corpus.txt") as corpus:
    for result in workflow(corpus):
        print(result)
Workflow with Initializer and Finalizer
from txtai.pipeline import Textractor
from txtai.workflow import Task, Workflow
textractor = Textractor()
# Track processed count via closure
state = {"count": 0}
def on_init():
    state["count"] = 0
    print("Starting workflow...")

def extract(elements):
    # Wrap the pipeline action so each processed batch updates the counter
    state["count"] += len(elements)
    return textractor(elements)

def on_done():
    print(f"Workflow complete. Processed {state['count']} items.")

task = Task(
    action=extract,
    initialize=on_init,
    finalize=on_done,
)
workflow = Workflow([task])
results = list(workflow(["file1.pdf", "file2.pdf", "file3.pdf"]))