Implementation: Heibaiying BigData-Notes DataFrame Transformation API
| Knowledge Sources | |
|---|---|
| Domains | Data_Analysis, Big_Data |
| Last Updated | 2026-02-10 10:00 GMT |
Overview
Concrete tool for transforming Apache Spark DataFrames through projection, filtering, column manipulation, grouping, and sorting.
Description
The DataFrame transformation API provides a rich set of methods for reshaping and refining structured data. Each method returns a new DataFrame, enabling fluent method chaining. The BigData-Notes repository demonstrates these operations extensively in the Structured API usage guide, covering column selection, conditional filtering, computed columns, renaming, sorting, and grouping.
Core transformation methods include:
- select() -- project specific columns or expressions
- filter() / where() -- retain rows matching a boolean condition
- withColumn() -- add or replace a column with a computed expression
- withColumnRenamed() -- rename an existing column
- drop() -- remove one or more columns
- groupBy() -- group rows by column values for aggregation
- sort() / orderBy() -- order rows by column values
- distinct() -- remove duplicate rows
- limit() -- restrict output to N rows
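Because each of these calls returns a new DataFrame rather than mutating the receiver, earlier references stay valid. A minimal sketch (assuming a SparkSession with its implicits imported and a DataFrame df with a salary column, analogous to the empDF defined under Usage Examples):
val df2 = df.withColumn("bonus", $"salary" * 0.1)
println(df.columns.contains("bonus"))   // false: df is unchanged
println(df2.columns.contains("bonus"))  // true: only the new DataFrame carries the column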
Usage
Use these transformation methods to build data processing pipelines. Chain multiple transformations to express complex logic in a readable, declarative style. Spark's Catalyst optimizer rewrites the logical plan (for example, pushing filters down and pruning unused columns), so logically equivalent pipelines are generally executed with the same efficiency regardless of the order in which the transformations are written.
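To see this, compare the plans of two equivalent orderings (a sketch reusing the empDF defined under Usage Examples below; explain(true) prints the parsed, analyzed, optimized, and physical plans):
// Filter-before-select ...
empDF.filter($"salary" > 80000).select("name", "salary").explain(true)
// ... and select-before-filter express the same query; Catalyst typically
// produces the same optimized physical plan for both.
empDF.select("name", "salary").filter($"salary" > 80000).explain(true)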
Code Reference
Source Location
- Repository file: notes/Spark_Structured_API的基本使用.md (lines 1-247)
- External classes: org.apache.spark.sql.DataFrame, org.apache.spark.sql.Column
- External documentation: DataFrame/Dataset Scaladoc
Signature
// Projection
df.select(cols: Column*): DataFrame
df.select(col: String, cols: String*): DataFrame
// Filtering
df.filter(condition: Column): DataFrame
df.where(condition: Column): DataFrame
// Column manipulation
df.withColumn(colName: String, col: Column): DataFrame
df.withColumnRenamed(existingName: String, newName: String): DataFrame
df.drop(colName: String): DataFrame
df.drop(colNames: String*): DataFrame
// Sorting
df.sort(sortExprs: Column*): DataFrame
df.orderBy(sortExprs: Column*): DataFrame
// Grouping (returns GroupedData for aggregation)
df.groupBy(cols: Column*): RelationalGroupedDataset
// Deduplication and limiting
df.distinct(): DataFrame
df.dropDuplicates(colNames: Seq[String]): DataFrame
df.limit(n: Int): DataFrame
Import
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
// The $"column" interpolator used below additionally requires the session's
// implicits, imported after the SparkSession is created: import spark.implicits._
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| df | DataFrame | Yes | The source DataFrame to transform |
| cols / colNames | Column* or String* | Varies | Column references for select, groupBy, sort, drop |
| condition | Column (Boolean) | Yes (filter) | Boolean expression that determines which rows to keep |
| colName | String | Yes (withColumn) | Name of the column to add or replace |
| col | Column | Yes (withColumn) | Expression defining the new column's values |
| n | Int | Yes (limit) | Maximum number of rows to return |
Outputs
| Name | Type | Description |
|---|---|---|
| DataFrame | org.apache.spark.sql.DataFrame | A new DataFrame with the transformation applied (lazy -- no computation until an action is called) |
| RelationalGroupedDataset | org.apache.spark.sql.RelationalGroupedDataset | Intermediate grouped result (from groupBy) awaiting an aggregation call |
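To make the laziness noted above concrete, a minimal sketch (reusing the empDF from the Usage Examples; the highEarners name is illustrative):
// Building the pipeline is cheap: only a logical plan is constructed here.
val highEarners = empDF.filter($"salary" > 80000).select("name", "salary")
// Execution happens only when an action (show, count, collect, ...) runs.
println(highEarners.count())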
Usage Examples
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("Transformation-Examples")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Sample DataFrame
val empDF = Seq(
(1, "Alice", "Engineering", 90000),
(2, "Bob", "Marketing", 75000),
(3, "Carol", "Engineering", 95000),
(4, "Dave", "Marketing", 70000),
(5, "Eve", "Engineering", 88000)
).toDF("id", "name", "department", "salary")
// --- select: project columns ---
empDF.select("name", "salary").show()
// --- select with expressions ---
empDF.select($"name", ($"salary" * 1.1).as("raised_salary")).show()
// --- filter: keep rows where salary > 80000 ---
empDF.filter($"salary" > 80000).show()
// --- withColumn: add a bonus column ---
val withBonus = empDF.withColumn("bonus", $"salary" * 0.15)
withBonus.show()
// --- withColumnRenamed: rename a column ---
val renamed = empDF.withColumnRenamed("department", "dept")
renamed.show()
// --- drop: remove a column ---
empDF.drop("id").show()
// --- sort: order by salary descending ---
empDF.sort($"salary".desc).show()
// --- groupBy + count ---
empDF.groupBy("department").count().show()
// --- distinct ---
empDF.select("department").distinct().show()
// --- limit ---
empDF.limit(3).show()
// --- chained transformations ---
empDF
.filter($"department" === "Engineering")
.withColumn("tax", $"salary" * 0.3)
.select("name", "salary", "tax")
.sort($"salary".desc)
.show()
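// Stop the session when finished to release local resources.
spark.stop()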