# Implementation: NeuML txtai Workflow Init
| Knowledge Sources | Details |
|---|---|
| Domains | Data_Processing, Workflow |
| Last Updated | 2026-02-09 00:00 GMT |
## Overview

Concrete tool, provided by the txtai library, for composing an ordered sequence of tasks into a deterministic processing workflow. The Workflow.__init__ constructor accepts a list of Task instances and configuration parameters that control batching, concurrency, naming, and stream pre-processing.
## Description
The Workflow class is the base class for all workflows in txtai. Its constructor stores the provided task list and configuration, then computes the default worker count if one is not explicitly provided. The default is calculated as the maximum number of actions across all tasks (i.e., max(len(task.action) for task in self.tasks)), ensuring that multi-action tasks can fully utilize concurrent execution.
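To make that default concrete, here is a minimal sketch of the computation in plain Python. The `Task` stand-in below is a hypothetical class for illustration only, not txtai's actual implementation; it simply mirrors the described behavior of `task.action` holding one or more actions:

```python
# Hypothetical stand-in for a txtai Task (illustration only): .action is a
# list of callables, so a single callable is normalized into a one-item list.
class Task:
    def __init__(self, action):
        self.action = action if isinstance(action, list) else [action]

def default_workers(tasks):
    # Default worker count as described above: the maximum number of
    # actions across all tasks in the workflow.
    return max(len(task.action) for task in tasks)

tasks = [Task(str.upper), Task([str.strip, str.lower])]
print(default_workers(tasks))  # 2: the second task has two actions
```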
Once constructed, the Workflow is a callable object. Invoking it with an iterable of data elements triggers the full execution pipeline: creating an executor, running initializers, batching data, passing batches through each task sequentially, and yielding results.
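The execution pipeline described above can be sketched, purely illustratively and without txtai, as a generator that batches the input and threads each batch through the tasks in order (executor creation, initializers, and concurrency are omitted for brevity):

```python
# Illustrative pseudologic for the execution pipeline, NOT txtai's code:
# batch the input, pass each batch through every task sequentially, yield lazily.
def run_workflow(tasks, elements, batch=100):
    buffer = []
    for element in elements:
        buffer.append(element)
        if len(buffer) == batch:
            yield from process(tasks, buffer)
            buffer = []
    if buffer:  # flush the final partial batch
        yield from process(tasks, buffer)

def process(tasks, batch):
    # Output of task N becomes the input of task N+1
    for task in tasks:
        batch = [task(x) for x in batch]
    yield from batch

results = list(run_workflow([str.strip, str.upper], ["  hello ", "world"], batch=1))
print(results)  # ['HELLO', 'WORLD']
```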
## Usage
Use Workflow.__init__ to compose task sequences into a single executable unit. Typical usage involves creating Task instances first (wrapping pipelines), then passing them as an ordered list to the Workflow constructor.
## Code Reference

### Source Location

- Repository: txtai
- File: src/python/txtai/workflow/base.py (lines 30-49)

### Signature

```python
class Workflow:
    def __init__(self, tasks, batch=100, workers=None, name=None, stream=None):
```

### Import

```python
from txtai.workflow import Workflow
```
## I/O Contract

### Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| tasks | list[Task] | Yes | Ordered list of Task instances defining the processing chain. Data flows through tasks sequentially -- the output of task N becomes the input of task N+1. |
| batch | int | No | Number of data elements to process at a time. Defaults to 100. Controls memory usage and GPU batch efficiency. |
| workers | int | No | Number of concurrent workers for the executor. Defaults to max(len(task.action) for task in tasks) -- the maximum number of actions in any single task. |
| name | str | No | Workflow name for logging, scheduling, and programmatic reference via Application. |
| stream | callable | No | Stream processor function applied to input elements before they enter the task chain. Receives the elements iterable and returns a (possibly transformed) iterable. |
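As an illustration of the batch parameter's role (a sketch of the chunking idea, not txtai's internal code), splitting an iterable into fixed-size batches can be expressed as:

```python
# Sketch of fixed-size batching: elements are grouped into chunks of `size`,
# with a smaller final chunk if the input length is not a multiple of `size`.
from itertools import islice

def batched(iterable, size):
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk

print(list(batched(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

A smaller batch lowers peak memory per step; a larger batch lets GPU-backed pipelines amortize per-call overhead, which is why the parameter is exposed per workflow.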
### Outputs

| Name | Type | Description |
|---|---|---|
| workflow | Workflow | A callable Workflow object. When called with workflow(elements), returns a generator that yields transformed data elements after passing them through all tasks. |
## Usage Examples

### Basic Example

```python
from txtai.pipeline import Textractor, Summary
from txtai.workflow import Task, Workflow

# Create pipelines
textractor = Textractor(sections=True)
summary = Summary()

# Wrap pipelines in tasks
task1 = Task(action=textractor)
task2 = Task(action=summary)

# Compose workflow with default batch size (100)
workflow = Workflow([task1, task2])

# Execute workflow
results = list(workflow(["report.pdf", "article.html"]))
```
### Custom Batch Size and Workers

```python
from txtai.pipeline import Translation
from txtai.workflow import Task, Workflow

# Create translation pipeline
translate = Translation()

# Wrap in a task
task = Task(action=translate)

# Compose workflow with small batch size and explicit worker count
workflow = Workflow([task], batch=10, workers=4, name="translation-workflow")

# Execute
results = list(workflow(["Bonjour le monde", "Hola mundo", "Hallo Welt"]))
```
### Workflow with Stream Pre-processing

```python
from txtai.pipeline import Summary
from txtai.workflow import Task, Workflow

summary = Summary()
task = Task(action=summary)

# Define a stream processor that filters short texts
def filter_short(elements):
    for element in elements:
        if len(element) > 100:
            yield element

# Compose workflow with stream pre-processor
workflow = Workflow([task], stream=filter_short, name="filtered-summary")

texts = ["Short.", "This is a much longer text that exceeds one hundred characters " * 3]
results = list(workflow(texts))
```
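The filter_short stream processor can be exercised on its own, without installing txtai, to confirm which elements would reach the task chain:

```python
# Standalone check of the stream pre-processor from the example above:
# only elements longer than 100 characters pass through to the task chain.
def filter_short(elements):
    for element in elements:
        if len(element) > 100:
            yield element

texts = ["Short.", "x" * 150]
kept = list(filter_short(texts))
print(len(kept))  # 1: only the 150-character text survives
```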
### Multi-Task Chain

```python
from txtai.pipeline import Textractor, Summary, Translation
from txtai.workflow import Task, Workflow

# Build a three-stage pipeline: extract -> summarize -> translate
textractor = Textractor(sections=True)
summary = Summary()
translate = Translation()

workflow = Workflow([
    Task(action=textractor),
    Task(action=summary),
    Task(action=translate),
], batch=50, name="extract-summarize-translate")

results = list(workflow(["document.pdf"]))
```