Implementation: DataTalksClub Data Engineering Zoomcamp dlt Pipeline Run
| Page Metadata | |
|---|---|
| Knowledge Sources | repo: DataTalksClub/data-engineering-zoomcamp, dlt docs: dlt Documentation |
| Domains | Data_Engineering, Data_Ingestion |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
A concrete implementation that configures a dlt pipeline targeting BigQuery and executes it with a data resource, performing automated schema inference, normalization, and loading of NYC taxi trip data.
Description
This implementation creates a dlt pipeline instance using dlt.pipeline() with three configuration parameters: a pipeline name for state tracking, a user-specified dataset name for the BigQuery destination, and "bigquery" as the destination type. The pipeline is then executed via pipeline.run(), which accepts either the parquet_source() or paginated_getter() dlt resource depending on the loading method selected earlier.
This is a Wrapper Doc implementation. The dlt.pipeline() and pipeline.run() calls wrap the dlt framework's internal extract-normalize-load workflow.
When pipeline.run() is called, the framework:
- Extracts data by consuming the generator function (the dlt resource)
- Normalizes the extracted data into a relational schema suitable for BigQuery
- Loads the normalized data into the specified BigQuery dataset
- Returns a LoadInfo object containing metadata about the load (record counts, table names, schema changes, load duration)
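The first step above, consuming the resource generator, can be sketched with a plain Python generator standing in for paginated_getter (the page size and record shape here are hypothetical; the real resource yields pages of taxi records fetched over HTTP):

```python
def paginated_getter(page_size=2, total_records=5):
    # Hypothetical stand-in for the course's paginated_getter resource:
    # like a dlt resource, it is a generator that yields one "page"
    # (a list of dicts) per iteration until the source is exhausted.
    records = [{"trip_id": i, "passenger_count": 1} for i in range(total_records)]
    for start in range(0, total_records, page_size):
        yield records[start:start + page_size]

# "Extract" consumes the generator page by page, as pipeline.run() would
extracted = [row for page in paginated_getter() for row in page]
```

Because the resource is a generator, dlt never materializes the full dataset in memory during extraction; it pulls one page at a time.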
The pipeline name ("test_taxi") is used to persist pipeline state locally. On subsequent runs, dlt uses this state to detect schema changes and manage incremental loads (though this specific implementation uses write_disposition="replace", which replaces data entirely).
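The effect of write_disposition="replace" (versus "append") can be illustrated with a toy model of a destination table. This is a sketch of the semantics only, not dlt's actual load engine:

```python
def apply_write_disposition(existing, incoming, disposition="replace"):
    # Toy illustration of dlt's write dispositions (not the real engine):
    # "replace" drops the existing rows entirely, "append" keeps them.
    if disposition == "replace":
        return list(incoming)
    if disposition == "append":
        return list(existing) + list(incoming)
    raise ValueError(f"unsupported disposition: {disposition}")
```

With "replace", re-running the pipeline against the same dataset is idempotent: each run leaves only the latest batch in the destination tables.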
The dataset name is collected via input(), allowing the operator to target different BigQuery datasets without code changes.
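Since input() returns an empty string when the operator just presses Enter, a small guard (a hypothetical helper, not present in the source script) keeps the pipeline from targeting an empty dataset name:

```python
def resolve_dataset_name(raw: str, default: str = "ny_taxi_data") -> str:
    # Hypothetical helper: strip whitespace and fall back to a default
    # when the operator provides no name. Dataset names are assumed to
    # be valid BigQuery identifiers.
    name = raw.strip()
    return name if name else default

# Usage (hypothetical):
# dataset_name = resolve_dataset_name(input("Enter the dataset name: "))
```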
Usage
Use this implementation when:
- Loading NYC taxi trip data (or similar structured data) into BigQuery via dlt
- The pipeline needs automatic schema inference from Parquet data
- The operator should be able to specify the target dataset at runtime
- Load results should be inspectable via the returned LoadInfo object
Code Reference
Source Location: cohorts/2025/workshops/dynamic_load_dlt.py, lines 110-127
Signature:
```python
pipeline = dlt.pipeline(
    pipeline_name="test_taxi",
    dataset_name=input("Enter the dataset name: "),
    destination="bigquery"
)

info = pipeline.run(parquet_source())    # Path 1
info = pipeline.run(paginated_getter())  # Path 2
```
Import:
```python
import dlt
```
I/O Contract
Inputs:
| Parameter | Type | Required | Description |
|---|---|---|---|
| pipeline_name | str | Yes | Unique name for the pipeline, used for local state persistence (e.g., "test_taxi") |
| dataset_name | str | Yes | BigQuery dataset name where tables will be created (collected via user input) |
| destination | str | Yes | Target destination identifier; "bigquery" in this implementation |
| resource | dlt.Resource | Yes | A dlt resource generator (parquet_source or paginated_getter) that yields data |
Outputs:
| Output | Type | Description |
|---|---|---|
| info | LoadInfo | Object containing load metadata: number of loaded rows, table names created, schema version, load duration, and any warnings or errors |
| BigQuery tables (side effect) | BigQuery dataset | Tables created or replaced in the specified BigQuery dataset with auto-inferred schema |
| Local state (side effect) | Pipeline state files | Pipeline state persisted locally under ~/.dlt/pipelines/test_taxi/ for future runs |
Usage Examples
Basic pipeline creation and execution:
```python
import dlt

# Create the pipeline targeting BigQuery
pipeline = dlt.pipeline(
    pipeline_name="test_taxi",
    dataset_name="ny_taxi_2021",
    destination="bigquery"
)

# Run with a dlt resource (assumes parquet_source or paginated_getter is defined)
info = pipeline.run(parquet_source())
print(info)
```
Inspecting the LoadInfo result:
```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="test_taxi",
    dataset_name="ny_taxi_2021",
    destination="bigquery"
)

info = pipeline.run(paginated_getter())

# Print load summary
print(info)

# Access pipeline attributes
print(f"Pipeline name: {pipeline.pipeline_name}")
print(f"Dataset: {pipeline.dataset_name}")
print(f"Destination: {pipeline.destination}")
```
Branching execution based on selected loading method:
```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="test_taxi",
    dataset_name="ny_taxi_data",
    destination="bigquery"
)

dlt_method = input("Choose loading method: 1 for GCS -> BigQuery, 2 for Direct Web -> BigQuery: ")

if dlt_method == "1":
    info = pipeline.run(parquet_source())
elif dlt_method == "2":
    info = pipeline.run(paginated_getter())
else:
    print("Invalid selection")
    raise SystemExit(1)

print(info)
```
Using a different destination for local testing:
```python
import dlt

# Switch to DuckDB for local development
pipeline = dlt.pipeline(
    pipeline_name="test_taxi_local",
    dataset_name="ny_taxi_dev",
    destination="duckdb"
)

info = pipeline.run(paginated_getter())
print(info)
```