Implementation:Neuml Txtai Workflow Schedule And Application

From Leeroopedia


Knowledge Sources
Domains Data_Processing, Workflow
Last Updated 2026-02-09 00:00 GMT

Overview

A concrete tool, provided by the txtai library, for automating workflow execution through cron scheduling and YAML-driven application configuration. This page covers two complementary APIs: Workflow.schedule for recurring execution and Application.__init__ for declarative pipeline and workflow orchestration.

Description

Workflow.schedule blocks in a loop, sleeping until the next cron-scheduled time, then executing the workflow with the provided elements. Exceptions during workflow execution are caught and logged, preventing individual failures from terminating the schedule. The method supports both indefinite execution and a fixed number of iterations.
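
The loop described for Workflow.schedule can be pictured with a minimal, dependency-free sketch. It is not txtai's implementation: run_on_schedule, next_delay, and the injectable sleep are hypothetical names introduced here, and next_delay stands in for the cron calculation that croniter would perform. Unlike Workflow.schedule, the sketch returns a run count so its behavior is easy to check.

```python
import logging
import time
import traceback

logger = logging.getLogger(__name__)


def run_on_schedule(workflow, elements, next_delay, iterations=None, sleep=time.sleep):
    """
    Simplified stand-in for a cron scheduling loop (hypothetical helper).

    workflow   - callable that takes elements and returns an iterable of results
    next_delay - hypothetical callable returning seconds until the next run;
                 a real scheduler would derive this from a cron expression
    iterations - number of runs, or None to loop indefinitely
    sleep      - injectable sleep function, so the loop can be exercised quickly
    """
    runs = 0
    while iterations is None or iterations > 0:
        # Wait until the next scheduled time
        sleep(max(0, next_delay()))

        try:
            # Drain the results; the loop does not hand them back to the caller
            for _ in workflow(elements):
                pass
        except Exception:
            # Log the failure and keep the schedule alive
            logger.error(traceback.format_exc())

        runs += 1
        if iterations is not None:
            iterations -= 1

    return runs


calls = []


def flaky(elements):
    """Fails on the second run to show that the schedule keeps going."""
    calls.append(list(elements))
    if len(calls) == 2:
        raise RuntimeError("simulated failure")
    return [element.upper() for element in elements]


# Zero delay and a no-op sleep make the three runs finish immediately
completed = run_on_schedule(flaky, ["a", "b"], next_delay=lambda: 0, iterations=3, sleep=lambda seconds: None)
print(completed, len(calls))  # 3 3
```

The second run raises, yet the third still executes, which is the failure isolation the description refers to.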

Application.__init__ reads a YAML configuration and constructs the full txtai stack: pipelines (via PipelineFactory), workflows (via WorkflowFactory), agents, and an embeddings index. Task actions in workflow configurations are resolved to pipeline callables by name. Workflows with a schedule key are automatically started in a background ThreadPool.
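
A rough sketch of the two mechanisms named above, resolving task actions by name and starting scheduled workflows in a background ThreadPool, using only the standard library. Everything here is illustrative: the pipelines dictionary, resolve, build, and the toy schedule function are hypothetical stand-ins, and the toy schedule runs immediately instead of waiting on a cron expression.

```python
from multiprocessing.pool import ThreadPool

# Hypothetical stand-ins for configured pipelines, keyed by name
pipelines = {
    "upper": lambda texts: [text.upper() for text in texts],
    "exclaim": lambda texts: [text + "!" for text in texts],
}

# Workflow configuration, shaped like a parsed YAML document
config = {
    "shout": {
        "tasks": [{"action": "upper"}, {"action": "exclaim"}],
        "schedule": {"elements": ["hello"], "iterations": 2},
    }
}


def resolve(action):
    """Map an action name to its pipeline callable; pass callables through."""
    return pipelines[action] if isinstance(action, str) else action


def build(tasks):
    """Chain the resolved actions into a single callable."""
    actions = [resolve(task["action"]) for task in tasks]

    def run(elements):
        for action in actions:
            elements = action(elements)
        return elements

    return run


def schedule(workflow, results, elements, iterations):
    """Toy schedule: runs immediately rather than sleeping until a cron time."""
    for _ in range(iterations):
        results.append(workflow(elements))


workflows, results, pool = {}, [], None
for name, settings in config.items():
    workflows[name] = build(settings["tasks"])

    # Only workflows with a schedule key are started in the background
    if "schedule" in settings:
        pool = pool or ThreadPool()
        pool.apply_async(schedule, args=(workflows[name], results), kwds=settings["schedule"])

# Wait for the scheduled runs to finish
if pool:
    pool.close()
    pool.join()

print(results)  # [['HELLO!'], ['HELLO!']]
```

Passing the schedule settings as keyword arguments keeps the configuration keys aligned with the parameter names of the scheduling function.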

Usage

Use Workflow.schedule to run a workflow on a recurring cron schedule. Use Application to define and deploy the entire txtai stack from a YAML configuration file.

Code Reference

Source Location

  • Repository: txtai
  • File (schedule): src/python/txtai/workflow/base.py (lines 78-113)
  • File (Application): src/python/txtai/app/base.py (lines 54-83)

Signature

Workflow.schedule:

def schedule(self, cron, elements, iterations=None):

Application.__init__:

class Application:
    def __init__(self, config, loaddata=True):

Import

from txtai.workflow import Workflow
from txtai.app import Application

I/O Contract

Inputs

Workflow.schedule:

Name | Type | Required | Description
cron | str | Yes | Cron expression defining the execution schedule (e.g., "0 * * * *" for hourly, "0 0 * * *" for daily at midnight). Parsed by the croniter library using local timezone.
elements | iterable | Yes | Input data elements passed to the workflow on each scheduled run. The same elements are reused for every iteration.
iterations | int or None | No | Number of times to run the workflow. Defaults to None for indefinite execution. When set, the schedule loop terminates after the specified number of runs.
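
Because the same elements object is handed to every run, a one-shot iterator such as a generator would be empty from the second run onward. The sketch below illustrates this with a hypothetical run function standing in for one scheduled execution; it assumes only that each run iterates over the object it is given.

```python
def run(elements):
    """Stand-in for one scheduled run: consumes the elements it is given."""
    return [element.upper() for element in elements]


# A list can be iterated again on every run
items = ["first", "second"]
list_runs = [run(items) for _ in range(2)]

# A generator is exhausted after the first run
stream = (item for item in ["first", "second"])
generator_runs = [run(stream) for _ in range(2)]

print(list_runs)       # [['FIRST', 'SECOND'], ['FIRST', 'SECOND']]
print(generator_runs)  # [['FIRST', 'SECOND'], []]
```

Passing a list or tuple avoids the problem.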

Application.__init__:

Name | Type | Required | Description
config | str or dict | Yes | YAML configuration defining pipelines, workflows, agents, and embeddings. Accepts a path to a YAML file (str), a YAML string (str), or a pre-parsed dictionary. The Application.read static method handles all three formats.
loaddata | bool | No | If True (default), load existing index data when available. If False, only load models without loading stored index data.
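
The three accepted config forms can be handled by a small dispatch on the argument's type. The sketch below is an assumption-laden illustration, not the body of Application.read: the read function and its parse parameter are hypothetical, and json.loads is swapped in for a YAML parser purely to keep the example free of third-party dependencies.

```python
import json
import os
import tempfile


def read(data, parse=json.loads):
    """
    Resolve a configuration argument to a dictionary (hypothetical helper).

    parse stands in for a YAML parser; json.loads is used here only so the
    sketch runs without extra packages.
    """
    if isinstance(data, str):
        # A string naming an existing file is read from disk
        if os.path.exists(data):
            with open(data, "r", encoding="utf-8") as handle:
                return parse(handle.read())

        # Otherwise the string itself is treated as configuration text
        return parse(data)

    # Anything else (e.g. a dict) is returned unchanged
    return data


text = '{"summary": {}, "workflow": {"demo": {"tasks": [{"action": "summary"}]}}}'

with tempfile.TemporaryDirectory() as directory:
    path = os.path.join(directory, "config.json")
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(text)

    # Form 1: file path
    from_file = read(path)

# Form 2: configuration text, form 3: pre-parsed dictionary
from_text = read(text)
from_dict = read({"summary": {}})

print(from_file == from_text)
```

In this sketch a string is checked against the file system first, so a string that happens to match an existing path is read as a file rather than parsed as text.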

Outputs

Workflow.schedule:

Name | Type | Description
(none) | None | The method blocks in a scheduling loop and returns no value. Workflow results are consumed and discarded inside the loop. Use the iterations parameter to bound the number of runs; with iterations=None the call does not return under normal operation.

Application.__init__:

Name | Type | Description
application | Application | A fully configured Application instance with self.pipelines (dict of named pipeline instances), self.workflows (dict of named Workflow instances), self.agents (dict of named Agent instances), and self.embeddings (Embeddings index or None).

Usage Examples

Basic Scheduling Example

from txtai.pipeline import Summary
from txtai.workflow import Task, Workflow

summary = Summary()
workflow = Workflow([Task(action=summary)], name="daily-summary")

# Run the workflow every day at midnight, indefinitely
# This call blocks!
workflow.schedule(
    cron="0 0 * * *",
    elements=["Text of the latest news report to summarize..."],
)

Fixed Iteration Scheduling

from txtai.pipeline import Translation
from txtai.workflow import Task, Workflow

translate = Translation()
workflow = Workflow([Task(action=translate)], name="hourly-translate")

# Run 5 times on an hourly schedule; the call blocks until the fifth run finishes
workflow.schedule(
    cron="0 * * * *",
    elements=["Texte à traduire"],
    iterations=5,
)

YAML-Driven Application

from txtai.app import Application

# Define the full stack in YAML
config = """
# An empty value loads the pipeline with its default settings
summary:

translation:

workflow:
    extract_and_translate:
        tasks:
            - action: summary
            - action: translation
        batch: 50
"""

# Create application from YAML config
app = Application(config)

# Execute workflow by name
results = list(app.workflow("extract_and_translate", [
    "A long document that needs to be summarized and translated."
]))
print(results)

Application with Scheduled Workflow

from txtai.app import Application

config = """
summary:

workflow:
    nightly_summary:
        tasks:
            - action: summary
        schedule:
            cron: "0 0 * * *"
            elements:
                - "Daily report content to summarize..."
"""

# Scheduled workflows start automatically in background threads
app = Application(config)

# Block until the scheduled workflows finish. With no iterations limit set,
# the schedule runs indefinitely, so this call does not return.
app.wait()

Application from YAML File

from txtai.app import Application

# Load configuration from a file
app = Application("config.yml")

# Run a pipeline by name (assumes config.yml defines a summary pipeline)
result = app.pipeline("summary", "Text to summarize")

# Run a workflow by name (assumes config.yml defines my_workflow)
results = list(app.workflow("my_workflow", ["input data"]))
