Implementation: dlt Resource Decorator (DataTalksClub Data Engineering Zoomcamp)
| Page Metadata | |
|---|---|
| Knowledge Sources | repo: DataTalksClub/data-engineering-zoomcamp, dlt docs: dlt Documentation |
| Domains | Data_Engineering, Data_Ingestion |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
A concrete tool for defining dlt data resources: the @dlt.resource decorator wraps generator functions that yield trip data records, either from GCS-hosted Parquet files or from Parquet files streamed directly from the web.
Description
This implementation demonstrates two uses of the @dlt.resource decorator, each wrapping a different data extraction strategy as a generator function. The decorator transforms a plain Python generator into a framework-managed resource that dlt can extract, normalize, and load.
This is a Wrapper Doc implementation. The @dlt.resource decorator wraps the user's generator function with dlt's resource management layer.
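To make the wrapping concrete, here is a minimal sketch (not from the course code) of a plain generator becoming a dlt resource. The resource name `greetings` and the DuckDB destination are illustrative assumptions; DuckDB simply stands in as a lightweight local destination:

```python
import dlt

# Hypothetical example: any generator that yields dicts can become a resource.
@dlt.resource(name="greetings", write_disposition="replace")
def greetings():
    for i in range(3):
        yield {"id": i, "message": f"hello {i}"}

# dlt extracts, normalizes, and loads the yielded rows into the "greetings" table.
pipeline = dlt.pipeline(
    pipeline_name="demo", destination="duckdb", dataset_name="demo_data"
)
print(pipeline.run(greetings()))
```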
Resource 1: parquet_source (GCS path)
Decorated with @dlt.resource(name="rides", write_disposition="replace"). This resource uses dlt's built-in filesystem source to read Parquet files from a GCS bucket. The filesystem() call enumerates files matching the glob pattern, and the pipe operator (|) chains the read_parquet() transformer to parse them. The resource yields individual rows from the parsed Parquet files.
Resource 2: paginated_getter (direct web path)
Decorated with @dlt.resource(name="ny_taxi_dlt", write_disposition="replace"). This resource iterates over the generated URLs, streams each Parquet file using requests.get(url, stream=True), buffers the response into a BytesIO object in 1MB chunks, parses it with PyArrow's pq.read_table(), and yields the resulting PyArrow Table. The dlt framework handles conversion from PyArrow Tables to its internal representation.
Both resources use write_disposition="replace", which instructs dlt to drop and recreate the destination table on each pipeline run.
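For contrast, a resource that should accumulate data across runs would use a different disposition. The sketch below is hypothetical (the resource is made up), but "append" is a documented dlt write disposition, as is "merge" for deduplicating on a key:

```python
import dlt

# Hypothetical resource: "append" adds new rows on each run instead of
# dropping and recreating the table the way "replace" does.
@dlt.resource(name="daily_rides", write_disposition="append")
def daily_rides():
    yield {"ride_id": 1, "distance_km": 3.2}
```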
Usage
Use this implementation when:
- Defining data extraction logic for a dlt pipeline targeting BigQuery
- The data source is Parquet files hosted on GCS or a public web endpoint
- The destination table should be fully replaced on each pipeline run
- dlt should automatically infer the schema from the yielded data (see the sketch after this list)
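As a quick illustration of that last point, the sketch below yields plain dicts and prints the schema dlt inferred. The resource and pipeline names are made up, and DuckDB stands in for BigQuery as a local destination:

```python
import dlt

@dlt.resource(name="trips_sample", write_disposition="replace")
def trips_sample():
    # dlt infers column names and types from the yielded dicts
    yield {"trip_id": 1, "fare": 12.5, "pickup": "2021-01-01T00:15:00"}

pipeline = dlt.pipeline(
    pipeline_name="schema_demo", destination="duckdb", dataset_name="demo"
)
pipeline.run(trips_sample())
# Inspect the inferred schema (to_pretty_yaml is part of dlt's Schema API)
print(pipeline.default_schema.to_pretty_yaml())
```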
Code Reference
Source Location: cohorts/2025/workshops/dynamic_load_dlt.py, lines 78-108
Signature (Path 1 -- GCS):
@dlt.resource(name="rides", write_disposition="replace")
def parquet_source():
files = filesystem(bucket_url=f"gs://{bucket_name}/", file_glob="*.parquet")
reader = (files | read_parquet()).with_name("tripdata")
row_count = 0
for row in reader:
row_count += 1
yield row
Signature (Path 2 -- Direct Web):
@dlt.resource(name="ny_taxi_dlt", write_disposition="replace")
def paginated_getter():
for url in urls:
try:
with requests.get(url, stream=True) as response:
response.raise_for_status()
buffer = io.BytesIO()
for chunk in response.iter_content(chunk_size=1024 * 1024):
buffer.write(chunk)
buffer.seek(0)
table = pq.read_table(buffer)
if table.num_rows > 0:
yield table
except Exception as e:
print(f"Failed to fetch data from {url}: {e}")
Imports:
```python
import io
import requests
import dlt
from dlt.sources.filesystem import filesystem, read_parquet
import pyarrow.parquet as pq
```
I/O Contract
Inputs:
| Parameter | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Destination table name in BigQuery (e.g., "rides" or "ny_taxi_dlt") |
| write_disposition | str | Yes | How to handle existing data: "replace" drops and recreates the table |
| bucket_name (Path 1) | str | Yes (Path 1) | GCS bucket containing staged Parquet files |
| urls (Path 2) | List[str] | Yes (Path 2) | List of Parquet file URLs to stream and parse |
Outputs:
| Output | Type | Description |
|---|---|---|
| parquet_source (Path 1) | Generator[dict] | Yields individual row dictionaries parsed from GCS Parquet files via dlt filesystem |
| paginated_getter (Path 2) | Generator[pyarrow.Table] | Yields PyArrow Table objects, one per successfully fetched Parquet file |
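Because both functions are decorated resources, they can also be passed to a single pipeline run; dlt's pipeline.run accepts a list of resources, and each loads into the table named in its decorator. A hedged sketch, assuming both resources are defined as above:

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="taxi_combined",
    destination="bigquery",
    dataset_name="taxi_data",
)
# Each resource loads into its own table: "rides" and "ny_taxi_dlt"
info = pipeline.run([parquet_source(), paginated_getter()])
print(info)
```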
Usage Examples
Defining and using a GCS-based resource:
```python
import dlt
from dlt.sources.filesystem import filesystem, read_parquet

bucket_name = "my-taxi-bucket"

@dlt.resource(name="rides", write_disposition="replace")
def parquet_source():
    files = filesystem(bucket_url=f"gs://{bucket_name}/", file_glob="*.parquet")
    reader = (files | read_parquet()).with_name("tripdata")
    for row in reader:
        yield row

# Pass the resource to a pipeline
pipeline = dlt.pipeline(
    pipeline_name="gcs_taxi",
    destination="bigquery",
    dataset_name="taxi_data"
)
info = pipeline.run(parquet_source())
print(info)
```
Defining and using a direct web streaming resource:
```python
import io
import requests
import dlt
import pyarrow.parquet as pq

urls = [
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet",
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet",
]

@dlt.resource(name="ny_taxi_dlt", write_disposition="replace")
def paginated_getter():
    for url in urls:
        try:
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                buffer = io.BytesIO()
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    buffer.write(chunk)
                buffer.seek(0)
                table = pq.read_table(buffer)
                if table.num_rows > 0:
                    yield table
        except Exception as e:
            print(f"Failed to fetch data from {url}: {e}")

pipeline = dlt.pipeline(
    pipeline_name="web_taxi",
    destination="bigquery",
    dataset_name="taxi_data"
)
info = pipeline.run(paginated_getter())
print(info)
```
Using the pipe operator for resource composition (Path 1 pattern):
```python
from dlt.sources.filesystem import filesystem, read_parquet

# The pipe operator chains a file enumerator with a Parquet parser
files = filesystem(bucket_url="gs://my-bucket/", file_glob="*.parquet")
reader = (files | read_parquet()).with_name("tripdata")
# reader is now a composed resource that enumerates and parses in one step
```
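For iterating on the composed reader without touching BigQuery, it can be run into a local destination first. This sketch assumes DuckDB is installed and that credentials for the filesystem source are configured; the pipeline and dataset names are illustrative:

```python
import dlt
from dlt.sources.filesystem import filesystem, read_parquet

files = filesystem(bucket_url="gs://my-bucket/", file_glob="*.parquet")
reader = (files | read_parquet()).with_name("tripdata")

# Load into local DuckDB for a quick smoke test before switching to BigQuery
pipeline = dlt.pipeline(
    pipeline_name="local_test", destination="duckdb", dataset_name="staging"
)
print(pipeline.run(reader))
```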