Principle:Neuml Txtai Task Wrapping
| Knowledge Sources | |
|---|---|
| Domains | Data_Processing, Workflow, Pipeline |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Task Wrapping is the principle of adapting raw callable pipelines into workflow-compatible processing units. A Task in txtai acts as a decorator around one or more pipeline actions, adding capabilities that raw pipelines do not possess: data routing via regex filters, automatic unpacking of structured tuples, multi-action output merging, lifecycle hooks (initialize/finalize), and configurable concurrency. Tasks are the essential adapter layer between individual pipelines and the orchestrating Workflow.
Description
Raw pipelines accept input and produce output, but they have no awareness of the larger data flow context. They do not know how to:
- Filter which elements they should process and which should pass through unchanged.
- Unpack structured data elements that carry metadata alongside the payload (e.g., `(id, data, tag)` tuples).
- Merge outputs when multiple actions run on the same input (horizontal stacking, vertical stacking, or concatenation).
- Transform cardinality: handle one-to-many relationships, where a single input produces multiple outputs.
- Run concurrently with thread- or process-based parallelism.
The Task class solves all of these problems. When a pipeline is wrapped in a Task, the Task handles:
- Selection: The `select` parameter accepts a regular expression pattern. Only data elements whose string representation matches the pattern are passed to the action. Non-matching elements flow through unmodified, enabling content-aware routing in heterogeneous data streams.
- Unpacking: When `unpack=True` (the default), the Task extracts the data payload from `(id, data, tag)` tuples before passing it to the action, and re-packs the result into the tuple structure afterward.
- Multi-Action Merging: A Task can hold multiple actions. When several actions process the same input, the `merge` parameter controls how their outputs are combined:
  - `"hstack"` (default): column-wise merge producing a tuple of outputs per input element.
  - `"vstack"`: row-wise merge, interpreted as a one-to-many transformation.
  - `"concat"`: column-wise merge with the outputs joined into a string.
- One-to-Many Support: When `onetomany=True` (the default), list outputs from a single action are interpreted as multiple output elements, expanding the data stream.
- Lifecycle Hooks: The `initialize` and `finalize` callables run before and after the workflow processes all data, enabling setup and teardown logic (e.g., opening and closing database connections, or triggering an index build).
- Concurrency: The `concurrency` parameter (`"thread"` or `"process"`) controls how the actions of a multi-action task are parallelized when an executor is available.
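The unpacking and one-to-many steps can be illustrated with a small plain-Python sketch. It assumes batch actions (a list in, a list of the same length out) and uses hypothetical helpers (`unpack_element`, `repack_element`, `run_action`). It is a simplified model of the behavior described above, not txtai's implementation, and it assumes that expanded outputs keep the id and tag of their source element.

```python
from typing import Any, Callable, List

# Hypothetical helpers for illustration only; these are not txtai functions.

def unpack_element(element: Any) -> Any:
    """Return the payload of an (id, data, tag) tuple, or the element itself."""
    if isinstance(element, tuple) and len(element) == 3:
        return element[1]
    return element


def repack_element(original: Any, result: Any) -> Any:
    """Place a result back into the (id, data, tag) structure of the original element."""
    if isinstance(original, tuple) and len(original) == 3:
        uid, _, tag = original
        return (uid, result, tag)
    return result


def run_action(action: Callable[[List[Any]], List[Any]], elements: List[Any],
               unpack: bool = True, onetomany: bool = True) -> List[Any]:
    """Run a batch action with optional tuple unpacking and one-to-many expansion."""
    inputs = [unpack_element(e) for e in elements] if unpack else list(elements)
    outputs = action(inputs)

    results: List[Any] = []
    for original, output in zip(elements, outputs):
        # Expand a list output into several stream elements, else keep it as one element
        values = output if onetomany and isinstance(output, list) else [output]
        for value in values:
            results.append(repack_element(original, value) if unpack else value)
    return results


# Usage: a sentence splitter where each input yields a list of sentences
split = lambda texts: [text.split(". ") for text in texts]
print(run_action(split, [(1, "First. Second", None), (2, "Third", "doc")]))
# [(1, 'First', None), (1, 'Second', None), (2, 'Third', 'doc')]
```

With `onetomany=False`, the same splitter would emit one element per input whose payload is the whole list of sentences.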
Usage
Use Task Wrapping when you need to:
- Add data routing to a pipeline so it only processes elements matching a specific pattern (e.g., only `.pdf` files).
- Combine multiple pipeline outputs for the same input into a single merged result.
- Integrate pipelines into a Workflow that requires the Task callable interface.
- Add setup/teardown logic around pipeline execution (initialize and finalize hooks).
- Parallelize multiple actions within a single processing step using thread or process concurrency.
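Running several actions on one batch in parallel can be sketched with Python's standard `concurrent.futures` module. The `run_actions` helper is hypothetical and only models the idea of a multi-action step with an optional executor; it does not reproduce how txtai itself schedules actions.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, Callable, List, Optional

# Hypothetical helper for illustration only; not a txtai function.

def run_actions(actions: List[Callable[[List[Any]], List[Any]]], inputs: List[Any],
                executor: Optional[Executor] = None) -> List[List[Any]]:
    """Run every action on the same batch; return one output list per action, in action order."""
    if executor is None:
        # No executor available: run the actions one after another
        return [action(inputs) for action in actions]
    # Submit each action as a separate job; collecting futures in order keeps action order
    futures = [executor.submit(action, inputs) for action in actions]
    return [future.result() for future in futures]


lengths = lambda texts: [len(text) for text in texts]
upper = lambda texts: [text.upper() for text in texts]

with ThreadPoolExecutor(max_workers=2) as pool:
    print(run_actions([lengths, upper], ["ab", "cde"], pool))
# [[2, 3], ['AB', 'CDE']]
```

Swapping in `ProcessPoolExecutor` gives process-based parallelism, but it requires the actions and their inputs to be picklable, so module-level functions would have to replace the lambdas used here.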
Theoretical Basis
Task Wrapping implements the Adapter Pattern from object-oriented design. The Task class adapts the simple callable interface of a pipeline into the richer interface expected by the Workflow orchestrator. It adds cross-cutting concerns (filtering, unpacking, merging, concurrency) without modifying the underlying pipeline code.
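The adapter relationship can be sketched in plain Python. Both `MiniTask` and `run_workflow` below are hypothetical stand-ins for the Task and Workflow roles. They show only how a wrapper can add initialize/finalize hooks around an unmodified callable, and they assume the orchestrator processes data in fixed-size batches.

```python
from typing import Any, Callable, Iterable, List, Optional

# Hypothetical classes for illustration only; not txtai's Task or Workflow.

class MiniTask:
    """Adapter that exposes a plain batch callable through a task-style interface."""

    def __init__(self, action: Callable[[List[Any]], List[Any]],
                 initialize: Optional[Callable[[], None]] = None,
                 finalize: Optional[Callable[[], None]] = None):
        self.action = action
        self.initialize = initialize
        self.finalize = finalize

    def __call__(self, elements: List[Any]) -> List[Any]:
        # The wrapped callable runs unchanged; the adapter only normalizes its input/output
        return list(self.action(list(elements)))


def run_workflow(tasks: List[MiniTask], elements: Iterable[Any], batch: int = 2) -> List[Any]:
    """Run setup hooks, stream batches through every task, then run teardown hooks."""
    for task in tasks:
        if task.initialize:
            task.initialize()

    data = list(elements)
    results: List[Any] = []
    for start in range(0, len(data), batch):
        chunk = data[start:start + batch]
        for task in tasks:
            chunk = task(chunk)
        results.extend(chunk)

    for task in tasks:
        if task.finalize:
            task.finalize()
    return results


log: List[str] = []
double = MiniTask(lambda xs: [x * 2 for x in xs],
                  initialize=lambda: log.append("open"),
                  finalize=lambda: log.append("close"))
print(run_workflow([double], [1, 2, 3]))  # [2, 4, 6]
print(log)  # ['open', 'close']
```

The wrapped lambda knows nothing about batching or hooks; those concerns live entirely in the adapter and orchestrator.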
The multi-action merging system follows principles from data frame operations: hstack corresponds to column binding (like numpy.stack(axis=1)), vstack corresponds to row binding (like numpy.concatenate), and concat performs string-level joining. This gives developers familiar semantics for combining heterogeneous pipeline outputs.
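The three merge modes can be modeled with `zip`, assuming each action returns one output per input, in input order. The `merge` function below is hypothetical, and the space separator used for `concat` is an assumption rather than documented txtai behavior.

```python
from typing import Any, List

# Hypothetical helper for illustration only; not txtai's merge code.

def merge(outputs: List[List[Any]], mode: str = "hstack") -> List[Any]:
    """Combine per-action output lists (one list per action, aligned by input position)."""
    # zip(*outputs) groups the i-th output of every action into one row
    rows = list(zip(*outputs))
    if mode == "hstack":
        # One tuple per input: a one-to-one transformation
        return rows
    if mode == "vstack":
        # One list per input: a later one-to-many step can flatten these
        return [list(row) for row in rows]
    if mode == "concat":
        # One string per input; the space separator is an assumption
        return [" ".join(str(value) for value in row) for row in rows]
    raise ValueError(f"Unknown merge mode: {mode}")


# Two actions applied to the same two inputs
outputs = [["summary a", "summary b"], ["label a", "label b"]]
print(merge(outputs, "hstack"))  # [('summary a', 'label a'), ('summary b', 'label b')]
print(merge(outputs, "vstack"))  # [['summary a', 'label a'], ['summary b', 'label b']]
print(merge(outputs, "concat"))  # ['summary a label a', 'summary b label b']
```

With one-to-many expansion enabled, each `vstack` row would become separate stream elements (four in this example), which is what makes `vstack` a row-binding operation.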
The select filtering mechanism implements a form of content-based routing, a pattern from enterprise integration where messages are directed to processors based on their content rather than a fixed routing table. This enables a single workflow to handle mixed data types (e.g., a stream containing both PDF paths and plain text strings).
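Content-based routing with a regex filter can be sketched as follows. The `route` function is hypothetical; it assumes matching is done against `str(element)` with `re.search`, and it writes processed results back into their original positions so pass-through elements keep their place in the stream.

```python
import re
from typing import Any, Callable, List, Optional

# Hypothetical helper for illustration only; not txtai's selection code.

def route(elements: List[Any], action: Callable[[List[Any]], List[Any]],
          select: Optional[str] = None) -> List[Any]:
    """Apply action only to elements matching select; others pass through unchanged."""
    pattern = re.compile(select) if select else None
    # Positions of the elements the action should handle (all of them when select is None)
    chosen = [i for i, e in enumerate(elements) if pattern is None or pattern.search(str(e))]
    processed = action([elements[i] for i in chosen]) if chosen else []

    results = list(elements)
    # Write each processed value back into the slot of its source element
    for i, value in zip(chosen, processed):
        results[i] = value
    return results


stream = ["report.pdf", "plain text", "notes.pdf"]
extract = lambda paths: [f"<text of {path}>" for path in paths]
print(route(stream, extract, select=r"\.pdf$"))
# ['<text of report.pdf>', 'plain text', '<text of notes.pdf>']
```

Because the pattern here is case-sensitive, a file named `NOTES.PDF` would pass through untouched unless the pattern were written to ignore case.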