

Implementation:Neuml Txtai Workflow Schedule

From Leeroopedia


Implementation: Workflow_Schedule

Field Value
Sources txtai, croniter
Domains Workflow_Orchestration, Job_Scheduling
Last Updated 2026-02-10 12:00 GMT

Overview

Workflow_Schedule documents the Workflow.schedule method that runs a workflow on a recurring basis using cron expressions, wrapping the croniter library for schedule computation.

Description

The Workflow.schedule method provides an in-process recurring execution mechanism for workflows. It enters a blocking loop that computes the next execution time from a cron expression, sleeps until that time, runs the workflow over the supplied elements, and repeats.
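A minimal, standard-library sketch of that loop follows. It is not txtai's code: run_schedule, the next_run callable (standing in for croniter's get_next), and the injectable now and sleep functions are hypothetical names introduced so the loop can be driven by a fake clock. Unlike Workflow.schedule, the sketch clamps negative sleep durations to zero and returns the number of runs so the behavior is easy to check.

```python
import logging
import time
import traceback
from datetime import datetime, timedelta, timezone

logger = logging.getLogger("schedule-sketch")


def run_schedule(next_run, action, elements, iterations=None,
                 now=lambda: datetime.now().astimezone(), sleep=time.sleep):
    """Sketch of the compute-next / sleep / run / repeat loop (returns run count)."""
    runs = 0
    while iterations is None or iterations > 0:
        # Compute the next run time from the current timezone-aware time
        current = now()
        scheduled = next_run(current)

        # Sleep until the scheduled time (clamped so a late start never passes a negative value)
        sleep(max(0.0, (scheduled - current).total_seconds()))

        # Drain the action's output; a failing run is logged and the loop continues
        try:
            for _ in action(elements):
                pass
        except Exception:
            logger.error(traceback.format_exc())

        runs += 1
        if iterations is not None:
            iterations -= 1

    return runs


# Usage with a fake clock so the demo finishes instantly
clock = {"t": datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)}
slept = []


def fake_sleep(seconds):
    # Record the requested delay and advance the fake clock instead of waiting
    slept.append(seconds)
    clock["t"] += timedelta(seconds=seconds)


def flaky(elements):
    # Hypothetical action: yields each element, then fails on purpose
    yield from elements
    raise RuntimeError("simulated failure")


total = run_schedule(lambda t: t + timedelta(minutes=5), flaky, [1, 2],
                     iterations=3, now=lambda: clock["t"], sleep=fake_sleep)
print(total, slept)
```

Each simulated run fails, yet all three runs happen because the error is logged and the loop moves on, mirroring the fault-tolerance behavior of the real method.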

Key implementation details:

  • croniter dependency check: The method raises an ImportError if the croniter package is not installed (it is an optional dependency in the "workflow" extras group).
  • Timezone handling: Uses datetime.now().astimezone() to obtain the current local time with timezone information and passes it to croniter as the start time, so cron expressions are evaluated in the host's local time zone.
  • Sleep mechanism: Computes the sleep duration as the difference between the next scheduled timestamp and the current time (schedule.timestamp() - time.time()), using time.sleep() for the wait.
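  • Fault tolerance: Each workflow execution is wrapped in a try/except Exception block. When a run fails, the formatted traceback (traceback.format_exc()) is logged at ERROR level and the loop continues to the next scheduled run. Exceptions that do not derive from Exception, such as KeyboardInterrupt, are not caught and stop the scheduler.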
  • Iteration counting: When iterations is specified, it is decremented after each execution. The loop exits when it reaches zero. When iterations is None, the loop runs indefinitely.
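The timezone and sleep arithmetic can be illustrated without croniter. The helper next_every_n_minutes below is a hypothetical stand-in for croniter("*/n * * * *", now).get_next(datetime); it only handles step values that divide 60 and ignores daylight-saving edge cases that a full cron library has to consider. Because timestamp() on an aware datetime returns an absolute POSIX time, the computed delay does not depend on the local UTC offset.

```python
from datetime import datetime, timedelta, timezone


def next_every_n_minutes(now, n):
    """Hypothetical stand-in for croniter("*/n * * * *", now).get_next(datetime).

    Returns the next aware datetime strictly after `now` whose minute is a
    multiple of n (valid only when n divides 60), with seconds zeroed.
    """
    base = now.replace(second=0, microsecond=0)
    minutes = (base.minute // n + 1) * n
    # Rebuild from the top of the hour so 60 rolls over into the next hour
    return base.replace(minute=0) + timedelta(minutes=minutes)


# Local time with timezone info, as datetime.now().astimezone() returns
now = datetime.now().astimezone()
nxt = next_every_n_minutes(now, 5)

# Same arithmetic as schedule.timestamp() - time.time(): both sides are
# absolute POSIX timestamps, so the UTC offset cancels out
delay = nxt.timestamp() - now.timestamp()
print(f"next run {nxt.isoformat()} in {delay:.1f} s")
```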

This method wraps the croniter library, which handles all cron expression parsing and schedule generation. txtai adds the scheduling loop, fault tolerance, and workflow integration on top.

Usage

Call schedule() on a configured workflow to start recurring execution. This method is blocking and will not return until the iteration limit is reached or the process is terminated.

Code Reference

Source Location

Repository
neuml/txtai
File
src/python/txtai/workflow/base.py
Lines
78-113

Signature

def schedule(self, cron, elements, iterations=None):
    """
    Schedules a workflow using a cron expression and elements.

    Args:
        cron: cron expression
        elements: iterable data elements passed to workflow each call
        iterations: number of times to run workflow, defaults to run indefinitely
    """

    # Check that croniter is installed
    if not CRONITER:
        raise ImportError('Workflow scheduling is not available - install "workflow" extra to enable')

    logger.info("'%s' scheduler started with schedule %s", self.name, cron)

    maxiterations = iterations
    while iterations is None or iterations > 0:
        # Schedule using localtime
        schedule = croniter(cron, datetime.now().astimezone()).get_next(datetime)
        logger.info("'%s' next run scheduled for %s", self.name, schedule.isoformat())
        time.sleep(schedule.timestamp() - time.time())

        # Run workflow
        try:
            for _ in self(elements):
                pass
        except Exception:
            logger.error(traceback.format_exc())

        # Decrement iterations remaining, if necessary
        if iterations is not None:
            iterations -= 1

    logger.info("'%s' max iterations (%d) reached", self.name, maxiterations)

Import

from txtai import Workflow

Wrapped Library

Field Value
Library croniter
Install pip install txtai[workflow] or pip install croniter
Usage croniter(cron_expr, start_time).get_next(datetime) computes the next occurrence of the cron schedule after the given start time.
Repository github.com/kiorky/croniter

I/O Contract

Inputs

Parameter Type Required Description
cron str Yes Cron expression in standard five-field format: minute hour day-of-month month day-of-week. Examples: "*/5 * * * *" (every 5 min), "0 0 * * *" (daily at midnight).
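elements iterable Yes Data elements passed to the workflow on each scheduled execution. The same object is reused for every run, so it should be re-iterable (for example, a list); a one-shot generator is exhausted by the first run and later runs receive no data.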
iterations int / None No Number of times to execute the workflow. None runs indefinitely. Defaults to None.
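The reuse of elements matters when choosing the input type. The sketch below uses a hypothetical generator function in place of a Workflow call and assumes that each run iterates over elements once, which is how a batching workflow consumes its input; run_twice is likewise a hypothetical helper that simulates two scheduled runs.

```python
def run_twice(workflow, elements):
    """Hypothetical helper mimicking two scheduled runs that reuse the same elements."""
    return [list(workflow(elements)) for _ in range(2)]


def workflow(elements):
    # Hypothetical stand-in for a txtai Workflow call: yields one result per element
    for element in elements:
        yield element.upper()


# A list can be iterated again on every run
from_list = run_twice(workflow, ["a", "b"])

# A generator is exhausted by the first run, so the second run sees nothing
from_generator = run_twice(workflow, (x for x in ["a", "b"]))

print(from_list)       # [['A', 'B'], ['A', 'B']]
print(from_generator)  # [['A', 'B'], []]
```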

Outputs

Output Type Description
return None This method does not return a value. It blocks until the iteration limit is reached, or indefinitely when iterations is None. Workflow results are consumed internally and discarded; only side effects of the workflow (e.g., database writes, index updates) persist.
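Since outputs are discarded, anything worth keeping has to be written by a task. The factory below is a hypothetical sketch, not a txtai API: make_persist builds an action that appends each element to a JSON Lines file and returns the batch unchanged, on the assumption that a task action should return one output per input element.

```python
import json
import tempfile
from pathlib import Path


def make_persist(path):
    """Build a hypothetical Task action that appends each element to a JSON Lines file."""
    path = Path(path)

    def persist(batch):
        # Side effect: one JSON record per element, kept after the run's
        # outputs are discarded
        with path.open("a", encoding="utf-8") as handle:
            for element in batch:
                handle.write(json.dumps({"text": element}) + "\n")

        # Return the batch unchanged: one output per input element
        return batch

    return persist


# Usage: simulate two scheduled runs writing to a temporary file
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "results.jsonl"
    action = make_persist(target)
    action(["first summary"])
    action(["second summary"])
    records = [json.loads(line) for line in target.read_text(encoding="utf-8").splitlines()]

print(records)
```

Such an action could be appended as the final task of a scheduled workflow, for example Workflow([Task(action=summary), Task(action=make_persist("summaries.jsonl"))], name="daily-summary"), so that each run leaves a record on disk.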

Side Effects

  • Logs scheduling events at INFO level (start, next run time, max iterations reached)
  • Logs workflow execution errors at ERROR level with full tracebacks
  • Blocks the calling thread for the duration of the schedule
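The INFO-level messages are hidden under Python's default logging setup, whose threshold is WARNING. A sketch of enabling them only for the scheduler follows; the logger name txtai.workflow.base is an assumption based on the common logging.getLogger(__name__) convention and the source file path above.

```python
import logging

# Attach a root handler; other loggers stay at the default WARNING threshold
logging.basicConfig(
    level=logging.WARNING,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# Assumed logger name (module path of src/python/txtai/workflow/base.py).
# Records from this logger propagate to the root handler, which has no level
# of its own, so INFO scheduling messages are printed.
logging.getLogger("txtai.workflow.base").setLevel(logging.INFO)
```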

Usage Examples

Run Every 5 Minutes Indefinitely

from txtai import Workflow
from txtai.pipeline import Textractor
from txtai.workflow import Task

textractor = Textractor()
workflow = Workflow([Task(action=textractor)], name="doc-ingest")

# Run every 5 minutes, processing a fixed set of URLs
urls = [
    "https://example.com/feed1",
    "https://example.com/feed2",
]
workflow.schedule("*/5 * * * *", urls)
# This call blocks indefinitely

Run Daily at Midnight, Limited Iterations

from txtai import Workflow
from txtai.pipeline import Summary
from txtai.workflow import Task

summary = Summary()
workflow = Workflow([Task(action=summary)], name="daily-summary")

# Run at local midnight, 30 times (once per day)
articles = ["Article content to summarize..."]
workflow.schedule("0 0 * * *", articles, iterations=30)

Run in a Background Thread

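import threading
from txtai import Workflow
from txtai.workflow import Task

# Hypothetical placeholder action: a Task action receives a list (batch) of elements
def my_pipeline(batch):
    return [text.upper() for text in batch]

# Hypothetical placeholder input data, reused on every run
data = ["first document", "second document"]

workflow = Workflow([Task(action=my_pipeline)], name="background-job")

# Run schedule in a background thread to avoid blocking
# "0 */2 * * *" fires at minute 0 of every even hour (every 2 hours)
thread = threading.Thread(
    target=workflow.schedule,
    args=("0 */2 * * *", data),
    daemon=True
)
thread.start()

# Main thread continues executing. A daemon thread is killed when the main
# thread exits, so the process must stay alive for scheduled runs to happen.
print("Scheduler running in background")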
