Workflow:Neuml Txtai Pipeline Workflow Chaining
| Knowledge Sources | |
|---|---|
| Domains | Workflows, Pipelines, Data_Processing |
| Last Updated | 2026-02-09 18:00 GMT |
Overview
End-to-end process for composing deterministic multi-step data processing workflows by chaining txtai pipelines (text extraction, summarization, translation, LLM prompting) together using the Workflow and Task classes.
Description
This workflow demonstrates how to build deterministic processing pipelines using txtai's Workflow engine. Unlike agents that dynamically choose their path, workflows follow a fixed sequence of tasks. Each task wraps a pipeline (Textractor, Summary, Translation, LLM, etc.) or a custom function, and data flows sequentially through the chain. The Workflow class handles batching, concurrent execution, scheduling, and error handling. Tasks support specialized types for file handling, URL processing, image loading, HTTP service calls, cloud storage access, data export, and template-based text generation. Workflows can also be defined declaratively via YAML configuration through the Application class, enabling no-code pipeline construction.
Usage
Execute this workflow when you need a reproducible, deterministic data processing pipeline that chains multiple NLP operations together. It suits batch processing tasks such as extracting text from web pages and then summarizing and translating the results, processing document collections through multiple analysis steps, or building ETL pipelines that combine AI models with traditional data transformations.
Execution Steps
Step 1: Define the Pipelines
Instantiate the individual pipeline components that will form the workflow steps. Each pipeline is a specialized model wrapper: Textractor for document/URL text extraction, Summary for abstractive summarization, Translation for language translation, LLM for general text generation, Labels for classification, Entity for named entity recognition, and others.
Key considerations:
- Each pipeline can be configured with a specific model, backend, and parameters
- Pipelines are independent and reusable across multiple workflows
- An LLM pipeline can replace multiple specialized pipelines when flexibility is preferred over precision
- Custom Python functions can also serve as pipeline steps
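A minimal sketch of this step, assuming txtai is installed with its pipeline extras and that the default models are acceptable. `normalize` and `build_pipelines` are hypothetical names introduced for illustration. Workflow actions are called with a batch (a list) of elements and return a list, so the custom function follows that shape.

```python
import re


def normalize(texts):
    """Hypothetical custom step: collapse runs of whitespace in each text.

    Workflow actions receive a batch (list) of elements and return a list.
    """
    return [re.sub(r"\s+", " ", text).strip() for text in texts]


def build_pipelines():
    """Instantiate the pipelines that later steps wrap in tasks.

    txtai is imported lazily here because constructing a pipeline
    loads model weights, which can be slow on first use.
    """
    from txtai.pipeline import Summary, Textractor, Translation

    return {
        "textractor": Textractor(),    # text extraction from documents and URLs
        "summary": Summary(),          # abstractive summarization (default model)
        "translation": Translation(),  # language translation
        "normalize": normalize,        # plain function used as a pipeline step
    }
```

Each entry is an independent callable, so the same instances can be reused across several workflows.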
Step 2: Wrap Pipelines in Tasks
Create Task instances that wrap each pipeline. Tasks add data flow management, including input/output transformation, element filtering, and error handling. Specialized task types handle specific input formats: FileTask for local files, UrlTask for URLs, ImageTask for images, RetrieveTask for downloading content, ServiceTask for HTTP APIs, StorageTask for cloud buckets, TemplateTask for prompt templates, and ExportTask for saving results.
Key considerations:
- The base Task class wraps any callable (pipeline, function, or lambda)
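- Specialized tasks filter inputs so that only matching elements are sent to the action (e.g., UrlTask processes URL inputs and passes non-URL inputs through unchanged)
- A task can hold multiple actions that run on the same input, optionally in parallel, with their outputs merged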
- Template tasks enable structured prompt generation for LLM steps
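The task types above can be sketched as follows, assuming the callables passed in are pipelines created in Step 1. `first_paragraph` and `build_tasks` are hypothetical helpers, and the prompt text is only an example. The tasks are returned as a dictionary of separate illustrations rather than as one ready-made chain.

```python
def first_paragraph(texts):
    """Hypothetical custom action: keep only the first paragraph of each text."""
    return [text.strip().split("\n\n")[0] for text in texts]


def build_tasks(textractor, summary, translate, llm):
    """Wrap callables in tasks; each argument is a pipeline or function."""
    from txtai.workflow import Task, TemplateTask, UrlTask

    return {
        # Only elements that look like URLs are sent to the action
        "extract": UrlTask(textractor),
        # The base Task wraps any callable, including a plain function
        "clean": Task(first_paragraph),
        # Two actions run on the same input; the default merge ("hstack")
        # combines their outputs column-wise into tuples
        "both": Task([summary, translate]),
        # {text} is filled with each string input before the action runs
        "prompt": TemplateTask(
            template="Summarize the following text in one sentence: {text}",
            action=llm,
        ),
    }
```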
Step 3: Compose the Workflow
Create a Workflow instance with the ordered list of tasks. Configure batch size for memory-efficient processing of large datasets, worker count for concurrent execution, and optional stream processing.
Key considerations:
- Tasks execute sequentially; each task's output becomes the next task's input
- Batch size controls how many elements are processed at once (default 100)
- Worker count enables concurrent execution of multi-action tasks
- Workflows can be nested using WorkflowTask to compose complex pipelines from simpler ones
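A short sketch of composition, assuming `tasks` is an ordered list of Task instances. `build_workflow` and `build_nested` are hypothetical wrappers around the Workflow constructor. `chunk` is a simplified pure-Python model of what a batch size means; it is not txtai's internal code.

```python
def chunk(elements, size=100):
    """Simplified model of batching: yield consecutive slices of `size` elements."""
    for start in range(0, len(elements), size):
        yield elements[start:start + size]


def build_workflow(tasks, batch=100, workers=None):
    """Create a workflow that runs `tasks` in order on each batch."""
    from txtai.workflow import Workflow

    return Workflow(tasks, batch=batch, workers=workers)


def build_nested(inner, tasks):
    """Run an existing workflow as the first step of a larger one."""
    from txtai.workflow import Workflow, WorkflowTask

    return Workflow([WorkflowTask(inner), *tasks])
```

A smaller batch size lowers peak memory use at the cost of more calls into each action.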
Step 4: Execute the Workflow
Run the workflow by calling it with input data (a list of strings, file paths, URLs, or other elements). The workflow returns a generator that yields transformed results. Iterate over the generator or collect results into a list.
Key considerations:
- Input elements flow through all tasks in sequence
- The workflow returns a generator for memory-efficient processing
- Error handling can be configured at the task level
- Results maintain the same ordering as input elements
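Execution can be sketched with two plain functions standing in for pipelines, so no models are needed. `upper`, `exclaim`, and `run` are hypothetical names; the only assumption is that txtai is installed when `run` is called.

```python
def upper(texts):
    """Stand-in for a pipeline: uppercase each element of the batch."""
    return [text.upper() for text in texts]


def exclaim(texts):
    """Stand-in for a second pipeline: append an exclamation mark."""
    return [f"{text}!" for text in texts]


def run(elements):
    """Chain the two steps in a workflow and collect the results."""
    from txtai.workflow import Task, Workflow

    workflow = Workflow([Task(upper), Task(exclaim)])

    # Calling the workflow returns a generator; list() consumes it
    return list(workflow(elements))
```

For large inputs, iterating over `workflow(elements)` directly avoids holding every result in memory at once.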
Step 5: Schedule or Serve (Optional)
For recurring execution, use the schedule() method with a cron expression. For serving workflows as an API, use the Application class with a YAML configuration file and deploy with FastAPI via uvicorn. The YAML configuration enables the entire pipeline to be defined declaratively without Python code.
Key considerations:
- Cron scheduling enables periodic batch processing (requires the croniter dependency)
- YAML configuration maps pipeline names to classes and wires them into workflows
- The API layer exposes workflow execution via REST endpoints
- Scheduled workflows run in a thread pool managed by the Application class
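Both options can be sketched as below. The workflow name `sumfrench`, the target language, and the cron expression are illustrative assumptions, as are the helper names `run_from_config` and `run_on_schedule`. The configuration is passed as a YAML string here; the same content could live in a file whose path is passed to Application instead.

```python
# Declarative definition: enable three pipelines, then wire them into a workflow
CONFIG = """
textractor:
summary:
translation:

workflow:
  sumfrench:
    tasks:
      - action: textractor
        task: url
      - action: summary
      - action: translation
        args: ["fr"]
"""


def run_from_config(urls):
    """Build the workflow from CONFIG and run it on a list of URLs."""
    from txtai.app import Application

    app = Application(CONFIG)
    return list(app.workflow("sumfrench", urls))


def run_on_schedule(workflow, elements, iterations=3):
    """Run a workflow every five minutes for a fixed number of iterations.

    schedule() keeps the calling thread busy until the iterations finish,
    which is why long-running schedules are usually started on a
    separate thread.
    """
    workflow.schedule("*/5 * * * *", elements, iterations=iterations)
```

To serve the same configuration over REST, save it as a file such as `config.yml` (an assumed name) and start the API with `CONFIG=config.yml uvicorn "txtai.api:app"`.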