Implementation:Kubeflow Pipelines Taxi Utils
| Knowledge Sources | |
|---|---|
| Domains | Feature_Engineering, Model_Training, TFX |
| Last Updated | 2026-02-13 14:00 GMT |
Overview
A TFX pipeline utility module that provides the preprocessing and training logic for the Chicago Taxi dataset sample. It implements the `preprocessing_fn` and `trainer_fn` callbacks required by TFX components.
Description
The `taxi_utils.py` module defines feature engineering for the Chicago Taxi dataset across four feature categories. Dense float features (`trip_miles`, `fare`, `trip_seconds`) are z-score normalized. Vocabulary features (`payment_type`, `company`) are mapped to integer IDs using vocabularies computed from the data. Bucket features (the pickup and dropoff latitude/longitude coordinates) are bucketized. Categorical features (the time and area fields) are passed through. The label (`tips`) is binarized so that the model predicts whether the tip exceeded 20% of the fare. The `_build_estimator` function constructs a `DNNLinearCombinedClassifier` (a wide-and-deep model).
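As a rough illustration of two of these transforms, the pure-Python sketch below mimics z-score scaling and the tip label rule without TensorFlow. The helper names `z_score` and `tip_label` are hypothetical, the use of the population standard deviation is an assumption, and the handling of a missing fare (label 0) is also an assumption rather than something stated above.

```python
import math
from statistics import fmean, pstdev


def z_score(values):
    """Scale values to zero mean and unit variance.

    Assumption: population standard deviation; a constant column maps to 0.0.
    """
    mean = fmean(values)
    std = pstdev(values)
    if std == 0:
        return [0.0 for _ in values]
    return [(v - mean) / std for v in values]


def tip_label(tips, fare, threshold=0.2):
    """Return 1 if the tip exceeded `threshold` (20%) of the fare, else 0."""
    if math.isnan(fare):
        return 0  # assumption: a missing fare is treated as a negative example
    return int(tips > fare * threshold)


# Illustrative values only, not taken from the dataset.
fares = [10.0, 20.0, 30.0]
scaled = z_score(fares)
labels = [tip_label(t, f) for t, f in [(3.0, 10.0), (2.0, 20.0), (0.0, float("nan"))]]
```

In the real pipeline the mean and variance would be computed over the whole dataset in an analysis pass, not over a small in-memory list as in this sketch.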
Usage
This file is uploaded to GCS and referenced by TFX components (Transform, Trainer, Evaluator) during pipeline execution. It serves as a worked example of feature engineering with TensorFlow Transform: the same preprocessing definitions are applied during both training and evaluation.
Code Reference
Source Location
- Repository: Kubeflow_Pipelines
- File: samples/core/tfx-oss/utils/taxi_utils.py
- Lines: 1-358
Signature
def preprocessing_fn(inputs: dict) -> dict:
    """TFX Transform preprocessing function.
    Applies z-score normalization, vocabulary encoding, bucketization,
    and binary label derivation (tips > 20% of fare)."""

def trainer_fn(hparams, schema) -> dict:
    """TFX Trainer entry point.
    Returns dict with 'estimator', 'train_spec', 'eval_spec',
    'eval_input_receiver_fn'."""

def _build_estimator(config, hidden_units=None, warm_start_from=None):
    """Constructs DNNLinearCombinedClassifier with configurable hidden layers."""

def _fill_in_missing(x):
    """Replaces missing values in sparse tensors, returns dense rank-1 tensor."""
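The fill-in behaviour of `_fill_in_missing` can be pictured with a minimal sketch that uses a plain dict as a stand-in for a sparse tensor holding at most one value per row. The function name `fill_in_missing`, the dict representation, and the default values (`""` for strings, `0.0` for numbers) are assumptions made for illustration, and the sample values are invented.

```python
def fill_in_missing(sparse_rows, num_rows, default):
    """Densify a {row_index: value} mapping into a rank-1 list.

    Rows with no entry receive `default`, so the result always has
    exactly `num_rows` elements.
    """
    return [sparse_rows.get(i, default) for i in range(num_rows)]


# Rows 1 and 3 have no company recorded; they become empty strings.
companies = fill_in_missing({0: "Company A", 2: "Company B"}, 4, "")

# Rows 0 and 2 have no trip distance recorded; they become 0.0.
miles = fill_in_missing({1: 2.5}, 3, 0.0)
```

Densifying first means every later transform can assume one value per example, which keeps the scaling, vocabulary, and bucketing steps free of missing-value checks.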
Import
# Referenced by TFX components, not directly imported by users
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_model_analysis as tfma
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| inputs | dict[str, SparseTensor] | Yes | Raw feature tensors from TFRecords |
| hparams | HyperParameters | Yes | TFX trainer hyperparameters (data paths, batch size) |
| schema | Schema proto | Yes | TFX data schema definition |
Outputs
| Name | Type | Description |
|---|---|---|
| preprocessing_fn returns | dict[str, Tensor] | Transformed feature tensors |
| trainer_fn returns | dict | estimator, train_spec, eval_spec, eval_input_receiver_fn |
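The output contract of `trainer_fn` can be checked mechanically. The sketch below is a hypothetical helper, not part of the module: `REQUIRED_TRAINER_KEYS` and `missing_trainer_keys` are names introduced here, and the placeholder `object()` values stand in for the real estimator and spec objects.

```python
# The four keys the table above lists for the dict returned by trainer_fn.
REQUIRED_TRAINER_KEYS = {
    "estimator",
    "train_spec",
    "eval_spec",
    "eval_input_receiver_fn",
}


def missing_trainer_keys(result):
    """Return the required keys absent from `result`, sorted by name."""
    return sorted(REQUIRED_TRAINER_KEYS - set(result))


# A deliberately incomplete result, using placeholder objects.
partial = {"estimator": object(), "train_spec": object()}
missing = missing_trainer_keys(partial)
```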
Usage Examples
Feature Categories
# Feature definitions used in preprocessing
_DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']
_VOCAB_FEATURE_KEYS = ['payment_type', 'company']
_BUCKET_FEATURE_KEYS = ['pickup_latitude', 'pickup_longitude',
                        'dropoff_latitude', 'dropoff_longitude']
_CATEGORICAL_FEATURE_KEYS = ['trip_start_hour', 'trip_start_day',
                             'trip_start_month', 'pickup_census_tract',
                             'dropoff_census_tract', 'pickup_community_area',
                             'dropoff_community_area']
_LABEL_KEY = 'tips'
_FARE_KEY = 'fare'
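To show how key lists like these might drive preprocessing, the following sketch builds a plan that maps each output feature name to the transform category described earlier. It is an illustration only: `transformed_name`, its `_xf` suffix, `build_transform_plan`, and the category labels are assumptions, and no TensorFlow Transform calls are made.

```python
_DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']
_VOCAB_FEATURE_KEYS = ['payment_type', 'company']
_BUCKET_FEATURE_KEYS = ['pickup_latitude', 'pickup_longitude',
                        'dropoff_latitude', 'dropoff_longitude']
_CATEGORICAL_FEATURE_KEYS = ['trip_start_hour', 'trip_start_day',
                             'trip_start_month', 'pickup_census_tract',
                             'dropoff_census_tract', 'pickup_community_area',
                             'dropoff_community_area']
_LABEL_KEY = 'tips'


def transformed_name(key):
    """Name for a transformed feature (the '_xf' suffix is an assumption)."""
    return key + '_xf'


def build_transform_plan():
    """Map each transformed feature name to the category of transform applied."""
    groups = [
        (_DENSE_FLOAT_FEATURE_KEYS, 'z_score'),
        (_VOCAB_FEATURE_KEYS, 'vocabulary'),
        (_BUCKET_FEATURE_KEYS, 'bucketize'),
        (_CATEGORICAL_FEATURE_KEYS, 'pass_through'),
        ([_LABEL_KEY], 'binarize_label'),
    ]
    plan = {}
    for keys, category in groups:
        for key in keys:
            plan[transformed_name(key)] = category
    return plan


plan = build_transform_plan()
```

Keeping the key lists as module-level constants lets the preprocessing code and the model's feature columns refer to the same names, so adding a feature to one list updates both sides.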