
Implementation:Heibaiying BigData Notes DataFrame Transformation API

From Leeroopedia


Knowledge Sources
Domains Data_Analysis, Big_Data
Last Updated 2026-02-10 10:00 GMT

Overview

A concrete tool for transforming Apache Spark DataFrames through projection, filtering, column manipulation, grouping, and sorting.

Description

The DataFrame transformation API provides a rich set of methods for reshaping and refining structured data. Each method returns a new DataFrame, enabling fluent method chaining. The BigData-Notes repository demonstrates these operations extensively in the Structured API usage guide, covering column selection, conditional filtering, computed columns, renaming, sorting, and grouping.

Core transformation methods include:

  • select() -- project specific columns or expressions
  • filter() / where() -- retain rows matching a boolean condition
  • withColumn() -- add or replace a column with a computed expression
  • withColumnRenamed() -- rename an existing column
  • drop() -- remove one or more columns
  • groupBy() -- group rows by column values for aggregation
  • sort() / orderBy() -- order rows by column values
  • distinct() -- remove duplicate rows
  • limit() -- restrict output to N rows
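Because every method in the list returns a new DataFrame, they compose fluently. A minimal sketch (the `sales` data and its column names are hypothetical, chosen only for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sales data for illustration
val sales = Seq(("east", "widget", 10), ("west", "widget", 7), ("east", "gadget", 3))
  .toDF("region", "product", "units")

// Each call returns a new DataFrame, so the methods chain fluently
sales
  .filter($"units" > 5)            // retain rows with more than 5 units
  .groupBy("region")               // RelationalGroupedDataset...
  .agg(sum("units").as("total"))   // ...back to a DataFrame after aggregation
  .orderBy($"total".desc)          // sort the aggregated result
  .show()
```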

Usage

Use these transformation methods to build data processing pipelines. Chain multiple transformations together to express complex logic in a readable, declarative style. Because transformations are lazy, Spark's Catalyst optimizer can rewrite the accumulated logical plan (for example, pushing filters down and pruning unused columns), so equivalent chains typically compile to the same efficient physical plan regardless of the order in which they were written.
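One way to see the optimizer at work is to ask for the query plan with `explain`. In this sketch (the DataFrame and its columns are hypothetical), the filter is written after the projection, yet the optimized plan pushes the predicate down toward the data source:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("plan-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Illustrative DataFrame; the column names here are hypothetical
val df = Seq((1, "a", 100), (2, "b", 50)).toDF("id", "label", "score")

// Filtering after selecting does not hurt: Catalyst pushes the predicate
// down in the optimized logical plan, so both orderings execute alike.
df.select("label", "score").filter($"score" > 60).explain(true)
```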

Code Reference

Source Location

  • Repository file: notes/Spark_Structured_API的基本使用.md (lines 1-247)
  • External classes: org.apache.spark.sql.DataFrame, org.apache.spark.sql.Column
  • External documentation: DataFrame/Dataset Scaladoc

Signature

// Projection
df.select(cols: Column*): DataFrame
df.select(col: String, cols: String*): DataFrame

// Filtering
df.filter(condition: Column): DataFrame
df.where(condition: Column): DataFrame

// Column manipulation
df.withColumn(colName: String, col: Column): DataFrame
df.withColumnRenamed(existingName: String, newName: String): DataFrame
df.drop(colName: String): DataFrame
df.drop(colNames: String*): DataFrame

// Sorting
df.sort(sortExprs: Column*): DataFrame
df.orderBy(sortExprs: Column*): DataFrame

// Grouping (returns a RelationalGroupedDataset for aggregation)
df.groupBy(cols: Column*): RelationalGroupedDataset

// Deduplication and limiting
df.distinct(): DataFrame
df.dropDuplicates(colNames: Seq[String]): DataFrame
df.limit(n: Int): DataFrame

Import

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column

I/O Contract

Inputs

Name Type Required Description
df DataFrame Yes The source DataFrame to transform
cols / colNames Column* or String* Varies Column references for select, groupBy, sort, drop
condition Column (Boolean) Yes (filter) Boolean expression that determines which rows to keep
colName String Yes (withColumn) Name of the column to add or replace
col Column Yes (withColumn) Expression defining the new column's values
n Int Yes (limit) Maximum number of rows to return

Outputs

Name Type Description
DataFrame org.apache.spark.sql.DataFrame A new DataFrame with the transformation applied (lazy -- no computation until an action is called)
RelationalGroupedDataset org.apache.spark.sql.RelationalGroupedDataset Intermediate grouped result (from groupBy) awaiting an aggregation call
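The laziness noted in the Outputs table can be observed directly. In this sketch (a toy DataFrame, assumed only for illustration), building the transformation constructs a plan but launches no Spark job; only the action does:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("lazy-demo").master("local[*]").getOrCreate()
import spark.implicits._

val nums = Seq(1, 2, 3, 4, 5).toDF("n")

// Building the transformation only constructs a logical plan -- no Spark job yet
val evens = nums.filter($"n" % 2 === 0)

// The action triggers execution of the whole accumulated plan
val howMany = evens.count()  // a job runs here
```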

Usage Examples

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("Transformation-Examples")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// Sample DataFrame
val empDF = Seq(
  (1, "Alice", "Engineering", 90000),
  (2, "Bob", "Marketing", 75000),
  (3, "Carol", "Engineering", 95000),
  (4, "Dave", "Marketing", 70000),
  (5, "Eve", "Engineering", 88000)
).toDF("id", "name", "department", "salary")

// --- select: project columns ---
empDF.select("name", "salary").show()

// --- select with expressions ---
empDF.select($"name", ($"salary" * 1.1).as("raised_salary")).show()

// --- filter: keep rows where salary > 80000 ---
empDF.filter($"salary" > 80000).show()

// --- withColumn: add a bonus column ---
val withBonus = empDF.withColumn("bonus", $"salary" * 0.15)
withBonus.show()

// --- withColumnRenamed: rename a column ---
val renamed = empDF.withColumnRenamed("department", "dept")
renamed.show()

// --- drop: remove a column ---
empDF.drop("id").show()

// --- sort: order by salary descending ---
empDF.sort($"salary".desc).show()

// --- groupBy + count ---
empDF.groupBy("department").count().show()

// --- distinct ---
empDF.select("department").distinct().show()

// --- limit ---
empDF.limit(3).show()

// --- chained transformations ---
empDF
  .filter($"department" === "Engineering")
  .withColumn("tax", $"salary" * 0.3)
  .select("name", "salary", "tax")
  .sort($"salary".desc)
  .show()
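// dropDuplicates appears in the signatures above but not in the examples.
// Unlike distinct(), which compares whole rows, it can deduplicate on a
// subset of columns. A brief self-contained sketch (hypothetical data):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dedup-demo").master("local[*]").getOrCreate()
import spark.implicits._

val people = Seq(
  (1, "Engineering"), (2, "Engineering"), (3, "Marketing")
).toDF("id", "department")

// Keeps one arbitrary row per distinct department value
val oneRowPerDept = people.dropDuplicates(Seq("department"))
oneRowPerDept.show()
```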

Related Pages

Implements Principle

Requires Environment
