Implementation:Neuml Txtai Workflow Schedule
Implementation: Workflow_Schedule
| Field | Value |
|---|---|
| Sources | txtai, croniter |
| Domains | Workflow_Orchestration, Job_Scheduling |
| Last Updated | 2026-02-10 12:00 GMT |
Overview
Workflow_Schedule documents the Workflow.schedule method that runs a workflow on a recurring basis using cron expressions, wrapping the croniter library for schedule computation.
Description
The Workflow.schedule method provides an in-process recurring execution mechanism for workflows. It enters a blocking loop that computes the next execution time from a cron expression, sleeps until that time, executes the workflow, and repeats.
Key implementation details:
- croniter dependency check: The method raises an ImportError if the croniter package is not installed (it is an optional dependency in the "workflow" extras group).
- Timezone handling: Uses datetime.now().astimezone() to obtain the current local time with timezone information, which is passed to croniter for timezone-aware schedule computation.
- Sleep mechanism: Computes the sleep duration as the difference between the next scheduled timestamp and the current time (schedule.timestamp() - time.time()), using time.sleep() for the wait.
- Fault tolerance: Each workflow execution is wrapped in a try/except Exception block. Errors are logged via traceback.format_exc() at ERROR level, and the scheduling loop continues.
- Iteration counting: When iterations is specified, it is decremented after each execution. The loop exits when it reaches zero. When iterations is None, the loop runs indefinitely.
This method wraps the croniter library, which handles all cron expression parsing and schedule generation. txtai adds the scheduling loop, fault tolerance, and workflow integration on top.
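The control flow described above can be sketched without txtai or croniter. This is a minimal illustration, not the library's code: run_scheduled, next_run, and flaky_job are hypothetical names, a fixed 10 ms delay stands in for the cron computation, and the clamp on the sleep duration is an addition of this sketch.

```python
import logging
import time
import traceback
from datetime import datetime, timedelta

logger = logging.getLogger("schedule-sketch")


def run_scheduled(job, next_run, iterations=None, sleep=time.sleep):
    """Hypothetical stand-in for the scheduling loop; returns the number of runs."""
    runs = 0
    while iterations is None or iterations > 0:
        # Timezone-aware local time, as in the description above
        now = datetime.now().astimezone()
        target = next_run(now)

        # Sleep for the gap between the target and the current time.
        # The clamp at zero is an addition of this sketch.
        sleep(max(0.0, target.timestamp() - time.time()))

        # A failing run is logged and does not stop the loop
        try:
            job()
        except Exception:
            logger.error(traceback.format_exc())

        runs += 1
        if iterations is not None:
            iterations -= 1
    return runs


calls = []


def flaky_job():
    """Hypothetical job that fails on its second run."""
    calls.append(len(calls))
    if len(calls) == 2:
        raise RuntimeError("simulated failure on the second run")


# A fixed 10 ms delay replaces the cron-based next-run computation
total = run_scheduled(flaky_job, lambda now: now + timedelta(milliseconds=10), iterations=3)
print(total, calls)
```

The second run raises, yet the loop still reaches the third run, which is the behavior the fault tolerance and iteration counting points describe.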
Usage
Call schedule() on a configured workflow to start recurring execution. This method is blocking and will not return until the iteration limit is reached or the process is terminated.
Code Reference
Source Location
- Repository: neuml/txtai
- File: src/python/txtai/workflow/base.py
- Lines: 78-113
Signature
def schedule(self, cron, elements, iterations=None):
    """
    Schedules a workflow using a cron expression and elements.
    Args:
        cron: cron expression
        elements: iterable data elements passed to workflow each call
        iterations: number of times to run workflow, defaults to run indefinitely
    """
    # Check that croniter is installed
    if not CRONITER:
        raise ImportError('Workflow scheduling is not available - install "workflow" extra to enable')
    logger.info("'%s' scheduler started with schedule %s", self.name, cron)
    maxiterations = iterations
    while iterations is None or iterations > 0:
        # Schedule using localtime
        schedule = croniter(cron, datetime.now().astimezone()).get_next(datetime)
        logger.info("'%s' next run scheduled for %s", self.name, schedule.isoformat())
        time.sleep(schedule.timestamp() - time.time())
        # Run workflow
        try:
            for _ in self(elements):
                pass
        except Exception:
            logger.error(traceback.format_exc())
        # Decrement iterations remaining, if necessary
        if iterations is not None:
            iterations -= 1
    logger.info("'%s' max iterations (%d) reached", self.name, maxiterations)
Import
from txtai import Workflow
Wrapped Library
| Field | Value |
|---|---|
| Library | croniter |
| Install | pip install txtai[workflow] or pip install croniter |
| Usage | croniter(cron_expr, start_time).get_next(datetime) computes the next occurrence of the cron schedule after the given start time. |
| Repository | github.com/kiorky/croniter |
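The croniter call in the table can be tried on its own. The sketch below is illustrative: the guarded import is an assumption about how a flag like CRONITER might be set, next_run is a hypothetical helper, and a fixed naive start time is used instead of the current local time so the result is reproducible.

```python
from datetime import datetime

# Guarded import: one plausible way to set an availability flag (assumption)
try:
    from croniter import croniter

    CRONITER = True
except ImportError:
    CRONITER = False


def next_run(cron, start):
    """Hypothetical helper: next occurrence of a cron expression after start."""
    if not CRONITER:
        raise ImportError("croniter is not installed")
    return croniter(cron, start).get_next(datetime)


if CRONITER:
    # Fixed start time for reproducibility; the method itself starts from local "now"
    start = datetime(2024, 1, 1, 12, 2)
    upcoming = next_run("*/5 * * * *", start)
    print(upcoming.isoformat())
```

With an "every 5 minutes" expression and a start of 12:02, the next occurrence falls on the following multiple of five minutes.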
I/O Contract
Inputs
| Parameter | Type | Required | Description |
|---|---|---|---|
| cron | str | Yes | Cron expression in standard five-field format: minute hour day-of-month month day-of-week. Examples: "*/5 * * * *" (every 5 min), "0 0 * * *" (daily at midnight). |
| elements | iterable | Yes | Data elements passed to the workflow on each scheduled execution. The same elements are reused for every run. |
| iterations | int / None | No | Number of times to execute the workflow. None runs indefinitely. Defaults to None. |
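To make the five-field layout concrete, the sketch below labels each field of an expression. label_cron_fields is a hypothetical helper that only checks the field count; the real parsing and validation are done by croniter.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def label_cron_fields(expression):
    """Hypothetical helper: map each of the five cron fields to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


every_five_minutes = label_cron_fields("*/5 * * * *")
daily_at_midnight = label_cron_fields("0 0 * * *")
print(every_five_minutes)
print(daily_at_midnight)
```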
Outputs
| Output | Type | Description |
|---|---|---|
| return | None | This method does not return a value. It blocks until the iteration limit is reached. Workflow results are consumed internally (discarded). Side effects of the workflow (e.g., database writes, index updates) persist. |
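The statement that results are discarded while side effects persist can be illustrated with a plain generator. This sketch assumes the workflow call yields results lazily, as the `for _ in self(elements)` loop in the listing suggests; fake_workflow is a hypothetical stand-in, not a txtai class.

```python
executed = []


def fake_workflow(elements):
    """Hypothetical generator standing in for a workflow call."""
    for element in elements:
        executed.append(element)  # side effect, e.g. a database write
        yield element.upper()     # result, which the scheduler ignores


pending = fake_workflow(["a", "b"])
before = list(executed)  # nothing has run yet: generators are lazy

# Draining the generator forces every element through the workflow
for _ in pending:
    pass

after = list(executed)
print(before, after)
```

Without the draining loop, no element would be processed at all, which is why the results are iterated even though they are not kept.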
Side Effects
- Logs scheduling events at INFO level (start, next run time, max iterations reached)
- Logs workflow execution errors at ERROR level with full tracebacks
- Blocks the calling thread for the duration of the schedule
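The INFO messages are only visible if the host application configures logging at that level. The sketch below is one possible setup using the standard library; the logger names "txtai" and "txtai.workflow.base" are assumptions based on the file path given above, not confirmed names.

```python
import logging

# Keep other libraries at WARNING and print timestamps with each record
logging.basicConfig(
    level=logging.WARNING,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)

# Assumption: txtai's loggers are named under the "txtai" package namespace
logging.getLogger("txtai").setLevel(logging.INFO)

# Child loggers inherit the effective level from their parent
scheduler_logger = logging.getLogger("txtai.workflow.base")  # assumed logger name
print(scheduler_logger.isEnabledFor(logging.INFO))
```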
Usage Examples
Run Every 5 Minutes Indefinitely
from txtai import Workflow
from txtai.pipeline import Textractor
from txtai.workflow import Task
textractor = Textractor()
workflow = Workflow([Task(action=textractor)], name="doc-ingest")
# Run every 5 minutes, processing a fixed set of URLs
urls = [
    "https://example.com/feed1",
    "https://example.com/feed2",
]
workflow.schedule("*/5 * * * *", urls)
# This call blocks indefinitely
Run Daily at Midnight, Limited Iterations
from txtai import Workflow
from txtai.pipeline import Summary
from txtai.workflow import Task
summary = Summary()
workflow = Workflow([Task(action=summary)], name="daily-summary")
# Run at midnight for 30 days
articles = ["Article content to summarize..."]
workflow.schedule("0 0 * * *", articles, iterations=30)
Run in a Background Thread
import threading
from txtai import Workflow
from txtai.workflow import Task
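# my_pipeline and data are placeholders for your own callable and input elements
workflow = Workflow([Task(action=my_pipeline)], name="background-job")
# Run schedule in a background thread to avoid blocking
# daemon=True means the thread is stopped when the main program exits
thread = threading.Thread(
    target=workflow.schedule,
    args=("0 */2 * * *", data),
    daemon=True,
)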
thread.start()
# Main thread continues executing
print("Scheduler running in background")