Implementation:Neuml Txtai Workflow Schedule And Application
| Knowledge Sources | |
|---|---|
| Domains | Data_Processing, Workflow |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A concrete tool, provided by the txtai library, for automating workflow execution through cron scheduling and YAML-driven application configuration. This page covers two complementary APIs: Workflow.schedule for recurring execution and Application.__init__ for declarative pipeline and workflow orchestration.
Description
Workflow.schedule blocks in a loop, sleeping until the next cron-scheduled time, then executing the workflow with the provided elements. Exceptions during workflow execution are caught and logged, preventing individual failures from terminating the schedule. The method supports both indefinite execution and a fixed number of iterations.
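The loop described above can be sketched roughly as follows. This is an illustrative stand-in, not the txtai source: `run_on_schedule`, the `next_run` callable (which takes the place of cron parsing), and the injectable `sleep` are hypothetical names introduced so the sketch runs with the standard library alone. It also returns a run count purely for demonstration, whereas Workflow.schedule returns nothing.

```python
import logging
import time
import traceback
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)


def run_on_schedule(workflow, next_run, elements, iterations=None, sleep=time.sleep):
    """
    Illustrative scheduling loop (assumption: simplified, not library code).

    workflow   - callable taking elements and returning an iterable of results
    next_run   - hypothetical callable: current local datetime -> next run datetime
    iterations - number of runs, or None to run indefinitely
    sleep      - injectable sleep function, so the loop can be exercised without waiting
    """
    runs = 0
    while iterations is None or iterations > 0:
        # Sleep until the next scheduled time, measured in local time
        now = datetime.now().astimezone()
        delay = (next_run(now) - now).total_seconds()
        sleep(max(0.0, delay))

        # Run the workflow; a failure is logged and the schedule continues
        try:
            for _ in workflow(elements):
                pass
        except Exception:
            logger.error(traceback.format_exc())

        runs += 1
        if iterations is not None:
            iterations -= 1

    return runs


# Two "hourly" runs with a no-op sleep, so the demo finishes immediately
completed = run_on_schedule(
    lambda elements: (element.upper() for element in elements),
    lambda now: now + timedelta(hours=1),
    ["a", "b"],
    iterations=2,
    sleep=lambda seconds: None,
)
print(completed)  # prints 2
```

Catching the exception inside the loop, rather than around it, is what keeps a single failed run from ending the schedule.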
Application.__init__ reads a YAML configuration and constructs the full txtai stack: pipelines (via PipelineFactory), workflows (via WorkflowFactory), agents, and an embeddings index. Task actions in workflow configurations are resolved to pipeline callables by name. Workflows with a schedule key are automatically started in a background ThreadPool.
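As a rough illustration of name-based action resolution, the sketch below replaces each task's action string with the callable registered under that name. `resolve_actions` and the stand-in pipelines are hypothetical, and raising on an unknown name is an assumption of this sketch; the library's own resolution logic may handle more cases.

```python
def resolve_actions(workflows, pipelines):
    """
    Returns a copy of the workflow configuration in which each task's
    action name is replaced by the callable registered under that name.
    Hypothetical helper for illustration only.
    """
    resolved = {}
    for name, config in workflows.items():
        tasks = []
        for task in config.get("tasks", []):
            action = task["action"]
            if action not in pipelines:
                # Assumption: unknown names are treated as errors in this sketch
                raise KeyError(f"No pipeline named '{action}' for workflow '{name}'")
            tasks.append({**task, "action": pipelines[action]})
        resolved[name] = {**config, "tasks": tasks}
    return resolved


# Stand-in pipelines: plain functions instead of real models
pipelines = {
    "summary": lambda text: text[:20],
    "translation": lambda text: text.upper(),
}

config = {
    "extract_and_translate": {
        "tasks": [{"action": "summary"}, {"action": "translation"}],
    }
}
resolved = resolve_actions(config, pipelines)

# Chain the resolved callables over a single input, in task order
text = "A long document that needs to be summarized and translated."
for task in resolved["extract_and_translate"]["tasks"]:
    text = task["action"](text)
print(text)
```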
Usage
Use Workflow.schedule to run a workflow on a recurring cron schedule. Use Application to define and deploy the entire txtai stack from a YAML configuration file.
Code Reference
Source Location
- Repository: txtai
- File (schedule): src/python/txtai/workflow/base.py (lines 78-113)
- File (Application): src/python/txtai/app/base.py (lines 54-83)
Signature
Workflow.schedule:
def schedule(self, cron, elements, iterations=None):
Application.__init__:
class Application:
    def __init__(self, config, loaddata=True):
Import
from txtai.workflow import Workflow
from txtai.app import Application
I/O Contract
Inputs
Workflow.schedule:
| Name | Type | Required | Description |
|---|---|---|---|
| cron | str | Yes | Cron expression defining the execution schedule (e.g., "0 * * * *" for hourly, "0 0 * * *" for daily at midnight). Parsed by the croniter library using local timezone. |
| elements | iterable | Yes | Input data elements passed to the workflow on each scheduled run. The same elements are reused for every iteration. |
| iterations | int or None | No | Number of times to run the workflow. Defaults to None for indefinite execution. When set, the schedule loop terminates after the specified number of runs. |
Application.__init__:
| Name | Type | Required | Description |
|---|---|---|---|
| config | str, dict, or file path | Yes | YAML configuration defining pipelines, workflows, agents, and embeddings. Accepts a file path (str), a YAML string (str), or a pre-parsed dictionary. The Application.read static method handles all three formats. |
| loaddata | bool | No | If True (default), load existing index data when available. If False, only load models without loading stored index data. |
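The three accepted forms of config can be pictured as a simple dispatch on the argument's type. The sketch below is a hedged illustration, not Application.read itself: `read_config` is a hypothetical helper, and `json.loads` stands in for a YAML loader only so the example stays within the standard library.

```python
import json
import os
import tempfile


def read_config(data, parse=json.loads):
    """
    Hypothetical three-way dispatch on a configuration argument.
    parse is a stand-in for a YAML loader (assumption of this sketch).
    """
    if isinstance(data, str):
        if os.path.exists(data):
            # The string names an existing file: parse the file contents
            with open(data, "r", encoding="utf-8") as handle:
                return parse(handle.read())
        # Otherwise treat the string itself as serialized configuration
        return parse(data)
    # Already a parsed dictionary: return it unchanged
    return data


# 1. Pre-parsed dictionary
from_dict = read_config({"summary": {}})

# 2. Serialized string
from_string = read_config('{"summary": {}}')

# 3. File path
with tempfile.TemporaryDirectory() as directory:
    path = os.path.join(directory, "config.json")
    with open(path, "w", encoding="utf-8") as handle:
        handle.write('{"summary": {}}')
    from_file = read_config(path)

print(from_dict == from_string == from_file)
```

Checking for an existing file before parsing means a string is only interpreted as inline configuration when no file of that name exists.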
Outputs
Workflow.schedule:
| Name | Type | Description |
|---|---|---|
| (none) | None | The method blocks in a scheduling loop and does not return a value. Workflow results are consumed internally and are not returned to the caller. Use the iterations parameter to control termination. |
Application.__init__:
| Name | Type | Description |
|---|---|---|
| application | Application | A fully configured Application instance with self.pipelines (dict of named pipeline instances), self.workflows (dict of named Workflow instances), self.agents (dict of named Agent instances), and self.embeddings (Embeddings index or None). |
Usage Examples
Basic Scheduling Example
from txtai.pipeline import Summary
from txtai.workflow import Task, Workflow
summary = Summary()
workflow = Workflow([Task(action=summary)], name="daily-summary")
# Run the workflow every day at midnight, indefinitely
# This call blocks!
workflow.schedule(
    cron="0 0 * * *",
    elements=["Fetch and summarize the latest news..."],
)
Fixed Iteration Scheduling
from txtai.pipeline import Translation
from txtai.workflow import Task, Workflow
translate = Translation()
workflow = Workflow([Task(action=translate)], name="hourly-translate")
# Run 5 times on an hourly schedule
workflow.schedule(
    cron="0 * * * *",
    elements=["Texte a traduire"],
    iterations=5,
)
YAML-Driven Application
from txtai.app import Application
# Define the full stack in YAML
config = """
summary:
translation:
workflow:
  extract_and_translate:
    tasks:
      - action: summary
      - action: translation
    batch: 50
"""
# Create application from YAML config
app = Application(config)
# Execute workflow by name
results = list(app.workflow("extract_and_translate", [
    "A long document that needs to be summarized and translated."
]))
print(results)
Application with Scheduled Workflow
from txtai.app import Application
config = """
summary:
workflow:
  nightly_summary:
    tasks:
      - action: summary
    schedule:
      cron: "0 0 * * *"
      elements:
        - "Daily report content to summarize..."
"""
# Scheduled workflows start automatically in background threads
app = Application(config)
# Block until all scheduled workflows complete
app.wait()
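The background-start-then-wait pattern in this example can be approximated with the standard library's ThreadPool. This is a minimal sketch under stated assumptions: `start_scheduled`, `wait`, and `fake_schedule` are hypothetical names, and passing the schedule mapping through as keyword arguments is an assumption about how the configuration is consumed rather than a confirmed detail of the library.

```python
import time
from multiprocessing.pool import ThreadPool


def start_scheduled(workflows, config):
    """
    Starts every workflow whose configuration has a "schedule" key on a
    background thread. Returns the pool, or None when nothing is scheduled.
    """
    scheduled = {name: c["schedule"] for name, c in config.items() if "schedule" in c}
    if not scheduled:
        return None

    pool = ThreadPool(len(scheduled))
    for name, schedule in scheduled.items():
        # Assumption: the schedule mapping is forwarded as keyword arguments
        pool.apply_async(workflows[name], kwds=schedule)
    return pool


def wait(pool):
    """Stops accepting new work and blocks until running jobs finish."""
    if pool:
        pool.close()
        pool.join()


completed = []


def fake_schedule(cron, elements):
    # Stand-in for a blocking schedule call that eventually returns
    time.sleep(0.05)
    completed.append((cron, list(elements)))


config = {
    "nightly_summary": {"schedule": {"cron": "0 0 * * *", "elements": ["report"]}},
    "on_demand": {},  # no schedule key, so it is never started in the background
}
workflows = {"nightly_summary": fake_schedule, "on_demand": fake_schedule}

pool = start_scheduled(workflows, config)
wait(pool)
print(completed)
```

Because a real schedule with no iteration limit never returns, the equivalent wait call in that case blocks until the process is stopped.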
Application from YAML File
from txtai.app import Application
# Load configuration from a file
app = Application("config.yml")
# Access pipelines directly
result = app.pipeline("summary", "Text to summarize")
# Access workflows
results = list(app.workflow("my_workflow", ["input data"]))