Implementation:Dagster io Dagster Asset Check Decorator
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Data_Quality |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
A decorator, provided by the Dagster core library, for defining data quality checks on assets.
Description
The @asset_check decorator transforms a Python function into an AssetChecksDefinition object that validates the contents of a target asset. The decorated function must return an AssetCheckResult that reports a pass/fail status and, optionally, metadata and a severity level. When blocking=True, downstream assets in the same run wait for the check to finish, and a check that fails with ERROR severity prevents them from materializing.
The decorator supports resource injection (via required_resource_keys or typed parameters), additional asset inputs for cross-asset validation, and partitioned checks that align with the target asset's partitioning scheme.
Usage
Import the decorator and result classes from the dagster package. Apply the decorator to a function that performs validation logic and returns an AssetCheckResult. Register the check in the Definitions object alongside assets.
Code Reference
Source Location
- Repository: dagster
- File: python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py:L102
Signature
def asset_check(
    *,
    asset: Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset],
    name: Optional[str] = None,
    description: Optional[str] = None,
    blocking: bool = False,
    additional_ins: Optional[Mapping[str, AssetIn]] = None,
    additional_deps: Optional[Iterable[CoercibleToAssetDep]] = None,
    required_resource_keys: Optional[set[str]] = None,
    metadata: Optional[Mapping[str, Any]] = None,
    automation_condition: Optional[AutomationCondition[AssetCheckKey]] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
) -> Callable[[AssetCheckFunction], AssetChecksDefinition]
Import
from dagster import asset_check, AssetCheckResult, AssetCheckSeverity
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| asset | Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset] | Yes | The target asset that this check validates. |
| name | str | No | The name of the check. Defaults to the function name. |
| description | str | No | Human-readable description displayed in the Dagster UI. |
| blocking | bool | No | If True, downstream assets in the same run wait for the check, and a failure with ERROR severity prevents them from materializing. Defaults to False. |
| additional_ins | Mapping[str, AssetIn] | No | Extra asset inputs for cross-asset validation. |
| additional_deps | Iterable[CoercibleToAssetDep] | No | Additional asset dependencies that do not pass data. |
| required_resource_keys | set[str] | No | Set of resource keys required by the check function. |
| metadata | Mapping[str, Any] | No | Static metadata attached to the check definition. |
| automation_condition | AutomationCondition[AssetCheckKey] | No | Condition under which the check should be automatically executed. |
| partitions_def | PartitionsDefinition | No | Partitioning scheme for the check, typically matching the target asset. |
Outputs
| Name | Type | Description |
|---|---|---|
| return | AssetChecksDefinition | A check definition object that can be included in a Dagster Definitions object. |
| (function return) | AssetCheckResult | The decorated function must return an AssetCheckResult. Set passed (bool) to report the outcome; metadata (Mapping) and severity (AssetCheckSeverity) are optional. |
Usage Examples
Null Value Check
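import dagster as dg
from dagster_duckdb import DuckDBResource

# Assumes raw_customers is an asset defined elsewhere that writes the raw_customers table.
@dg.asset_check(asset=raw_customers, blocking=True)
def no_null_customer_ids(duckdb: DuckDBResource) -> dg.AssetCheckResult:
    # Count rows with a missing customer_id.
    with duckdb.get_connection() as conn:
        null_count = conn.execute(
            "SELECT COUNT(*) FROM raw_customers WHERE customer_id IS NULL"
        ).fetchone()[0]
    # The check passes only when no null ids are found.
    return dg.AssetCheckResult(
        passed=null_count == 0,
        metadata={"null_count": null_count},
        severity=dg.AssetCheckSeverity.ERROR,
    )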
Row Count Threshold Check
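import dagster as dg
from dagster_duckdb import DuckDBResource

# Assumes daily_orders is an asset defined elsewhere that writes the daily_orders table.
@dg.asset_check(asset=daily_orders, blocking=False, description="Warn if fewer than 100 orders")
def minimum_order_count(duckdb: DuckDBResource) -> dg.AssetCheckResult:
    # Count all rows in the table.
    with duckdb.get_connection() as conn:
        count = conn.execute("SELECT COUNT(*) FROM daily_orders").fetchone()[0]
    # WARN severity flags a low count without treating it as an error.
    return dg.AssetCheckResult(
        passed=count >= 100,
        metadata={"order_count": count},
        severity=dg.AssetCheckSeverity.WARN,
    )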