
Implementation:Neuml Txtai Workflow Init

From Leeroopedia


Knowledge Sources
Domains Data_Processing, Workflow
Last Updated 2026-02-09 00:00 GMT

Overview

A concrete tool provided by the txtai library for composing ordered sequences of tasks into a deterministic processing workflow. The Workflow.__init__ constructor accepts a list of Task instances plus configuration parameters that control batching, concurrency, naming, and stream pre-processing.

Description

The Workflow class is the base class for all workflows in txtai. Its constructor stores the provided task list and configuration, then computes the default worker count if one is not explicitly provided. The default is calculated as the maximum number of actions across all tasks (i.e., max(len(task.action) for task in self.tasks)), ensuring that multi-action tasks can fully utilize concurrent execution.
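The default-worker rule can be checked with a minimal sketch. StubTask below is a hypothetical stand-in for txtai's Task class, carrying only the action attribute that the calculation reads; it is not the real API:

```python
# Hypothetical stand-in for a txtai Task: only the action list matters here
class StubTask:
    def __init__(self, actions):
        # txtai stores a task's callables on the "action" attribute
        self.action = actions

def default_workers(tasks):
    # Mirrors the constructor's rule: max number of actions in any task
    return max(len(task.action) for task in tasks)

tasks = [StubTask([str.upper]), StubTask([str.lower, str.title, str.strip])]
print(default_workers(tasks))  # 3: the second task has three actions
```

With one single-action task and one three-action task, the default is 3, so the three actions of the widest task can all run concurrently.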

Once constructed, the Workflow is a callable object. Invoking it with an iterable of data elements triggers the full execution pipeline: creating an executor, running initializers, batching data, passing batches through each task sequentially, and yielding results.

Usage

Use Workflow.__init__ to compose task sequences into a single executable unit. Typical usage involves creating Task instances first (wrapping pipelines), then passing them as an ordered list to the Workflow constructor.

Code Reference

Source Location

  • Repository: txtai
  • File: src/python/txtai/workflow/base.py (lines 30-49)

Signature

class Workflow:
    def __init__(self, tasks, batch=100, workers=None, name=None, stream=None):

Import

from txtai.workflow import Workflow

I/O Contract

Inputs

  • tasks (list[Task], required) -- Ordered list of Task instances defining the processing chain. Data flows through tasks sequentially: the output of task N becomes the input of task N+1.
  • batch (int, optional, default 100) -- Number of data elements to process at a time. Controls memory usage and GPU batch efficiency.
  • workers (int, optional) -- Number of concurrent workers for the executor. Defaults to max(len(task.action) for task in tasks), the maximum number of actions in any single task.
  • name (str, optional) -- Workflow name used for logging, scheduling, and programmatic reference via Application.
  • stream (callable, optional) -- Stream processor function applied to input elements before they enter the task chain. Receives the elements iterable and returns a (possibly transformed) iterable.

Outputs

  • workflow (Workflow) -- A callable Workflow object. Calling workflow(elements) returns a generator that yields transformed data elements after passing them through all tasks.
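Because the return value is a generator, no work happens until it is consumed. The following is a minimal sketch of that contract using a hypothetical toy_workflow stand-in, not the real Workflow class:

```python
import types

def toy_workflow(elements):
    # Mimics the callable contract of a Workflow: lazily yields
    # transformed elements as the generator is consumed
    for element in elements:
        yield element.upper()

results = toy_workflow(["a", "b"])
print(isinstance(results, types.GeneratorType))  # True: nothing has run yet
print(list(results))                             # ['A', 'B'] once consumed
```

This is why the usage examples below wrap workflow calls in list(...): the wrap forces full evaluation and collects the results.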

Usage Examples

Basic Example

from txtai.pipeline import Textractor, Summary
from txtai.workflow import Task, Workflow

# Create pipelines
textractor = Textractor(sections=True)
summary = Summary()

# Wrap pipelines in tasks
task1 = Task(action=textractor)
task2 = Task(action=summary)

# Compose workflow with default batch size (100)
workflow = Workflow([task1, task2])

# Execute workflow
results = list(workflow(["report.pdf", "article.html"]))

Custom Batch Size and Workers

from txtai.pipeline import Translation
from txtai.workflow import Task, Workflow

# Create translation pipeline
translate = Translation()

# Wrap in a task
task = Task(action=translate)

# Compose workflow with small batch size and explicit worker count
workflow = Workflow([task], batch=10, workers=4, name="translation-workflow")

# Execute
results = list(workflow(["Bonjour le monde", "Hola mundo", "Hallo Welt"]))

Workflow with Stream Pre-processing

from txtai.pipeline import Summary
from txtai.workflow import Task, Workflow

summary = Summary()
task = Task(action=summary)

# Define a stream processor that filters short texts
def filter_short(elements):
    for element in elements:
        if len(element) > 100:
            yield element

# Compose workflow with stream pre-processor
workflow = Workflow([task], stream=filter_short, name="filtered-summary")

texts = ["Short.", "This is a much longer text that exceeds one hundred characters " * 3]
results = list(workflow(texts))

Multi-Task Chain

from txtai.pipeline import Textractor, Summary, Translation
from txtai.workflow import Task, Workflow

# Build a three-stage pipeline: extract -> summarize -> translate
textractor = Textractor(sections=True)
summary = Summary()
translate = Translation()

workflow = Workflow([
    Task(action=textractor),
    Task(action=summary),
    Task(action=translate),
], batch=50, name="extract-summarize-translate")

results = list(workflow(["document.pdf"]))
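The sequential, batched data flow that a multi-task chain implies can be sketched in plain Python. run_chain and the lambda stages below are illustrative stand-ins for the extract -> summarize -> translate chain, not txtai APIs:

```python
# Pure-Python sketch of a sequential task chain: the output of stage N
# feeds stage N+1, processed batch by batch (hypothetical, no txtai needed)
def run_chain(tasks, elements, batch=2):
    data = list(elements)
    for task in tasks:
        out = []
        # Feed this stage fixed-size batches, as Workflow does with `batch`
        for i in range(0, len(data), batch):
            out.extend(task(data[i:i + batch]))
        data = out
    return data

stages = [
    lambda xs: [x.strip() for x in xs],   # stand-in for "extract"
    lambda xs: [x[:5] for x in xs],       # stand-in for "summarize"
    lambda xs: [x.upper() for x in xs],   # stand-in for "translate"
]
print(run_chain(stages, ["  hello world ", " goodbye  "]))  # ['HELLO', 'GOODB']
```

Each stage sees only the previous stage's output, which is the same contract the tasks table describes: task N's output becomes task N+1's input.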

Related Pages

Implements Principle

Requires Environment
