Implementation:Neuml Txtai Task Init

From Leeroopedia


Knowledge Sources
Domains Data_Processing, Workflow
Last Updated 2026-02-09 00:00 GMT

Overview

Task is a concrete tool, provided by the txtai library, for wrapping pipeline callables into workflow-compatible task objects. The Task.__init__ constructor configures data routing, input handling, output merging, lifecycle hooks, and concurrency settings for one or more pipeline actions.

Description

The Task class is the base class for all workflow tasks in txtai. Its constructor accepts an action (or list of actions) alongside configuration parameters that control how data flows through the task during workflow execution. The constructor normalizes the action parameter into a list, stores all configuration, and optionally calls a register method for subclass-specific initialization.
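
The normalization and keyword-argument handling described above can be illustrated with a small stand-alone sketch. MiniTask and UrlTask are hypothetical stand-ins written for this page, not txtai's actual implementation; they only mirror the described behavior (action normalized to a list, extra keyword arguments forwarded to register when a subclass defines it, TypeError otherwise). The timeout parameter is likewise invented for illustration.

```python
class MiniTask:
    """Hypothetical stand-in that mirrors the constructor behavior described above."""

    def __init__(self, action=None, **kwargs):
        # Normalize action: None -> [], single callable -> [callable], list stays a list
        action = action if action else []
        self.action = action if isinstance(action, list) else [action]

        if hasattr(self, "register"):
            # Subclass hook receives the remaining keyword arguments
            self.register(**kwargs)
        elif kwargs:
            # No hook available, so unknown keyword arguments are an error
            names = ", ".join(f"'{name}'" for name in kwargs)
            raise TypeError(f"__init__() got unexpected keyword arguments: {names}")


class UrlTask(MiniTask):
    """Hypothetical subclass with its own initialization parameter."""

    def register(self, timeout=10):
        self.timeout = timeout


single = MiniTask(action=str.upper)                 # one callable
several = MiniTask(action=[str.upper, str.lower])   # list of callables
empty = MiniTask()                                  # no action at all
custom = UrlTask(action=str.strip, timeout=30)      # extra kwarg goes to register
```

In this sketch every instance ends up with a list in self.action, so later stages can iterate over actions without checking whether one or several were supplied.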

When a Task instance is called (via __call__), it delegates to either filteredrun (for list inputs, applying the select filter) or run (for non-list iterables, processing all elements). The constructor configuration determines how the task behaves at each stage of this execution.
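
As a rough model of this dispatch, the sketch below routes list inputs through a select filter and processes other iterables in full. SketchTask is a hypothetical, heavily simplified stand-in (single action, no unpacking, merging, or executor) and should not be read as txtai's code; only the method names filteredrun and run come from the description above.

```python
import re


class SketchTask:
    """Hypothetical stand-in that models the list vs. iterable dispatch."""

    def __init__(self, action, select=None):
        self.action = action
        self.select = select

    def __call__(self, elements):
        # Lists go through the select filter; other iterables are processed in full
        if isinstance(elements, list):
            return self.filteredrun(elements)
        return self.run(elements)

    def accept(self, element):
        # Without a select pattern every element is accepted
        if not self.select:
            return True
        return isinstance(element, str) and re.search(self.select, element) is not None

    def filteredrun(self, elements):
        # Run the action on matching elements only, then put results back in place
        positions = [i for i, element in enumerate(elements) if self.accept(element)]
        results = self.action([elements[i] for i in positions])
        merged = list(elements)
        for i, result in zip(positions, results):
            merged[i] = result
        return merged

    def run(self, elements):
        # No filtering: every element is passed to the action
        return self.action(list(elements))


task = SketchTask(lambda batch: [x.upper() for x in batch], select=r"\.pdf$")

filtered = task(["report.pdf", "plain text note"])          # list input
unfiltered = task(iter(["report.pdf", "plain text note"]))  # iterator input
```

Here filtered keeps "plain text note" unchanged because it does not match the pattern, while unfiltered upper-cases both elements because the iterator path skips the filter.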

Usage

Use Task.__init__ whenever you need to wrap one or more pipeline callables for inclusion in a Workflow. Common scenarios include:

  • Wrapping a single pipeline with default settings: Task(action=my_pipeline)
  • Adding a regex filter: Task(action=my_pipeline, select=r"\.pdf$")
  • Combining two pipelines with horizontal merge: Task(action=[pipeline_a, pipeline_b], merge="hstack")
  • Adding lifecycle hooks: Task(action=my_pipeline, initialize=setup_fn, finalize=cleanup_fn)

Code Reference

Source Location

  • Repository: txtai
  • File: src/python/txtai/workflow/task/base.py (lines 21-91)

Signature

class Task:
    def __init__(
        self,
        action=None,
        select=None,
        unpack=True,
        column=None,
        merge="hstack",
        initialize=None,
        finalize=None,
        concurrency=None,
        onetomany=True,
        **kwargs,
    ):

Import

from txtai.workflow import Task

I/O Contract

Inputs

Name | Type | Required | Description
--- | --- | --- | ---
action | callable or list[callable] | No | Action(s) to execute on each data element, typically pipeline instances. Defaults to None, which is normalized internally to an empty list (pass-through); a single callable is wrapped in a list.
select | str (regex) | No | Regular expression filter, applied when the task is called with a list. Only elements whose string value matches this pattern are processed; non-matching elements pass through unchanged.
unpack | bool | No | If True (default), extracts the data payload from (id, data, tag) tuples before processing and re-packs it afterward.
column | int or dict | No | Column index to select if elements are tuples. Accepts a single int or a dict mapping action index to column index. Defaults to None (use the entire element).
merge | str | No | Merge mode for multi-action outputs. Options: "hstack" (default, column-wise tuples), "vstack" (row-wise one-to-many), "concat" (column-wise string join), or None (return the raw list of outputs).
initialize | callable | No | Function to execute before the workflow processes data. Called by the Workflow during its initialization phase.
finalize | callable | No | Function to execute after the workflow finishes processing all data. Called by the Workflow during its finalization phase.
concurrency | str | No | Concurrency method used when an executor is available. Valid values: "thread" for thread-based parallelism, "process" for process-based parallelism.
onetomany | bool | No | If True (default), list outputs from a single action are interpreted as one-to-many transformations, expanding the output stream.
**kwargs | dict | No | Additional keyword arguments. Passed to self.register() if the subclass defines it; if it does not and extra keyword arguments are supplied, a TypeError is raised.
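
To make the merge options concrete, the sketch below applies simplified versions of the three modes to the outputs of two actions over the same two inputs. These helper functions are illustrative assumptions, not txtai's implementation: the ". " separator used by concat is assumed, vstack is modeled as an already-flattened list, and any special handling of array or tensor outputs is left out.

```python
def hstack(outputs):
    # Column-wise: one tuple per input element, one entry per action
    return list(zip(*outputs))


def vstack(outputs):
    # Row-wise: each input element expands into one row per action output
    return [value for row in zip(*outputs) for value in row]


def concat(outputs, separator=". "):
    # Column-wise, but joined into a single string per input element
    return [separator.join(str(value) for value in row) for row in zip(*outputs)]


# outputs[i] holds the results of action i for every input element
outputs = [["sum 1", "sum 2"], ["trans 1", "trans 2"]]

print(hstack(outputs))  # [('sum 1', 'trans 1'), ('sum 2', 'trans 2')]
print(vstack(outputs))  # ['sum 1', 'trans 1', 'sum 2', 'trans 2']
print(concat(outputs))  # ['sum 1. trans 1', 'sum 2. trans 2']
```

With merge set to None, the raw outputs list (one inner list per action) would be returned as-is.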

Outputs

Name | Type | Description
--- | --- | ---
task | Task | A configured Task instance. When called as task(elements), optionally with an executor as a second argument, it runs filteredrun (list input) or run (other iterable input) and returns the transformed data elements.

Usage Examples

Basic Example

from txtai.pipeline import Summary
from txtai.workflow import Task

# Create a summarization pipeline
summary = Summary()

# Wrap it in a task with default settings
task = Task(action=summary)

# Call the task directly
results = task(["This is a long document that needs summarizing..."])

Filtered Task with Select

from txtai.pipeline import Textractor
from txtai.workflow import Task

# Create a text extractor
textractor = Textractor(sections=True)

# Only process elements that look like file paths ending in .pdf
task = Task(action=textractor, select=r"\.pdf$")

# Mixed input: only PDF paths are processed, others pass through
results = task(["report.pdf", "plain text note", "analysis.pdf"])

Multi-Action Task with Merge

from txtai.pipeline import Summary, Translation
from txtai.workflow import Task

# Create two pipelines
summary = Summary()
translate = Translation()

# Combine both actions with horizontal merge
task = Task(action=[summary, translate], merge="hstack")

# Each element is processed by both actions; outputs are merged as tuples
results = task(["Un long article en francais..."])
# Expected shape: [("summarized text", "translated text")]

Task with Lifecycle Hooks

from txtai.workflow import Task

def setup():
    print("Initializing resources...")

def teardown():
    print("Cleaning up resources...")

task = Task(
    action=lambda elements: [e.upper() for e in elements],
    initialize=setup,
    finalize=teardown,
)

Related Pages

Implements Principle
