
Implementation:PrefectHQ Prefect To Dataframe Task

From Leeroopedia
Revision as of 16:22, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/PrefectHQ_Prefect_To_Dataframe_Task.md)


Metadata
Sources Prefect
Domains ETL, Data_Engineering
Last Updated 2026-02-09 00:00 GMT

Overview

Concrete task, provided by the Prefect ETL example, for normalizing nested JSON into a pandas DataFrame.

Description

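The to_dataframe task flattens nested API response data (a list of pages, each page being a list of article dicts) into a single pandas DataFrame. It first concatenates all pages into one list of records, then calls pd.json_normalize, which expands nested objects into dotted column names (for example, user.username), and finally selects a fixed set of columns for downstream processing.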
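To make the flattening step concrete, here is a minimal sketch that runs the same two operations (page concatenation, then pd.json_normalize) outside of Prefect. The sample records are hypothetical and far smaller than a real API response; only the id, title, and user fields are assumed.

```python
import pandas as pd

# Hypothetical sample input: two "pages", each a list of article dicts.
raw_articles = [
    [{"id": 1, "title": "A", "user": {"username": "alice", "name": "Alice"}}],
    [{"id": 2, "title": "B", "user": {"username": "bob", "name": "Bob"}}],
]

# Step 1: concatenate the pages into one flat list of records.
records = [article for page in raw_articles for article in page]

# Step 2: json_normalize expands the nested "user" dict into
# dotted columns such as "user.username" and "user.name".
df = pd.json_normalize(records)

# Step 3: keep only the columns needed downstream.
selected = df[["id", "title", "user.username"]]
print(selected)
```

The dotted name user.username exists only after normalization, which is why the column selection has to come after the pd.json_normalize call.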

Code Reference

@task
def to_dataframe(raw_articles: list[list[dict[str, Any]]]) -> pd.DataFrame:
    """Flatten & normalise JSON into a tidy DataFrame."""
    records = [article for page in raw_articles for article in page]
    df = pd.json_normalize(records)[
        ["id", "title", "published_at", "url", "comments_count",
         "positive_reactions_count", "tag_list", "user.username"]
    ]
    return df
  • Import: from typing import Any; from prefect import task; import pandas as pd

I/O Contract

Inputs

  • raw_articles (list[list[dict[str, Any]]], required) -- nested list of article dicts from paginated API

Outputs

  • pd.DataFrame with columns: id, title, published_at, url, comments_count, positive_reactions_count, tag_list, user.username
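One consequence of this contract is worth illustrating: selecting columns with a list of labels raises KeyError in pandas if any label is absent, which happens when the API omits a field or returns no records at all. The sketch below shows that behavior and one possible tolerant alternative based on DataFrame.reindex. The helper select_expected and the sample record are hypothetical and not part of the Prefect example.

```python
import pandas as pd

EXPECTED_COLUMNS = [
    "id", "title", "published_at", "url", "comments_count",
    "positive_reactions_count", "tag_list", "user.username",
]

def select_expected(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: return the expected columns, filling absent ones with NaN."""
    return df.reindex(columns=EXPECTED_COLUMNS)

# Hypothetical record that lacks "tag_list".
records = [{
    "id": 1, "title": "A", "published_at": "2026-01-01T00:00:00Z",
    "url": "https://example.com/a", "comments_count": 0,
    "positive_reactions_count": 3, "user": {"username": "alice"},
}]
flat = pd.json_normalize(records)

# Strict selection, as in the task: fails because "tag_list" is missing.
try:
    flat[EXPECTED_COLUMNS]
    strict_failed = False
except KeyError:
    strict_failed = True

# Tolerant selection: same column order, missing column filled with NaN.
tolerant = select_expected(flat)
```

Whether to fail fast or fill gaps is a design choice. The strict form used by the task surfaces schema changes immediately, while the reindex variant keeps the pipeline running at the cost of silent NaN columns.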

Usage Example

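from prefect import flow, task
import pandas as pd

@task
def to_dataframe(raw_articles):
    # Flatten the list of pages into one list of article dicts.
    records = [article for page in raw_articles for article in page]
    # Nested keys become dotted columns, e.g. user.username.
    df = pd.json_normalize(records)[
        ["id", "title", "published_at", "url", "comments_count",
         "positive_reactions_count", "tag_list", "user.username"]
    ]
    return df

# fetch_page and save_csv are other tasks from the same ETL example (not shown here).
@flow(name="devto_etl", log_prints=True)
def etl(api_base, pages, per_page, output_file):
    # Pages are numbered from 1; each call returns one page (a list of article dicts).
    raw_pages = [fetch_page(p, api_base, per_page) for p in range(1, pages + 1)]
    df = to_dataframe(raw_pages)
    save_csv(df, output_file)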
