Implementation:Neuml Txtai Task Init
| Knowledge Sources | |
|---|---|
| Domains | Data_Processing, Workflow |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete tool, provided by the txtai library, for wrapping pipeline callables into workflow-compatible task objects. The Task.__init__ constructor configures data routing, input handling, output merging, lifecycle hooks, and concurrency settings for one or more pipeline actions.
Description
The Task class is the base class for all workflow tasks in txtai. Its constructor accepts an action (or list of actions) alongside configuration parameters that control how data flows through the task during workflow execution. The constructor normalizes the action parameter into a list and stores all configuration on the instance. If the subclass defines a register method, the constructor calls it with any extra keyword arguments for subclass-specific initialization; without a register method, extra keyword arguments raise a TypeError.
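The normalization and registration steps can be modeled in a few lines. The following is a simplified, standalone sketch, not txtai's source: the classes `SketchTask` and `PrefixTask` and the `prefix` argument are hypothetical, and only three of the constructor parameters are reproduced for brevity. It mirrors the documented behavior: a missing action becomes an empty list, a single callable is wrapped in a list, and extra keyword arguments go to `register()` when a subclass defines it, otherwise a `TypeError` is raised.

```python
class SketchTask:
    """Simplified model of the documented constructor logic (not txtai's source)."""

    def __init__(self, action=None, select=None, merge="hstack", **kwargs):
        # Normalize action: None or empty -> [], single callable -> [callable]
        if not action:
            action = []
        elif not isinstance(action, list):
            action = [action]

        # Store configuration for use at call time
        self.action = action
        self.select = select
        self.merge = merge

        # Extra keyword arguments go to register() when a subclass defines it
        if hasattr(self, "register"):
            self.register(**kwargs)
        elif kwargs:
            names = ", ".join(repr(name) for name in kwargs)
            raise TypeError(f"unexpected keyword arguments: {names}")


class PrefixTask(SketchTask):
    """Hypothetical subclass that consumes an extra 'prefix' argument."""

    def register(self, prefix=""):
        self.prefix = prefix


print(SketchTask().action)                                # []
print(SketchTask(action=str.upper).action == [str.upper])  # True
print(PrefixTask(action=str.upper, prefix=">>").prefix)    # >>
```

Calling `SketchTask(action=str.upper, prefix=">>")` raises a `TypeError` in this model, because the base class has no `register` method to absorb the extra argument.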
When a Task instance is called (via __call__), it delegates to either filteredrun (for list inputs, applying the select filter) or run (for non-list iterables, processing all elements). The constructor configuration determines how the task behaves at each stage of this execution.
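The call-time dispatch can be illustrated with a standalone model. This is a hedged sketch, not txtai's implementation: `DispatchSketch` and `shout` are hypothetical names, the action is assumed to receive the whole batch as a list (as in the lifecycle-hooks example further down), and one-to-many expansion and tuple unpacking are left out. List input goes through the `select` regex so non-matching elements keep their position unchanged; any other iterable is processed in full.

```python
import re


class DispatchSketch:
    """Simplified model of Task.__call__ dispatch (not txtai's source)."""

    def __init__(self, action, select=None):
        self.action = action
        self.select = select

    def __call__(self, elements, executor=None):
        # Lists go through the select filter; other iterables are processed in full
        if isinstance(elements, list):
            return self.filteredrun(elements)
        return self.run(elements)

    def accept(self, element):
        # No select pattern means every element is accepted
        return self.select is None or re.search(self.select, str(element)) is not None

    def filteredrun(self, elements):
        # Process only matching elements as one batch, leave the rest unchanged
        ids = [i for i, element in enumerate(elements) if self.accept(element)]
        results = self.action([elements[i] for i in ids])
        output = list(elements)
        for i, result in zip(ids, results):
            output[i] = result
        return output

    def run(self, elements):
        # Non-list input: every element is processed, no filtering
        return self.action(list(elements))


def shout(batch):
    # Hypothetical batch action: upper-case every element
    return [element.upper() for element in batch]


task = DispatchSketch(shout, select=r"\.pdf$")
print(task(["report.pdf", "plain text note", "analysis.pdf"]))
# ['REPORT.PDF', 'plain text note', 'ANALYSIS.PDF']
print(task(iter(["plain text note"])))
# ['PLAIN TEXT NOTE']
```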
Usage
Use Task.__init__ whenever you need to wrap one or more pipeline callables for inclusion in a Workflow. Common scenarios include:
- Wrapping a single pipeline with default settings: `Task(action=my_pipeline)`
- Adding a regex filter: `Task(action=my_pipeline, select=r"\.pdf$")`
- Combining two pipelines with horizontal merge: `Task(action=[pipeline_a, pipeline_b], merge="hstack")`
- Adding lifecycle hooks: `Task(action=my_pipeline, initialize=setup_fn, finalize=cleanup_fn)`
Code Reference
Source Location
- Repository: txtai
- File: src/python/txtai/workflow/task/base.py (lines 21-91)
Signature
```python
class Task:
    def __init__(
        self,
        action=None,
        select=None,
        unpack=True,
        column=None,
        merge="hstack",
        initialize=None,
        finalize=None,
        concurrency=None,
        onetomany=True,
        **kwargs,
    ):
```
Import
```python
from txtai.workflow import Task
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| action | callable or list[callable] | No | Action(s) to execute on each data element. Typically pipeline instances. Normalized to a list internally. Defaults to an empty list (pass-through). |
| select | str (regex) | No | Regular expression filter. Only elements whose string value matches this pattern are processed; non-matching elements pass through unchanged. |
| unpack | bool | No | If True (default), extracts the data payload from (id, data, tag) tuples before processing and re-packs it afterward. |
| column | int or dict | No | Column index to select if elements are tuples. Supports a single int or a dict mapping action index to column index. Defaults to None (use entire element). |
| merge | str | No | Merge mode for multi-action outputs. Options: "hstack" (default, column-wise tuples), "vstack" (row-wise, one-to-many), "concat" (column-wise string join), or None (return the raw list of outputs). |
| initialize | callable | No | Function to execute before the workflow processes data. Called by the Workflow during its initialization phase. |
| finalize | callable | No | Function to execute after the workflow finishes processing all data. Called by the Workflow during its finalization phase. |
| concurrency | str | No | Concurrency method used when an executor is available. Valid values: "thread" for thread-based or "process" for process-based parallelism. Defaults to None. |
| onetomany | bool | No | If True (default), list outputs from a single action are interpreted as one-to-many transformations, expanding the output stream. |
| **kwargs | dict | No | Additional keyword arguments. Passed to self.register() if the subclass defines it; otherwise, supplying any of them raises a TypeError. |
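The `unpack` and `column` parameters together decide which part of an element each action sees. The helpers below are a hypothetical, standalone sketch of that routing as the table describes it, not txtai functions. Two behaviors are assumptions of this sketch: a tuple is treated as an (id, data, tag) record only when it has exactly three items, and a `column` dict without an entry for an action index falls back to the whole element.

```python
def unpack_element(element, unpack=True):
    # With unpack=True, an (id, data, tag) tuple exposes only its data payload
    if unpack and isinstance(element, tuple) and len(element) == 3:
        return element[1]
    return element


def repack_element(original, result, unpack=True):
    # Re-wrap a processed payload in the original (id, data, tag) tuple
    if unpack and isinstance(original, tuple) and len(original) == 3:
        return (original[0], result, original[2])
    return result


def select_column(element, column, action_index):
    # column is None (whole element), an int, or a {action index: column index} dict;
    # a missing dict key falls back to the whole element (assumption of this sketch)
    index = column.get(action_index) if isinstance(column, dict) else column
    if index is not None and isinstance(element, tuple):
        return element[index]
    return element


record = ("doc-1", ("Title", "Body text"), "news")
payload = unpack_element(record)                 # ('Title', 'Body text')
title = select_column(payload, {0: 0, 1: 1}, 0)  # action 0 reads column 0
body = select_column(payload, {0: 0, 1: 1}, 1)   # action 1 reads column 1
print(title, "|", body)                          # Title | Body text
print(repack_element(record, title.upper()))     # ('doc-1', 'TITLE', 'news')
```

The dict form lets two actions in one task read different columns of the same tuple, which pairs naturally with a multi-action `merge`.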
Outputs
| Name | Type | Description |
|---|---|---|
| task | Task | A configured Task instance. When called via task(elements) or task(elements, executor), it runs filteredrun for list input or run for other iterables, and returns the transformed data elements. |
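The number and shape of returned elements depend on the merge mode. The sketch below is a simplified, standalone model of the four modes as the Inputs table describes them; `merge_outputs` and `expand` are hypothetical helpers, not txtai's implementation, and details such as the single-space `concat` separator and the list-flattening rule for `vstack` are assumptions. `outputs` holds one result list per action, aligned with the input batch.

```python
def merge_outputs(outputs, merge="hstack"):
    """Combine per-action outputs; outputs[i][j] is action i's result for input row j."""
    if merge is None:
        # No merging: hand back one result list per action
        return outputs

    if merge == "vstack":
        # One list per input row holding every action's result; nested lists are
        # flattened so the row can expand into several downstream elements
        rows = []
        for row in zip(*outputs):
            combined = []
            for value in row:
                combined.extend(value if isinstance(value, list) else [value])
            rows.append(combined)
        return rows

    if merge == "concat":
        # Column-wise string join; the single-space separator is an assumption
        return [" ".join(str(value) for value in row if value) for row in zip(*outputs)]

    # Default "hstack": one tuple per input row
    return list(zip(*outputs))


def expand(rows):
    """Flatten one-to-many rows (lists) into the element stream; other values pass as-is."""
    stream = []
    for row in rows:
        stream.extend(row if isinstance(row, list) else [row])
    return stream


outputs = [["sum A", "sum B"], ["trans A", "trans B"]]
print(merge_outputs(outputs))                    # [('sum A', 'trans A'), ('sum B', 'trans B')]
print(expand(merge_outputs(outputs, "vstack")))  # ['sum A', 'trans A', 'sum B', 'trans B']
print(merge_outputs(outputs, "concat"))          # ['sum A trans A', 'sum B trans B']
```

Two input rows yield two elements under `hstack` and `concat`, but four under `vstack` once the one-to-many rows are expanded into the stream.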
Usage Examples
Basic Example
```python
from txtai.pipeline import Summary
from txtai.workflow import Task

# Create a summarization pipeline
summary = Summary()

# Wrap it in a task with default settings
task = Task(action=summary)

# Call the task directly
results = task(["This is a long document that needs summarizing..."])
```
Filtered Task with Select
```python
from txtai.pipeline import Textractor
from txtai.workflow import Task

# Create a text extractor that splits documents into sections
textractor = Textractor(sections=True)

# Only process elements that look like file paths ending in .pdf
task = Task(action=textractor, select=r"\.pdf$")

# Mixed input: only PDF paths are processed, others pass through unchanged
# (report.pdf and analysis.pdf are placeholder paths)
results = task(["report.pdf", "plain text note", "analysis.pdf"])
```
Multi-Action Task with Merge
```python
from txtai.pipeline import Summary, Translation
from txtai.workflow import Task

# Create two pipelines
summary = Summary()
translate = Translation()

# Combine both actions with horizontal merge
task = Task(action=[summary, translate], merge="hstack")

# Each element is processed by both actions; outputs are merged into one tuple per element
results = task(["Un long article en francais..."])

# Illustrative shape: [("summarized text", "translated text")]
```
Task with Lifecycle Hooks
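```python
from txtai.workflow import Task, Workflow

def setup():
    print("Initializing resources...")

def teardown():
    print("Cleaning up resources...")

task = Task(
    action=lambda elements: [e.upper() for e in elements],
    initialize=setup,
    finalize=teardown,
)

# The hooks are invoked by a Workflow, not by calling the task directly
workflow = Workflow([task])

# Consume all results so the workflow reaches its finalization phase
results = list(workflow(["first", "second"]))
```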
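The ordering of the lifecycle hooks can be modeled without txtai. The generator below is a hypothetical, standalone sketch (`run_workflow` and the dict-based task records are illustrative, not txtai's Workflow). It assumes the workflow yields results lazily, runs every `initialize` hook once before the first batch, and runs every `finalize` hook once after the last batch. Under that assumption, nothing runs until the results are iterated, and finalization happens only once the stream is fully consumed.

```python
def run_workflow(tasks, elements, batch=2):
    """Hypothetical model of a workflow lifecycle (not txtai's Workflow)."""
    # Initialization phase: run each task's initialize hook once
    for task in tasks:
        if task.get("initialize"):
            task["initialize"]()

    # Processing phase: push each batch through every task's action in order
    elements = list(elements)
    for start in range(0, len(elements), batch):
        data = elements[start:start + batch]
        for task in tasks:
            data = task["action"](data)
        yield from data

    # Finalization phase: only reached after all results have been consumed
    for task in tasks:
        if task.get("finalize"):
            task["finalize"]()


events = []
task = {
    "action": lambda batch: [element.upper() for element in batch],
    "initialize": lambda: events.append("init"),
    "finalize": lambda: events.append("final"),
}

results = run_workflow([task], ["a", "b", "c"])
print(events)         # [] (the generator body has not started yet)
print(list(results))  # ['A', 'B', 'C']
print(events)         # ['init', 'final']
```

This is why the example above wraps the workflow call in `list(...)`: a partially consumed generator never reaches its finalization phase.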