
Implementation:DataTalksClub Data engineering zoomcamp Dlt Resource Decorator



Page Metadata
Knowledge Sources repo: DataTalksClub/data-engineering-zoomcamp, dlt docs: dlt Documentation
Domains Data_Engineering, Data_Ingestion
Last Updated 2026-02-09 14:00 GMT

Overview

Concrete tool for defining dlt data resources: the @dlt.resource decorator wraps generator functions that yield trip data records from either GCS-hosted Parquet files or Parquet files streamed directly from the web.

Description

This implementation demonstrates two uses of the @dlt.resource decorator, each wrapping a different data extraction strategy as a generator function. The decorator transforms a plain Python generator into a framework-managed resource that dlt can extract, normalize, and load.

This is a Wrapper Doc implementation. The @dlt.resource decorator wraps the user's generator function with dlt's resource management layer.
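To illustrate the wrapping idea, here is a simplified pure-Python analogy (not dlt's actual implementation, which additionally manages schema hints, state, and extraction): a decorator that turns a generator function into a named resource object the framework can iterate on demand.

```python
# Simplified analogy of what a resource decorator does; NOT dlt's real code.

class SimpleResource:
    """Wraps a generator function with framework metadata."""
    def __init__(self, gen_func, name, write_disposition):
        self.gen_func = gen_func
        self.name = name
        self.write_disposition = write_disposition

    def __iter__(self):
        # The framework can now iterate the user's generator on demand.
        return self.gen_func()

def resource(name, write_disposition="append"):
    def decorator(gen_func):
        return SimpleResource(gen_func, name, write_disposition)
    return decorator

@resource(name="rides", write_disposition="replace")
def my_rows():
    for i in range(3):
        yield {"id": i}

print(my_rows.name)   # rides
print(list(my_rows))  # [{'id': 0}, {'id': 1}, {'id': 2}]
```

Because __iter__ calls the generator function anew, the framework can re-extract the resource on every pipeline run, which is exactly why dlt asks for a generator function rather than an already-exhausted generator.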

Resource 1: parquet_source (GCS path)

Decorated with @dlt.resource(name="rides", write_disposition="replace"). This resource uses dlt's built-in filesystem source to read Parquet files from a GCS bucket. The filesystem() call enumerates files matching the glob pattern, and the pipe operator (|) chains the read_parquet() transformer to parse them. The resource yields individual rows from the parsed Parquet files.

Resource 2: paginated_getter (direct web path)

Decorated with @dlt.resource(name="ny_taxi_dlt", write_disposition="replace"). This resource iterates over a list of Parquet file URLs, streams each file using requests.get(url, stream=True), buffers the response into a BytesIO object in 1 MB chunks, parses it with PyArrow's pq.read_table(), and yields the resulting PyArrow Table. The dlt framework handles conversion from PyArrow Tables to its internal representation.
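The buffer-then-parse pattern at the heart of paginated_getter can be exercised with the standard library alone. In this sketch an in-memory byte stream stands in for the HTTP response body (hypothetical data; no network or PyArrow involved):

```python
import io

def buffer_in_chunks(stream, chunk_size=1024 * 1024):
    """Copy a readable byte stream into a seekable BytesIO buffer,
    chunk by chunk, mirroring the response.iter_content() loop."""
    buffer = io.BytesIO()
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        buffer.write(chunk)
    buffer.seek(0)  # rewind so a parser (e.g. pq.read_table) reads from the start
    return buffer

# Stand-in for a streamed HTTP response body (3 MB of zeros).
fake_response = io.BytesIO(b"\x00" * (3 * 1024 * 1024))
buf = buffer_in_chunks(fake_response)
print(len(buf.getvalue()))  # 3145728
```

The seek(0) after buffering matters: pq.read_table() reads from the current position, so an un-rewound buffer would appear empty.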

Both resources use write_disposition="replace", which instructs dlt to drop and recreate the destination table on each pipeline run.
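The effect of the two common write dispositions can be sketched with an in-memory stand-in for the destination table (an illustrative model only; dlt's real loader operates on the destination database):

```python
# Illustrative model of write dispositions; NOT dlt's loader.
def load(table, rows, write_disposition):
    if write_disposition == "replace":
        table.clear()       # drop and recreate: only the new rows survive
    table.extend(rows)      # "append" just adds on top of what is there
    return table

dest = []
load(dest, [{"id": 1}], "replace")
load(dest, [{"id": 2}], "replace")  # a second "replace" run wipes the first
print(dest)                         # [{'id': 2}]

load(dest, [{"id": 3}], "append")
print(dest)                         # [{'id': 2}, {'id': 3}]
```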

Usage

Use this implementation when:

  • Defining data extraction logic for a dlt pipeline targeting BigQuery
  • The data source is Parquet files hosted on GCS or a public web endpoint
  • The destination table should be fully replaced on each pipeline run
  • dlt should automatically infer the schema from the yielded data
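Schema inference works because dlt inspects the Python types of the yielded values. A toy version of that inspection (not dlt's actual normalizer, which also handles nested tables, variant columns, and precision hints) looks like:

```python
from datetime import datetime

# Toy type-to-column mapping; illustrative names, not dlt's real type system.
_TYPE_MAP = {int: "bigint", float: "double", str: "text",
             bool: "bool", datetime: "timestamp"}

def infer_columns(row):
    """Map one yielded dict to destination column types."""
    return {key: _TYPE_MAP.get(type(value), "text") for key, value in row.items()}

sample = {"vendor_id": 2, "fare": 14.5, "pickup": datetime(2021, 1, 1), "paid": True}
print(infer_columns(sample))
# {'vendor_id': 'bigint', 'fare': 'double', 'pickup': 'timestamp', 'paid': 'bool'}
```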

Code Reference

Source Location: cohorts/2025/workshops/dynamic_load_dlt.py, lines 78-108

Signature (Path 1 -- GCS):

@dlt.resource(name="rides", write_disposition="replace")
def parquet_source():
    files = filesystem(bucket_url=f"gs://{bucket_name}/", file_glob="*.parquet")
    reader = (files | read_parquet()).with_name("tripdata")
    row_count = 0
    for row in reader:
        row_count += 1
        yield row

Signature (Path 2 -- Direct Web):

@dlt.resource(name="ny_taxi_dlt", write_disposition="replace")
def paginated_getter():
    for url in urls:
        try:
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                buffer = io.BytesIO()
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    buffer.write(chunk)
                buffer.seek(0)
                table = pq.read_table(buffer)
                if table.num_rows > 0:
                    yield table
        except Exception as e:
            print(f"Failed to fetch data from {url}: {e}")

Import:

import io
import requests
import dlt
from dlt.sources.filesystem import filesystem, read_parquet
import pyarrow.parquet as pq

I/O Contract

Inputs:

  • name (str, required): Destination table name in BigQuery (e.g., "rides" or "ny_taxi_dlt")
  • write_disposition (str, required): How to handle existing data; "replace" drops and recreates the table
  • bucket_name (str, required for Path 1): GCS bucket containing staged Parquet files
  • urls (List[str], required for Path 2): List of Parquet file URLs to stream and parse

Outputs:

  • parquet_source (Path 1), Generator[dict]: Yields individual row dictionaries parsed from GCS Parquet files via the dlt filesystem source
  • paginated_getter (Path 2), Generator[pyarrow.Table]: Yields one PyArrow Table per successfully fetched Parquet file

Usage Examples

Defining and using a GCS-based resource:

import dlt
from dlt.sources.filesystem import filesystem, read_parquet

bucket_name = "my-taxi-bucket"

@dlt.resource(name="rides", write_disposition="replace")
def parquet_source():
    files = filesystem(bucket_url=f"gs://{bucket_name}/", file_glob="*.parquet")
    reader = (files | read_parquet()).with_name("tripdata")
    for row in reader:
        yield row

# Pass the resource to a pipeline
pipeline = dlt.pipeline(
    pipeline_name="gcs_taxi",
    destination="bigquery",
    dataset_name="taxi_data"
)
info = pipeline.run(parquet_source())
print(info)

Defining and using a direct web streaming resource:

import io
import requests
import dlt
import pyarrow.parquet as pq

urls = [
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet",
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet",
]

@dlt.resource(name="ny_taxi_dlt", write_disposition="replace")
def paginated_getter():
    for url in urls:
        try:
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                buffer = io.BytesIO()
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    buffer.write(chunk)
                buffer.seek(0)
                table = pq.read_table(buffer)
                if table.num_rows > 0:
                    yield table
        except Exception as e:
            print(f"Failed to fetch data from {url}: {e}")

pipeline = dlt.pipeline(
    pipeline_name="web_taxi",
    destination="bigquery",
    dataset_name="taxi_data"
)
info = pipeline.run(paginated_getter())
print(info)

Using the pipe operator for resource composition (Path 1 pattern):

from dlt.sources.filesystem import filesystem, read_parquet

# The pipe operator chains a file enumerator with a Parquet parser
files = filesystem(bucket_url="gs://my-bucket/", file_glob="*.parquet")
reader = (files | read_parquet()).with_name("tripdata")
# reader is now a composed resource that enumerates and parses in one step
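The | composition can be mimicked in pure Python by overriding __or__ (an analogy only; dlt's pipe operator actually returns a transformed DltResource):

```python
class Step:
    """Minimal pipeline step whose | operator chains generators, mimicking
    how dlt pipes a file lister into a parser. Analogy only."""
    def __init__(self, func, source=None):
        self.func = func
        self.source = source

    def __or__(self, other):
        # Chain: the right-hand step consumes items from the left-hand one.
        return Step(other.func, source=self)

    def __iter__(self):
        items = iter(self.source) if self.source else [None]
        for item in items:
            yield from self.func(item)

list_files = Step(lambda _: iter(["a.parquet", "b.parquet"]))
parse = Step(lambda path: iter([f"rows from {path}"]))

reader = list_files | parse
print(list(reader))  # ['rows from a.parquet', 'rows from b.parquet']
```

The key property the analogy preserves: nothing is read until the composed object is iterated, so enumeration and parsing stay lazy, one file at a time.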
