
Implementation:DataTalksClub Data engineering zoomcamp Dlt Loading Method Selection

From Leeroopedia


Page Metadata
Knowledge Sources repo: DataTalksClub/data-engineering-zoomcamp, dlt docs: dlt Documentation
Domains Data_Engineering, Data_Ingestion
Last Updated 2026-02-09 14:00 GMT

Overview

A runtime mechanism for selecting between two data loading strategies, GCS-staged loading or direct web-to-BigQuery streaming, via user input branching.

Description

This implementation provides a runtime branching mechanism that allows the operator to choose between two distinct data loading paths before the pipeline executes. The user is prompted with input() to select either option "1" (GCS-staged loading) or option "2" (direct web streaming).

This is a Pattern Doc implementation demonstrating the strategy pattern for data loading.
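The branching itself takes only a few lines. A hedged sketch of the selection step (the helper name choose_method and the re-prompt loop are illustrative additions; the original script uses a bare input() call with no validation):

```python
def choose_method(prompt=input):
    """Return "1" (GCS-staged) or "2" (direct streaming), re-prompting on bad input.

    The original script reads input() once without validation; the loop here
    is an illustrative hardening, not part of the source.
    """
    while True:
        choice = prompt(
            "Choose loading method: 1 for GCS -> BigQuery, 2 for Direct Web -> BigQuery: "
        ).strip()
        if choice in ("1", "2"):
            return choice
        print("Please enter 1 or 2.")
```

Accepting a prompt callable instead of calling input() directly keeps the function testable without a real stdin.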

Path 1: GCS-staged loading (dlt_method == "1")

In this path, Parquet files are first downloaded from the TLC CloudFront CDN and uploaded to a Google Cloud Storage (GCS) bucket. A GCS client is initialized using a service account JSON file. Each URL is downloaded with requests.get() and the content is uploaded to GCS via gcs_blob.upload_from_string(). After staging, a dlt resource reads from the GCS bucket using the filesystem source with a read_parquet transformer.

Path 2: Direct web streaming (dlt_method == "2")

In this path, Parquet files are streamed directly from the web using requests.get(url, stream=True). The response content is buffered in memory using io.BytesIO, parsed with PyArrow's pq.read_table(), and yielded as PyArrow Tables to the dlt pipeline. No intermediate cloud storage is required.

Both paths produce a dlt resource (generator function) that the pipeline can consume identically.
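Because each branch ends by defining a generator-backed resource, the downstream consumption code is identical either way. A dlt-free sketch of this strategy pattern (the stand-in generators below are illustrative, not the script's real resources):

```python
def gcs_staged_rows():
    # Stand-in for parquet_source(): in the real script, rows come from
    # Parquet files staged in GCS via dlt's filesystem source.
    yield {"ride_id": 1, "source": "gcs"}

def direct_stream_rows():
    # Stand-in for paginated_getter(): in the real script, PyArrow Tables
    # come from Parquet files streamed over HTTP.
    yield {"ride_id": 2, "source": "web"}

def load(dlt_method):
    # The same consumption loop works for either branch; this is the point
    # of dispatching on user input before the pipeline runs.
    source = gcs_staged_rows if dlt_method == "1" else direct_stream_rows
    return list(source())
```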

Usage

Use this implementation when:

  • The operator needs to choose between staged and direct loading at runtime
  • GCS infrastructure may or may not be available depending on the deployment environment
  • Testing different loading strategies for performance comparison
  • A single script must support multiple deployment scenarios

Code Reference

Source Location: cohorts/2025/workshops/dynamic_load_dlt.py, lines 58-108

Signature:

dlt_method = input("Choose loading method: 1 for GCS -> Bigquery, 2 for Direct Web -> Bigquery: ")

if dlt_method == "1":
    # GCS-staged loading path
    storage_client = storage.Client.from_service_account_json("gcs.json")
    # ... upload to GCS, define parquet_source() resource
elif dlt_method == "2":
    # Direct web streaming path
    # ... define paginated_getter() resource

Import:

import io
import requests
import dlt
from dlt.sources.filesystem import filesystem, read_parquet
from google.cloud import storage
import pyarrow.parquet as pq

I/O Contract

Inputs:

Parameter | Type | Required | Description
dlt_method | str (user input) | Yes | "1" for GCS-staged loading, "2" for direct web streaming
urls | List[str] | Yes | List of Parquet file URLs generated by generate_urls()
gcs.json | File (JSON) | Path 1 only | GCP service account credentials file for GCS client
bucket_name | str (user input) | Path 1 only | Name of the GCS bucket for staging files

Outputs:

Output | Type | Description
parquet_source (Path 1) | dlt.resource generator | Yields individual rows from Parquet files read via dlt filesystem source from GCS
paginated_getter (Path 2) | dlt.resource generator | Yields PyArrow Table objects parsed from streamed Parquet files
gcs_files (Path 1 side effect) | List[str] | GCS URIs of uploaded files (e.g., gs://bucket/file.parquet)
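Both usage examples below call generate_urls(), whose body is not shown on this page. A plausible reconstruction from the call site generate_urls("green", 2021, 2021, 1, 3) and the TLC CloudFront URL scheme; the parameter names and order are inferred, not confirmed against the source:

```python
# Public TLC trip-data CDN; file naming follows {type}_tripdata_{YYYY}-{MM}.parquet
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"

def generate_urls(taxi_type, start_year, end_year, start_month, end_month):
    """Build TLC trip-data Parquet URLs. Signature inferred from the call site."""
    return [
        f"{BASE_URL}/{taxi_type}_tripdata_{year}-{month:02d}.parquet"
        for year in range(start_year, end_year + 1)
        for month in range(start_month, end_month + 1)
    ]
```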

Usage Examples

Path 1 -- GCS-staged loading:

import requests
import dlt
from dlt.sources.filesystem import filesystem, read_parquet
from google.cloud import storage

# Initialize GCS client and upload files
storage_client = storage.Client.from_service_account_json("gcs.json")
bucket_name = "my-taxi-data-bucket"
bucket = storage_client.bucket(bucket_name)

# generate_urls() is defined elsewhere in the source script (see I/O Contract)
urls = generate_urls("green", 2021, 2021, 1, 3)
for url in urls:
    file_name = url.split("/")[-1]
    gcs_blob = bucket.blob(file_name)
    response = requests.get(url)
    response.raise_for_status()  # avoid silently staging an HTTP error page
    gcs_blob.upload_from_string(response.content)

@dlt.resource(name="rides", write_disposition="replace")
def parquet_source():
    files = filesystem(bucket_url=f"gs://{bucket_name}/", file_glob="*.parquet")
    reader = (files | read_parquet()).with_name("tripdata")
    for row in reader:
        yield row

Path 2 -- Direct web streaming:

import io
import requests
import dlt
import pyarrow.parquet as pq

urls = generate_urls("yellow", 2021, 2021, 1, 3)

@dlt.resource(name="ny_taxi_dlt", write_disposition="replace")
def paginated_getter():
    for url in urls:
        try:
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                buffer = io.BytesIO()
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    buffer.write(chunk)
                buffer.seek(0)
                table = pq.read_table(buffer)
                if table.num_rows > 0:
                    yield table
        except Exception as e:
            print(f"Failed to fetch data from {url}: {e}")
