Principle:Neuml Txtai Task Creation
Principle: Task_Creation
| Metadata | |
|---|---|
| Sources | txtai, txtai Documentation |
| Domains | Workflow_Orchestration, Data_Processing |
| Last Updated | 2026-02-10 12:00 GMT |
Overview
Task creation wraps pipelines into tasks that filter, transform, and route data within workflows, providing an abstraction layer between raw AI pipelines and the workflow execution engine.
Description
A task in txtai is a wrapper that mediates between the workflow engine and one or more pipeline actions. While pipelines define what transformation to apply, tasks define how that transformation integrates into a data flow: which elements to process, how to prepare inputs, how to handle multi-action outputs, and how to merge results back into the element stream.
The base Task class provides the following capabilities:
- Data filtering (select): A regex-based filter that determines which data elements the task should process. Elements that do not match the filter pass through unmodified, enabling selective processing within a heterogeneous data stream.
- Data unpacking (unpack): Support for structured data elements in the form of (id, data, tag) tuples. When unpacking is enabled, the task extracts the data component for processing and re-packs the result into the original tuple structure.
- Column selection (column): For tuple-valued elements, the ability to select a specific column index as input to the action, enabling tasks to operate on individual fields of composite data.
- Action composition: A task can hold multiple actions (callables). When multiple actions are present, each action processes the same input independently, and the results are merged using one of several strategies.
- Merge strategies (merge): Three modes for combining outputs from multiple actions:
  - "hstack" (default): Column-wise merge producing tuples of results -- [(a1, a2), (b1, b2), ...]
  - "vstack": Row-wise merge producing one-to-many transformations -- [a1, a2, b1, b2, ...]
  - "concat": Column-wise merge with string concatenation -- ["a1. a2", "b1. b2", ...]
- Lifecycle hooks (initialize/finalize): Callable hooks executed before and after the task processes any data, enabling resource setup and teardown.
- Concurrency control: Tasks can specify whether their actions should be executed using thread-based or process-based concurrency when an executor is available.
- One-to-many transformations: When a single input element produces multiple output elements, the task uses the OneToMany wrapper to signal the workflow engine to expand the element stream.
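The unpack and column behaviour described in the list above can be pictured with a small pure-Python sketch. It does not call txtai; the helper names unpack, repack, select_column and run are hypothetical, and the real Task class may handle more cases than are modelled here.

```python
def unpack(element):
    """Return the data component of an (id, data, tag) tuple, or the element itself."""
    if isinstance(element, tuple) and len(element) > 1:
        return element[1]
    return element


def repack(element, result):
    """Put a processed result back into the original tuple structure."""
    if isinstance(element, tuple) and len(element) > 1:
        return (element[0], result) + element[2:]
    return result


def select_column(data, column):
    """Pick one field of tuple-valued data; other data is returned unchanged."""
    if column is not None and isinstance(data, tuple):
        return data[column]
    return data


def run(action, elements, column=None):
    """Unpack each element, apply the action to the batch, then repack the results."""
    inputs = [select_column(unpack(e), column) for e in elements]
    outputs = action(inputs)
    return [repack(e, out) for e, out in zip(elements, outputs)]


def upper(batch):
    # Stand-in for a pipeline: any callable that maps a list to a list
    return [text.upper() for text in batch]


elements = [(0, "first document", None), (1, "second document", "tag")]
results = run(upper, elements)

# Column selection: only field 1 of the composite data reaches the action
composite = [(0, ("title", "body"), None)]
column_results = run(upper, composite, column=1)
```

The ids and tags survive the round trip untouched, which is the point of unpacking: the action only ever sees the data component.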
Specialized Task Types
Two built-in subclasses extend the base task with content-type filtering:
- UrlTask: Accepts only elements that match a URL pattern (\w+:\/\/). Useful for routing web content to URL-specific pipelines.
- FileTask: Accepts only elements that correspond to existing local file paths (with optional file:// prefix stripping). Useful for routing local files to file-processing pipelines.
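The two acceptance rules can be sketched as plain predicates. This is an illustration rather than the library's code: accepts_url and accepts_file are hypothetical names, and anchoring both patterns at the start of the string is an assumption.

```python
import os
import re
import tempfile

URL_PREFIX = r"\w+:\/\/"      # pattern quoted in the UrlTask description
FILE_PREFIX = r"^file:\/\/"   # assumed form of the optional file:// prefix


def accepts_url(element):
    """True when the element is a string that starts with a scheme:// prefix."""
    return isinstance(element, str) and re.match(URL_PREFIX, element) is not None


def accepts_file(element):
    """True when the element, minus any file:// prefix, is an existing local path."""
    if not isinstance(element, str):
        return False
    return os.path.exists(re.sub(FILE_PREFIX, "", element))


# Use a real temporary file so the existence check has something to find
with tempfile.NamedTemporaryFile(suffix=".txt") as handle:
    checks = {
        "url": accepts_url("https://example.com/page"),
        "not_url": accepts_url("plain text"),
        "file": accepts_file(handle.name),
        "file_prefixed": accepts_file("file://" + handle.name),
        "missing": accepts_file(handle.name + ".missing"),
    }
```

Because the file check depends on the local filesystem, the same string can be accepted on one machine and passed through on another.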
Usage
Use this principle when you need to connect pipelines to the workflow engine. Tasks are required whenever:
- A pipeline should only process a subset of the data stream (use select or a specialized task type)
- Multiple pipelines should process the same data and their results need to be combined (use multi-action tasks with a merge strategy)
- Data elements have composite structure that needs unpacking before pipeline processing
- Resource initialization or cleanup is needed around pipeline execution
Theoretical Basis
Task Abstraction
The task abstraction follows the decorator pattern: it wraps one or more callables with additional behavior (filtering, unpacking, merging) without modifying the callables themselves. This separation of concerns means that a pipeline does not need to know whether it is processing all elements or a filtered subset -- the task handles that concern.
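A minimal sketch of this wrapping idea, using lifecycle hooks as the added behaviour, might look as follows. WrappedAction is a hypothetical class, not part of txtai, and firing the hooks around each call is a simplification made for the example.

```python
class WrappedAction:
    """Hypothetical wrapper: adds setup and teardown around a callable without changing it."""

    def __init__(self, action, initialize=None, finalize=None):
        self.action = action
        self.initialize = initialize
        self.finalize = finalize

    def __call__(self, elements):
        if self.initialize:
            self.initialize()
        try:
            return self.action(elements)
        finally:
            # Teardown runs even if the wrapped callable raises
            if self.finalize:
                self.finalize()


events = []


def lengths(batch):
    # The wrapped callable knows nothing about hooks or filtering
    events.append("action")
    return [len(text) for text in batch]


task = WrappedAction(
    lengths,
    initialize=lambda: events.append("initialize"),
    finalize=lambda: events.append("finalize"),
)
output = task(["abc", "de"])
```

The lengths function is unchanged by the wrapping and could be reused with different hooks, or with none at all.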
Data Filtering
Task filtering implements a selective processing pattern where heterogeneous data streams contain mixed content types. Rather than requiring separate workflows for each content type, a single workflow can contain tasks with different filters, each processing only the elements it can handle. Unmatched elements flow through unmodified, preserving the complete data stream.
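The pass-through behaviour can be illustrated with a short standalone sketch. The function filtered_run is a hypothetical name, and treating non-string elements as unmatched is an assumption of this example.

```python
import re


def filtered_run(action, elements, select):
    """Apply action only to string elements matching the select regex, keeping order."""
    matched = [
        (index, element)
        for index, element in enumerate(elements)
        if isinstance(element, str) and re.search(select, element)
    ]
    outputs = action([element for _, element in matched])

    # Start from a copy so unmatched elements pass through unchanged
    results = list(elements)
    for (index, _), out in zip(matched, outputs):
        results[index] = out
    return results


def describe(batch):
    # Stand-in for a pipeline that only understands PDF paths
    return [f"parsed:{name}" for name in batch]


stream = ["report.pdf", "notes.txt", "paper.pdf", 42]
routed = filtered_run(describe, stream, r"\.pdf$")
```

Tracking the original index of each matched element is what lets the output keep the same length and order as the input stream.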
Action Composition and Merge Strategies
Multi-action tasks implement a fan-out/fan-in pattern:
- Fan-out: The same input elements are sent to every action, either concurrently when an executor is available or sequentially otherwise.
- Fan-in: The results from all actions are combined using the configured merge strategy.
This enables patterns such as extracting multiple features from the same text (hstack), generating alternative translations (vstack), or producing composite summaries (concat).
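The three merge strategies can be reproduced with plain lists to make the output shapes concrete. The functions below are an illustrative sketch with hypothetical names, not the library's implementation; the ". " separator follows the "a1. a2" form given in the description.

```python
def fan_out(actions, elements):
    """Run every action on the same inputs; returns one output list per action."""
    return [action(elements) for action in actions]


def hstack(outputs):
    """Column-wise merge: one tuple per input element."""
    return list(zip(*outputs))


def vstack(outputs):
    """Row-wise merge: results for each input element are flattened in order."""
    return [value for row in zip(*outputs) for value in row]


def concat(outputs):
    """Column-wise merge, joining each row into a single string."""
    return [". ".join(str(value) for value in row) for row in zip(*outputs)]


def first_word(batch):
    return [text.split()[0] for text in batch]


def word_count(batch):
    return [len(text.split()) for text in batch]


outputs = fan_out([first_word, word_count], ["alpha beta", "gamma delta epsilon"])
merged = {
    "hstack": hstack(outputs),
    "vstack": vstack(outputs),
    "concat": concat(outputs),
}
```

With two actions and two inputs, hstack keeps one result per input, vstack doubles the stream length, and concat collapses each row to a string.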
Pseudocode
from txtai.workflow import Task

# pipeline1 and pipeline2 stand in for any callables that accept a list of elements
task = Task(action=[pipeline1, pipeline2], select=r"\.pdf$", merge="hstack")

# For each element in the stream:
#   1. Check whether the element matches the select filter
#   2. If yes: unpack the element, run it through each action, merge the results, repack
#   3. If no: pass the element through unmodified
results = task(elements)