Implementation:PrefectHQ Prefect PrefectDbtRunner Invoke
| Metadata | |
|---|---|
| Sources | Prefect, prefect-dbt |
| Domains | Analytics_Engineering, Orchestration |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete wrapper for executing dbt CLI commands via PrefectDbtRunner with enhanced logging and event emission.
Description
The run_dbt_commands task uses PrefectDbtRunner and PrefectDbtSettings from the prefect-dbt integration to execute dbt CLI commands within Prefect flows. PrefectDbtRunner provides native dbt execution with enhanced logging, failure handling, and automatic Prefect event emission for dbt node status changes. This is a Wrapper Doc since PrefectDbtRunner is from the external prefect-dbt package.
Code Reference
- Repository: https://github.com/PrefectHQ/prefect
- File: examples/run_dbt_with_prefect.py (L159-184) for the task, src/integrations/prefect-dbt/prefect_dbt/core/runner.py (L106) for PrefectDbtRunner
- Signature:
@task(retries=2, retry_delay_seconds=5, log_prints=True)
def run_dbt_commands(commands: list[str], project_dir: Path) -> None:
    settings = PrefectDbtSettings(
        project_dir=str(project_dir),
        profiles_dir=str(project_dir),
    )
    runner = PrefectDbtRunner(settings=settings, raise_on_failure=False)
    for command in commands:
        runner.invoke(command.split())
- Import:
from prefect import task
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings
I/O Contract
Inputs
- commands (list[str], required) -- dbt CLI commands, passed one list per call, e.g. ["deps"], ["seed"], ["run"], ["test"]
- project_dir (Path, required) -- dbt project directory
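Each entry in commands is turned into an argument list with command.split() before it reaches runner.invoke. The sketch below shows what that split produces and where it might fall short. The model name my_model and the --vars payload are hypothetical values chosen for illustration, and shlex.split is a suggested alternative, not something the task itself uses.

```python
import shlex


def tokenize(command: str) -> list[str]:
    """Split a dbt command string into CLI arguments, honoring shell-style quotes."""
    return shlex.split(command)


# Plain whitespace splitting (what the task does) is enough for simple commands.
simple = "run --select my_model".split()

# A quoted argument containing a space is broken apart by str.split ...
naive = "run --vars 'key: value'".split()

# ... but is kept as a single argument by shlex.split.
quoted = tokenize("run --vars 'key: value'")
```

For the single-word commands used on this page ("deps", "seed", "run", "test") both approaches give the same result; the difference only matters once an argument contains quoted whitespace.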
Outputs
- None (side effects: dbt commands executed, events emitted, logs produced)
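Because the runner is built with raise_on_failure=False and the task returns None, a caller has no direct signal that a command failed. A minimal sketch of one way to surface failures follows. It assumes that invoke returns an object with a boolean success attribute, as dbt's own dbtRunnerResult does; that return type is an assumption here, not something this page confirms. The helper name run_and_collect_failures and the stand-in fake_invoke are hypothetical.

```python
from types import SimpleNamespace
from typing import Any, Callable, Sequence


def run_and_collect_failures(
    invoke: Callable[[list[str]], Any],
    commands: Sequence[str],
) -> list[str]:
    """Run each command through `invoke` and return the ones that did not succeed."""
    failed: list[str] = []
    for command in commands:
        result = invoke(command.split())
        # Assumption: the result exposes a boolean `success` attribute.
        if not result.success:
            failed.append(command)
    return failed


def fake_invoke(args: list[str]) -> SimpleNamespace:
    """Stand-in for runner.invoke, used only for illustration: 'test' fails."""
    return SimpleNamespace(success=args[0] != "test")


failed = run_and_collect_failures(fake_invoke, ["deps", "seed", "run", "test"])
```

In a real flow, runner.invoke would be passed in place of fake_invoke, and the task could raise or return the list so that downstream steps can react to it.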
Usage Example
from pathlib import Path

from prefect import flow, task
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings


@task(retries=2, retry_delay_seconds=5, log_prints=True)
def run_dbt_commands(commands: list[str], project_dir: Path) -> None:
    # Point both the project and profiles directories at the dbt project.
    settings = PrefectDbtSettings(
        project_dir=str(project_dir),
        profiles_dir=str(project_dir),
    )
    # raise_on_failure=False: dbt failures are logged rather than raised.
    runner = PrefectDbtRunner(settings=settings, raise_on_failure=False)
    for command in commands:
        runner.invoke(command.split())


@flow(name="dbt_flow", log_prints=True)
def dbt_flow():
    # build_dbt_project and create_dbt_profiles are helpers not shown here.
    project_dir = build_dbt_project()
    create_dbt_profiles(project_dir)
    run_dbt_commands(["deps"], project_dir)
    run_dbt_commands(["seed"], project_dir)
    run_dbt_commands(["run"], project_dir)
    run_dbt_commands(["test"], project_dir)