Implementation: DataTalksClub Data Engineering Zoomcamp dlt Loading Method Selection
| Page Metadata | |
|---|---|
| Knowledge Sources | repo: DataTalksClub/data-engineering-zoomcamp, dlt docs: dlt Documentation |
| Domains | Data_Engineering, Data_Ingestion |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
A concrete mechanism for selecting between two data loading strategies at runtime, GCS-staged loading or direct web-to-BigQuery streaming, via a user-input branch.
Description
This implementation provides a runtime branching mechanism that allows the operator to choose between two distinct data loading paths before the pipeline executes. The user is prompted with input() to select either option "1" (GCS-staged loading) or option "2" (direct web streaming).
This is a Pattern Doc implementation demonstrating the strategy pattern for data loading.
Path 1: GCS-staged loading (dlt_method == "1")
In this path, Parquet files are first downloaded from the TLC CloudFront CDN and uploaded to a Google Cloud Storage (GCS) bucket. A GCS client is initialized using a service account JSON file. Each URL is downloaded with requests.get() and the content is uploaded to GCS via gcs_blob.upload_from_string(). After staging, a dlt resource reads from the GCS bucket using the filesystem source with a read_parquet transformer.
Path 2: Direct web streaming (dlt_method == "2")
In this path, Parquet files are streamed directly from the web using requests.get(url, stream=True). The response content is buffered in memory using io.BytesIO, parsed with PyArrow's pq.read_table(), and yielded as PyArrow Tables to the dlt pipeline. No intermediate cloud storage is required.
Both paths produce a dlt resource (generator function) that the pipeline can consume identically.
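The selection mechanism itself can be sketched without any cloud or dlt dependencies. In the sketch below, the two loader functions are hypothetical stand-ins for parquet_source() and paginated_getter(), and a dict-based dispatch is used as an equivalent of the script's if/elif branch:

```python
# Minimal sketch of the runtime strategy selection (no cloud or dlt needed).
# gcs_staged_loader and direct_web_loader are hypothetical stand-ins for the
# real parquet_source() and paginated_getter() resources.
def gcs_staged_loader():
    yield "row-from-gcs"       # Path 1 would yield rows read from GCS

def direct_web_loader():
    yield "table-from-web"     # Path 2 would yield PyArrow Tables

LOADERS = {"1": gcs_staged_loader, "2": direct_web_loader}

def select_loader(dlt_method: str):
    """Return the resource generator for the chosen method, rejecting bad input."""
    try:
        return LOADERS[dlt_method]
    except KeyError:
        raise ValueError(f"Unknown loading method: {dlt_method!r}") from None
```

In the actual script the choice comes from input(); here select_loader() takes the string directly so the branch is easy to test in isolation.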
Usage
Use this implementation when:
- The operator needs to choose between staged and direct loading at runtime
- GCS infrastructure may or may not be available depending on the deployment environment
- Testing different loading strategies for performance comparison
- A single script must support multiple deployment scenarios
Code Reference
Source Location: cohorts/2025/workshops/dynamic_load_dlt.py, lines 58-108
Signature:
```python
dlt_method = input("Choose loading method: 1 for GCS -> Bigquery, 2 for Direct Web -> Bigquery: ")

if dlt_method == "1":
    # GCS-staged loading path
    storage_client = storage.Client.from_service_account_json("gcs.json")
    # ... upload to GCS, define parquet_source() resource
elif dlt_method == "2":
    # Direct web streaming path
    # ... define paginated_getter() resource
```
Import:
```python
import io
import requests
import dlt
from dlt.sources.filesystem import filesystem, read_parquet
from google.cloud import storage
import pyarrow.parquet as pq
```
I/O Contract
Inputs:
| Parameter | Type | Required | Description |
|---|---|---|---|
| dlt_method | str (user input) | Yes | "1" for GCS-staged loading, "2" for direct web streaming |
| urls | List[str] | Yes | List of Parquet file URLs generated by generate_urls() |
| gcs.json | File (JSON) | Path 1 only | GCP service account credentials file for GCS client |
| bucket_name | str (user input) | Path 1 only | Name of the GCS bucket for staging files |
Outputs:
| Output | Type | Description |
|---|---|---|
| parquet_source (Path 1) | dlt.resource generator | Yields individual rows from Parquet files read via dlt filesystem source from GCS |
| paginated_getter (Path 2) | dlt.resource generator | Yields PyArrow Table objects parsed from streamed Parquet files |
| gcs_files (Path 1 side effect) | List[str] | GCS URIs of uploaded files (e.g., gs://bucket/file.parquet) |
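The urls input is produced by a generate_urls() helper whose body lives elsewhere in dynamic_load_dlt.py and is not shown here. A hypothetical reconstruction, assuming the public TLC CloudFront naming scheme and the argument order (taxi_type, start_year, end_year, start_month, end_month) implied by the example calls, might look like:

```python
def generate_urls(taxi_type: str, start_year: int, end_year: int,
                  start_month: int, end_month: int) -> list:
    """Hypothetical sketch: build one TLC trip-data URL per month in range.
    Assumes the public CloudFront naming scheme; for multi-year ranges the
    month window is assumed to repeat for each year."""
    base = "https://d37ci6vzurychx.cloudfront.net/trip-data"
    return [
        f"{base}/{taxi_type}_tripdata_{year}-{month:02d}.parquet"
        for year in range(start_year, end_year + 1)
        for month in range(start_month, end_month + 1)
    ]
```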
Usage Examples
Path 1 -- GCS-staged loading:
```python
import requests
import dlt
from dlt.sources.filesystem import filesystem, read_parquet
from google.cloud import storage

# Initialize GCS client and upload files
storage_client = storage.Client.from_service_account_json("gcs.json")
bucket_name = "my-taxi-data-bucket"
bucket = storage_client.bucket(bucket_name)

urls = generate_urls("green", 2021, 2021, 1, 3)  # URL helper defined elsewhere in the script
for url in urls:
    file_name = url.split("/")[-1]
    gcs_blob = bucket.blob(file_name)
    response = requests.get(url)
    response.raise_for_status()  # fail fast on a bad download
    gcs_blob.upload_from_string(response.content)

@dlt.resource(name="rides", write_disposition="replace")
def parquet_source():
    files = filesystem(bucket_url=f"gs://{bucket_name}/", file_glob="*.parquet")
    reader = (files | read_parquet()).with_name("tripdata")
    for row in reader:
        yield row
```
Path 2 -- Direct web streaming:
```python
import io
import requests
import dlt
import pyarrow.parquet as pq

urls = generate_urls("yellow", 2021, 2021, 1, 3)  # URL helper defined elsewhere in the script

@dlt.resource(name="ny_taxi_dlt", write_disposition="replace")
def paginated_getter():
    for url in urls:
        try:
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                # Buffer the streamed Parquet bytes in memory
                buffer = io.BytesIO()
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    buffer.write(chunk)
                buffer.seek(0)  # rewind before parsing
                table = pq.read_table(buffer)
                if table.num_rows > 0:
                    yield table
        except Exception as e:
            print(f"Failed to fetch data from {url}: {e}")
```
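The chunk-buffering step in Path 2 (write chunks, seek(0), then parse) can be exercised in isolation with the standard library alone. buffer_stream() below is a hypothetical helper that mirrors that step, with a plain iterable of byte chunks standing in for response.iter_content():

```python
import io

def buffer_stream(chunks) -> io.BytesIO:
    """Accumulate byte chunks into a seekable in-memory buffer, rewound so a
    reader such as pq.read_table() starts from byte 0."""
    buffer = io.BytesIO()
    for chunk in chunks:
        buffer.write(chunk)
    buffer.seek(0)  # without the rewind, a subsequent read() returns b""
    return buffer
```

The seek(0) is the easy-to-miss detail: after writing, the stream position sits at the end of the buffer, so any reader handed the buffer unrewound would see an empty stream.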