
Implementation:DataTalksClub Data engineering zoomcamp Dlt Pipeline Run

From Leeroopedia


Page Metadata
  • Knowledge Sources: repo DataTalksClub/data-engineering-zoomcamp; dlt Documentation
  • Domains: Data_Engineering, Data_Ingestion
  • Last Updated: 2026-02-09 14:00 GMT

Overview

A concrete tool for configuring a dlt pipeline that targets BigQuery and executing it with a data resource, so that dlt performs automated schema inference, normalization, and loading of NYC taxi trip data.

Description

This implementation creates a dlt pipeline instance using dlt.pipeline() with three configuration parameters: a pipeline name for state tracking, a user-specified dataset name for the BigQuery destination, and "bigquery" as the destination type. The pipeline is then executed via pipeline.run(), which accepts either the parquet_source() or paginated_getter() dlt resource depending on the loading method selected earlier.

This is a Wrapper Doc implementation. The dlt.pipeline() and pipeline.run() calls wrap the dlt framework's internal extract-normalize-load workflow.

When pipeline.run() is called, the framework:

  1. Extracts data by consuming the generator function (the dlt resource)
  2. Normalizes the extracted data into a relational schema suitable for BigQuery
  3. Loads the normalized data into the specified BigQuery dataset
  4. Returns a LoadInfo object containing metadata about the load (record counts, table names, schema changes, load duration)
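
pipeline.run() bundles these phases, but dlt also exposes them as separate extract(), normalize(), and load() methods, which can be useful for debugging. A minimal sketch, assuming dlt is installed with the duckdb extra; the pipeline name, dataset name, and toy row below are illustrative, not from the zoomcamp script:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="test_taxi_steps",
    dataset_name="ny_taxi_debug",
    destination="duckdb"
)

# The three phases that pipeline.run() wraps, invoked one at a time
pipeline.extract([{"trip_id": 1, "fare": 12.5}], table_name="rides")
pipeline.normalize()
load_info = pipeline.load()
print(load_info)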

The pipeline name ("test_taxi") is used to persist pipeline state locally. On subsequent runs, dlt uses this state to detect schema changes and manage incremental loads (though this specific implementation uses write_disposition="replace", which fully replaces the destination data on every run).
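
write_disposition can be declared on the resource or passed directly to pipeline.run(); both are part of dlt's public API. A minimal sketch using toy rows in place of the real resource (the table name demo_rides is hypothetical, and BigQuery credentials must already be configured):

import dlt

pipeline = dlt.pipeline(
    pipeline_name="test_taxi",
    dataset_name="ny_taxi_2021",
    destination="bigquery"
)

# Toy rows stand in for the real taxi resource; write_disposition="replace"
# fully replaces the target table's data on each run instead of appending
rows = [{"trip_id": 1, "fare": 12.5}, {"trip_id": 2, "fare": 8.0}]
info = pipeline.run(rows, table_name="demo_rides", write_disposition="replace")
print(info)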

The dataset name is collected via input(), allowing the operator to target different BigQuery datasets without code changes.
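
If blocking on input() is undesirable (for example in CI), the same flexibility can be kept with an environment-variable fallback. A small sketch; the variable name TAXI_DATASET_NAME is hypothetical:

import os

# Hypothetical override: prefer an environment variable when present,
# fall back to the interactive prompt otherwise
dataset_name = os.environ.get("TAXI_DATASET_NAME") or input("Enter the dataset name: ")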

Usage

Use this implementation when:

  • Loading NYC taxi trip data (or similar structured data) into BigQuery via dlt
  • The pipeline needs automatic schema inference from Parquet data
  • The operator should be able to specify the target dataset at runtime
  • Load results should be inspectable via the returned LoadInfo object
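
The examples on this page assume parquet_source() and paginated_getter() are defined in the same script. For standalone experimentation, a minimal stand-in can be built with dlt's @dlt.resource decorator; the synthetic rows here are illustrative only, not the zoomcamp implementation:

import dlt

@dlt.resource(name="rides", write_disposition="replace")
def paginated_getter():
    # Stand-in for the real resource: yields pages of records,
    # which dlt consumes during the extract phase
    for page in range(2):
        yield [{"trip_id": page * 10 + i, "fare": 9.99} for i in range(10)]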

Code Reference

Source Location: cohorts/2025/workshops/dynamic_load_dlt.py, lines 110-127

Signature:

pipeline = dlt.pipeline(
    pipeline_name="test_taxi",
    dataset_name=input("Enter the dataset name: "),
    destination="bigquery"
)

info = pipeline.run(parquet_source())   # Path 1
info = pipeline.run(paginated_getter()) # Path 2

Import:

import dlt

I/O Contract

Inputs:

  • pipeline_name (str, required): Unique name for the pipeline, used for local state persistence (e.g., "test_taxi")
  • dataset_name (str, required): BigQuery dataset in which tables will be created (collected via input() at runtime)
  • destination (str, required): Target destination identifier; "bigquery" in this implementation
  • resource (dlt.Resource, required): A dlt resource generator (parquet_source or paginated_getter) that yields the data to load

Outputs:

  • info (LoadInfo): Load metadata: number of loaded rows, table names created, schema version, load duration, and any warnings or errors
  • BigQuery tables (side effect): Tables created or replaced in the specified BigQuery dataset with the auto-inferred schema
  • Local state (side effect): Pipeline state persisted locally under ~/.dlt/pipelines/test_taxi/ for future runs
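
A pipeline persisted this way can be reattached in a later session with dlt.attach(), which is part of dlt's public API; exact attribute availability may vary by version:

import dlt

# Reattach to the locally persisted state of a previously run pipeline
pipeline = dlt.attach(pipeline_name="test_taxi")
print(pipeline.dataset_name)
print(pipeline.default_schema_name)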

Usage Examples

Basic pipeline creation and execution:

import dlt

# Create the pipeline targeting BigQuery
pipeline = dlt.pipeline(
    pipeline_name="test_taxi",
    dataset_name="ny_taxi_2021",
    destination="bigquery"
)

# Run with a dlt resource (assumes parquet_source or paginated_getter is defined)
info = pipeline.run(parquet_source())
print(info)

Inspecting the LoadInfo result:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="test_taxi",
    dataset_name="ny_taxi_2021",
    destination="bigquery"
)

info = pipeline.run(paginated_getter())

# Print load summary
print(info)

# Inspect the pipeline configuration behind the load
print(f"Pipeline name: {pipeline.pipeline_name}")
print(f"Dataset: {pipeline.dataset_name}")
print(f"Destination: {pipeline.destination}")

Branching execution based on selected loading method:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="test_taxi",
    dataset_name="ny_taxi_data",
    destination="bigquery"
)

dlt_method = input("Choose loading method: 1 for GCS -> BigQuery, 2 for Direct Web -> BigQuery: ")

if dlt_method == "1":
    info = pipeline.run(parquet_source())
elif dlt_method == "2":
    info = pipeline.run(paginated_getter())
else:
    raise SystemExit("Invalid selection")

print(info)

Using a different destination for local testing:

import dlt

# Switch to DuckDB for local development
pipeline = dlt.pipeline(
    pipeline_name="test_taxi_local",
    dataset_name="ny_taxi_dev",
    destination="duckdb"
)

info = pipeline.run(paginated_getter())
print(info)
