Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Neuml Txtai Workflow Call

From Leeroopedia


Knowledge Sources
Domains Data_Processing, Workflow
Last Updated 2026-02-09 00:00 GMT

Overview

Concrete tool for executing a composed workflow on input data provided by the txtai library. The Workflow.__call__ method creates an execution context, runs task initializers, processes batched data through all tasks sequentially, and yields results as a generator.

Description

Workflow.__call__ is the entry point for running a workflow. It orchestrates the full execution lifecycle:

  1. Creates an Execute context manager with the workflow's configured worker count, providing a thread/process pool for concurrent task actions.
  2. Calls self.initialize(), which iterates over all tasks and invokes their initialize hooks.
  3. Applies the stream callable to the input elements if one was configured.
  4. Chunks the elements into batches using self.chunk(elements), which handles both fixed-size inputs (via slicing) and generators (via iterative accumulation).
  5. For each batch, calls self.process(batch, executor), which runs the batch through every task in sequence and yields the transformed results.
  6. After all batches are processed, calls self.finalize() to invoke each task's finalize hooks.

The method is a generator function (uses yield from), meaning results are produced lazily as each batch completes processing.

Usage

Invoke the workflow by calling it with an iterable of data elements. Since the result is a generator, use list() to materialize all results, or iterate directly for memory-efficient streaming.

Code Reference

Source Location

  • Repository: txtai
  • File: src/python/txtai/workflow/base.py (lines 51-76)

Signature

def __call__(self, elements):

The full execution flow involves these supporting methods:

def initialize(self):
    """Runs task initializer methods before processing."""
    for task in self.tasks:
        if task.initialize:
            task.initialize()

def chunk(self, elements):
    """Splits elements into batches of self.batch size."""
    # Handles both sized inputs (slicing) and generators (accumulation)
    ...

def process(self, elements, executor):
    """Runs a batch through each task sequentially, yields results."""
    for x, task in enumerate(self.tasks):
        elements = task(elements, executor)
    yield from elements

def finalize(self):
    """Runs task finalizer methods after all data is processed."""
    for task in self.tasks:
        if task.finalize:
            task.finalize()

Import

from txtai.workflow import Workflow

I/O Contract

Inputs

Name Type Required Description
elements iterable Yes Input data to process. Can be a list of strings, file paths, dictionaries, tuples, or any iterable. Supports both fixed-size collections (lists) and generators for streaming input.

Outputs

Name Type Description
results generator A generator that yields transformed data elements. Each element has been processed through all tasks in the workflow sequentially. Materialize with list() or iterate directly.

Execution Flow

The following diagram illustrates the execution lifecycle:

workflow(elements)
    |
    +-- Execute(workers) context created
    |
    +-- initialize()
    |       +-- task[0].initialize()
    |       +-- task[1].initialize()
    |       +-- ...
    |
    +-- stream(elements)  [if stream configured]
    |
    +-- chunk(elements) -> batches
    |       +-- batch_0: elements[0:100]
    |       +-- batch_1: elements[100:200]
    |       +-- ...
    |
    +-- for each batch:
    |       +-- process(batch, executor)
    |               +-- task[0](batch, executor) -> result_0
    |               +-- task[1](result_0, executor) -> result_1
    |               +-- ...
    |               +-- yield from final_result
    |
    +-- finalize()
    |       +-- task[0].finalize()
    |       +-- task[1].finalize()
    |       +-- ...
    |
    +-- Execute context closed (cleanup)

Usage Examples

Basic Example

from txtai.pipeline import Textractor, Summary
from txtai.workflow import Task, Workflow

# Build a two-stage workflow
textractor = Textractor(sections=True)
summary = Summary()

workflow = Workflow([
    Task(action=textractor),
    Task(action=summary),
])

# Execute and collect results
results = list(workflow(["report.pdf", "paper.pdf"]))
print(results)

Generator-Based Streaming

from txtai.pipeline import Summary
from txtai.workflow import Task, Workflow

summary = Summary()
workflow = Workflow([Task(action=summary)], batch=10)

# Process a large dataset lazily
for result in workflow(open("large_corpus.txt")):
    print(result)

Workflow with Initializer and Finalizer

from txtai.pipeline import Textractor
from txtai.workflow import Task, Workflow

textractor = Textractor()

# Track processed count via closure
state = {"count": 0}

def on_init():
    state["count"] = 0
    print("Starting workflow...")

def on_done():
    print(f"Workflow complete. Processed {state['count']} items.")

task = Task(
    action=textractor,
    initialize=on_init,
    finalize=on_done,
)

workflow = Workflow([task])
results = list(workflow(["file1.pdf", "file2.pdf", "file3.pdf"]))

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment