Implementation:Mage ai Mage ai Write State
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, ETL |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A concrete tool, provided by the Mage integrations message layer, for emitting Singer STATE messages to stdout so that incremental syncs can bookmark their progress.
Description
write_state serializes a state dictionary as a Singer StateMessage and writes it to stdout as a single JSON line. To checkpoint incremental sync progress, it is called from Source.write_records after each record for sorted streams, or from Source.sync_stream after all records for unsorted streams. The downstream target reads these STATE messages and persists the bookmarks so that a later run can resume from them.
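The two checkpoint cadences described above can be sketched without Mage or singer-python installed. This is a minimal illustration, not the Mage implementation: `emit_state` is a hypothetical stand-in for write_state, `sync` is a hypothetical loop, and tracking the maximum bookmark value for unsorted input is an assumption about why the state is only emitted at the end.

```python
import io
import json
import sys
from typing import Iterable, TextIO


def emit_state(value: dict, out: TextIO = sys.stdout) -> None:
    """Hypothetical stand-in for write_state: one STATE message per JSON line."""
    out.write(json.dumps({"type": "STATE", "value": value}) + "\n")
    out.flush()


def sync(records: Iterable[dict], stream_id: str, bookmark_col: str,
         is_sorted: bool, out: TextIO = sys.stdout) -> dict:
    """Hypothetical sync loop: emit RECORD lines and checkpoint the bookmark."""
    state: dict = {"bookmarks": {stream_id: {}}}
    max_seen = None
    for record in records:
        message = {"type": "RECORD", "stream": stream_id, "record": record}
        out.write(json.dumps(message) + "\n")
        value = record[bookmark_col]
        if max_seen is None or value > max_seen:
            max_seen = value
        if is_sorted:
            # Sorted input: every record is a safe resume point.
            state["bookmarks"][stream_id][bookmark_col] = value
            emit_state(state, out)
    if not is_sorted and max_seen is not None:
        # Unsorted input (assumption): only the maximum over the batch is safe.
        state["bookmarks"][stream_id][bookmark_col] = max_seen
        emit_state(state, out)
    return state


rows = [{"id": 2, "updated_at": "2024-01-02"}, {"id": 1, "updated_at": "2024-01-01"}]
buf = io.StringIO()
sync(rows, "users", "updated_at", is_sorted=False, out=buf)
print(buf.getvalue().splitlines()[-1])
# {"type": "STATE", "value": {"bookmarks": {"users": {"updated_at": "2024-01-02"}}}}
```

With `is_sorted=True` the same loop interleaves one STATE line after every RECORD line, so an interrupted run loses at most one record of progress.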
Usage
Called automatically during the sync loop. Do not call directly unless implementing a custom sync flow.
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/messages.py
- Lines: 123-128
Signature
def write_state(value: dict) -> None:
    """Write a state message.

    Args:
        value: State dict with bookmark values.
            Structure: {"bookmarks": {"stream_id": {"col": "val"}}}
    """
Import
from mage_integrations.sources.messages import write_state
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| value | dict | Yes | State dict containing bookmarks per stream |
Outputs
| Name | Type | Description |
|---|---|---|
| stdout | JSON line | STATE message: {"type": "STATE", "value": {...}} |
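On the consuming side, the output contract above means a reader can recover the latest bookmarks by scanning the JSON lines for STATE messages. The following is a rough sketch of that idea only; `last_state` is a hypothetical helper, not part of mage_integrations, and a real target would also handle RECORD and SCHEMA messages.

```python
import json
from typing import Iterable, Optional


def last_state(lines: Iterable[str]) -> Optional[dict]:
    """Return the value of the last STATE message in a stream of JSON lines."""
    latest = None
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between messages
        message = json.loads(line)
        if message.get("type") == "STATE":
            latest = message["value"]
    return latest


sample = [
    '{"type": "RECORD", "stream": "users", "record": {"id": 1}}',
    '{"type": "STATE", "value": {"bookmarks": {"users": {"updated_at": "2024-01-15T10:30:00Z"}}}}',
]
bookmark = last_state(sample)["bookmarks"]["users"]["updated_at"]
print(bookmark)  # 2024-01-15T10:30:00Z
```

Keeping only the last STATE value reflects the usual Singer convention that each state message supersedes the previous one.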
Usage Examples
import singer  # singer-python, provides write_bookmark

from mage_integrations.sources.messages import write_state

# Record the latest bookmark for the "users" stream, then emit it as a STATE message.
state = {}
singer.write_bookmark(state, "users", "updated_at", "2024-01-15T10:30:00Z")
write_state(state)
# Outputs to stdout: {"type": "STATE", "value": {"bookmarks": {"users": {"updated_at": "2024-01-15T10:30:00Z"}}}}