

Implementation:PrefectHQ Prefect PrefectDbtRunner Invoke

From Leeroopedia


Metadata
  • Sources: Prefect, prefect-dbt
  • Domains: Analytics_Engineering, Orchestration
  • Last Updated: 2026-02-09 00:00 GMT

Overview

Concrete wrapper for executing dbt CLI commands via PrefectDbtRunner with enhanced logging and event emission.

Description

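The run_dbt_commands task uses PrefectDbtRunner and PrefectDbtSettings from the prefect-dbt integration to execute dbt CLI commands within Prefect flows. The task builds a PrefectDbtSettings that points both project_dir and profiles_dir at the dbt project directory, creates a runner with raise_on_failure=False, and invokes each command in turn. PrefectDbtRunner provides native dbt execution with enhanced logging, failure handling, and automatic Prefect event emission for dbt node status changes. The task itself is configured with retries=2 and a 5-second retry delay. This page is a wrapper doc because PrefectDbtRunner comes from the external prefect-dbt package.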

Code Reference

  • Repository: https://github.com/PrefectHQ/prefect
  • File: examples/run_dbt_with_prefect.py (L159-184) for the task, src/integrations/prefect-dbt/prefect_dbt/core/runner.py (L106) for PrefectDbtRunner
  • Signature:
@task(retries=2, retry_delay_seconds=5, log_prints=True)
def run_dbt_commands(commands: list[str], project_dir: Path) -> None:
    settings = PrefectDbtSettings(
        project_dir=str(project_dir),
        profiles_dir=str(project_dir),
    )
    runner = PrefectDbtRunner(settings=settings, raise_on_failure=False)
    for command in commands:
        runner.invoke(command.split())
  • Import: from pathlib import Path; from prefect import task; from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings

I/O Contract

Inputs

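  • commands (list[str], required) -- dbt CLI command strings, one per list entry, e.g. "deps", "seed", "run", "test". Each string is split on whitespace before being passed to runner.invoke, so an entry may carry arguments (for example "run --select my_model"), but quoted arguments that contain spaces will not survive the split.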
  • project_dir (Path, required) -- dbt project directory
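
Because the task calls command.split() on each entry, every string in commands is tokenized on whitespace before it reaches runner.invoke. The sketch below uses only the standard library to show what that produces, and contrasts it with shlex.split. The original task does not use shlex.split; it is shown here as one possible way to keep quoted arguments intact. The helper name tokenize and the sample command strings are illustrative assumptions, not part of the example file.

```python
import shlex


def tokenize(command: str, shell_style: bool = False) -> list[str]:
    """Turn one dbt command string into an argument list.

    shell_style=False mirrors the task's command.split().
    shell_style=True is a hypothetical alternative based on shlex.split.
    """
    return shlex.split(command) if shell_style else command.split()


# A simple command tokenizes the same way with either approach.
print(tokenize("run --select my_model"))

# A quoted value containing spaces is broken apart by str.split
# but kept as a single argument by shlex.split.
quoted = "run --vars '{\"day\": \"2024-01-01 00:00\"}'"
print(tokenize(quoted))
print(tokenize(quoted, shell_style=True))
```

If the commands passed to the task never contain quoted values, the plain split used in the original is sufficient.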

Outputs

  • None. The value returned by each runner.invoke call is discarded; the task acts only through side effects (dbt commands executed, Prefect events emitted, logs produced).
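
If per-command outcomes are needed, one option is to collect what invoke returns instead of discarding it. The sketch below is a variant, not the original task. It accepts any object with an invoke(args) method so that it runs without dbt or Prefect installed; FakeRunner and FakeResult are hypothetical stand-ins. It also assumes the real return value exposes a boolean success attribute, as dbt's dbtRunnerResult does, which should be checked against the installed prefect-dbt version.

```python
from dataclasses import dataclass
from typing import Any, Protocol


class Invoker(Protocol):
    """Anything exposing invoke(args), such as a PrefectDbtRunner."""

    def invoke(self, args: list[str]) -> Any: ...


@dataclass
class FakeResult:
    """Hypothetical stand-in for the object returned by invoke()."""

    success: bool


class FakeRunner:
    """Hypothetical runner used only in this sketch: 'test' reports failure."""

    def invoke(self, args: list[str]) -> FakeResult:
        return FakeResult(success=args[0] != "test")


def run_and_collect(runner: Invoker, commands: list[str]) -> dict[str, bool]:
    """Invoke each command and record whether it reported success."""
    outcomes: dict[str, bool] = {}
    for command in commands:
        result = runner.invoke(command.split())
        # Assumption: the result has a boolean `success` attribute.
        outcomes[command] = bool(getattr(result, "success", False))
    return outcomes


outcomes = run_and_collect(FakeRunner(), ["deps", "seed", "run", "test"])
failed = [cmd for cmd, ok in outcomes.items() if not ok]
print(failed)
```

Inside a Prefect task, a list like failed could be logged or used to raise explicitly, which would let the task's retries apply to those failures.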

Usage Example

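from pathlib import Path
from prefect import flow, task
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings

@task(retries=2, retry_delay_seconds=5, log_prints=True)
def run_dbt_commands(commands: list[str], project_dir: Path) -> None:
    # Point dbt at the project; profiles are read from the same directory.
    settings = PrefectDbtSettings(
        project_dir=str(project_dir),
        profiles_dir=str(project_dir),
    )
    # raise_on_failure=False asks the runner not to raise on dbt-reported failures.
    runner = PrefectDbtRunner(settings=settings, raise_on_failure=False)
    for command in commands:
        # Each command string is split on whitespace into dbt CLI arguments.
        runner.invoke(command.split())

@flow(name="dbt_flow", log_prints=True)
def dbt_flow():
    # build_dbt_project and create_dbt_profiles are helpers that are not shown
    # on this page; they are assumed to be defined alongside this task.
    project_dir = build_dbt_project()
    create_dbt_profiles(project_dir)
    # Run the dbt stages in order, one task run per command.
    run_dbt_commands(["deps"], project_dir)
    run_dbt_commands(["seed"], project_dir)
    run_dbt_commands(["run"], project_dir)
    run_dbt_commands(["test"], project_dir)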
