
Implementation:DataExpert io Data engineer handbook Namedtuple CreateDataFrame Pattern

From Leeroopedia


Overview

Type: Pattern Doc

This implementation documents the namedtuple + createDataFrame pattern used across multiple test files to construct typed, schema-enforced test DataFrames in PySpark unit tests.

Source

  • test_monthly_user_site_hits.py:L4-7
  • test_player_scd.py:L3-5
  • test_team_vertex_job.py:L4-7

Interface

The pattern follows a consistent three-step process:

Step 1: Define a namedtuple

PlayerSeason = namedtuple("PlayerSeason", "player_name current_season scoring_class")

This defines a typed record with explicit field names that will become DataFrame column names.
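As a quick illustration (plain Python, no Spark required), the generated class records its field names, and each name becomes both an attribute on the record and, later, a DataFrame column name:

```python
from collections import namedtuple

# Same definition as above; the space-separated string is split into field names.
PlayerSeason = namedtuple("PlayerSeason", "player_name current_season scoring_class")

# The generated class stores its field names in _fields.
print(PlayerSeason._fields)  # ('player_name', 'current_season', 'scoring_class')

# Instances behave like tuples but also expose fields by name.
row = PlayerSeason("Michael Jordan", 2001, "Good")
print(row.player_name)  # Michael Jordan
print(row[1])           # 2001
```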

Step 2: Create data as a list of namedtuple instances

input_data = [
    PlayerSeason("Michael Jordan", 2001, 'Good'),
    PlayerSeason("LeBron James", 2003, 'Great'),
]

Each element in the list represents one row. Because every row goes through the same namedtuple constructor, the field names, count, and order stay consistent across all rows.
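A small sketch of why this enforces consistency (plain Python, no Spark needed): the namedtuple constructor rejects rows with the wrong number of fields, and keyword construction guards against mis-ordered values, whereas a plain tuple would accept either silently:

```python
from collections import namedtuple

PlayerSeason = namedtuple("PlayerSeason", "player_name current_season scoring_class")

# A row with a missing field fails immediately at construction time.
try:
    PlayerSeason("Michael Jordan", 2001)
except TypeError as err:
    print(err)  # reports the missing 'scoring_class' argument

# Keyword construction makes field order explicit and self-documenting.
ok = PlayerSeason(player_name="LeBron James", current_season=2003, scoring_class="Great")
print(ok)
```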

Step 3: Create a DataFrame from the data

source_df = spark.createDataFrame(input_data)

Spark infers the schema automatically: column names come from the namedtuple fields and column types from the row values, producing a fully typed DataFrame without an explicit schema argument.
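Roughly speaking, Spark sees each namedtuple as a tuple carrying a `_fields` attribute: the column names come from `_fields`, and the column types are inferred from the Python types of the values (for example, `int` values typically become `LongType` and `str` values `StringType`). A plain-Python sketch of the information inference works from:

```python
from collections import namedtuple

PlayerSeason = namedtuple("PlayerSeason", "player_name current_season scoring_class")
input_data = [
    PlayerSeason("Michael Jordan", 2001, "Good"),
    PlayerSeason("LeBron James", 2003, "Great"),
]

# Column names Spark will use come from the namedtuple's fields.
columns = PlayerSeason._fields
print(columns)

# Column types are inferred from the Python types of the row values.
value_types = [type(v).__name__ for v in input_data[0]]
print(value_types)  # ['str', 'int', 'str']
```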

Examples from Test Files

test_monthly_user_site_hits.py

from collections import namedtuple

MonthlyUserSiteHit = namedtuple("MonthlyUserSiteHit", "user_id site_id month hit_count")

input_data = [
    MonthlyUserSiteHit(1, 100, "2023-01", 5),
    MonthlyUserSiteHit(1, 100, "2023-02", 10),
]
source_df = spark.createDataFrame(input_data)

test_player_scd.py

from collections import namedtuple

PlayerSeason = namedtuple("PlayerSeason", "player_name current_season scoring_class")

input_data = [
    PlayerSeason("Michael Jordan", 2001, 'Good'),
    PlayerSeason("Michael Jordan", 2002, 'Great'),
]
source_df = spark.createDataFrame(input_data)

test_team_vertex_job.py

from collections import namedtuple

TeamVertex = namedtuple("TeamVertex", "team_id team_name conference")

input_data = [
    TeamVertex(1, "Lakers", "Western"),
    TeamVertex(2, "Celtics", "Eastern"),
]
source_df = spark.createDataFrame(input_data)

Import

from collections import namedtuple

The namedtuple factory is part of Python's standard library collections module. No external dependencies are required.

I/O

  • Inputs:
    • A namedtuple class definition (field names as strings)
    • A list of namedtuple instances (the test data rows)
    • A SparkSession instance (typically injected via the spark fixture)
  • Outputs:
    • A PySpark DataFrame with schema inferred from the namedtuple fields and value types

Pattern Notes

  • The same pattern is used to construct both input DataFrames and expected output DataFrames within each test function
  • Different namedtuple definitions are used for input versus output when the transformation changes the schema
  • The pattern keeps tests readable because each row is a self-documenting named record rather than an anonymous tuple
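To illustrate the second note above, here is a hedged sketch of separate input and output namedtuples for an SCD-style transformation. The `PlayerScd` name and its fields are illustrative only, not taken from the handbook's test files:

```python
from collections import namedtuple

# Input schema: one record per season.
PlayerSeason = namedtuple("PlayerSeason", "player_name current_season scoring_class")

# Hypothetical output schema: an SCD-style record with a validity range.
PlayerScd = namedtuple("PlayerScd", "player_name scoring_class start_season end_season")

input_data = [
    PlayerSeason("Michael Jordan", 2001, "Good"),
    PlayerSeason("Michael Jordan", 2002, "Great"),
]

# Expected output is built with the *output* namedtuple, mirroring the changed schema.
expected_output = [
    PlayerScd("Michael Jordan", "Good", 2001, 2001),
    PlayerScd("Michael Jordan", "Great", 2002, 2002),
]

# In the real tests, both lists would be passed to spark.createDataFrame
# and the resulting DataFrames compared.
print(PlayerSeason._fields)
print(PlayerScd._fields)
```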
