Implementation: Heibaiying BigData-Notes Spark Aggregation and Join API
| Knowledge Sources | |
|---|---|
| Domains | Data_Analysis, Big_Data |
| Last Updated | 2026-02-10 10:00 GMT |
Overview
A concrete tool, provided by Apache Spark, for performing aggregations and joins on structured data (DataFrames).
Description
The aggregation and join APIs in Spark SQL provide methods for computing summary statistics over groups and combining DataFrames on key relationships. The BigData-Notes repository documents aggregation functions in the dedicated aggregation guide and join operations in the join operations guide, using emp (employees) and dept (departments) sample DataFrames throughout.
Aggregation API: After calling groupBy(), the resulting RelationalGroupedDataset exposes the agg() method, which accepts one or more aggregate expressions built from functions in org.apache.spark.sql.functions.
Join API: The join() method on DataFrame accepts another DataFrame, a join condition (either a Column expression or a sequence of shared column names), and an optional join-type string. Spark supports inner, outer (full), left_outer, right_outer, left_semi, left_anti, and cross joins.
Usage
Use the aggregation API to compute counts, sums, averages, minimums, maximums, distinct counts, and collection operations grouped by one or more key columns. Use the join API to combine related DataFrames (e.g., enriching employee records with department names). The two operations are often combined in analytical queries.
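Grouping by more than one key column follows the same pattern; a minimal sketch (the jobTitle column and its values are hypothetical, not part of the repository's sample emp/dept data):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("multi-key-groupby")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical data: deptId plus a jobTitle column (not from the sample data)
val df = Seq(
  (10, "engineer", 90000),
  (10, "engineer", 95000),
  (10, "manager", 120000),
  (20, "analyst", 70000)
).toDF("deptId", "jobTitle", "salary")

// One output row per distinct (deptId, jobTitle) pair
val grouped = df.groupBy("deptId", "jobTitle")
  .agg(count("*").as("n"), avg("salary").as("avg_salary"))

grouped.show()
```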
Code Reference
Source Location
- Aggregation source: notes/SparkSQL常用聚合函数.md (lines 1-342)
- Join source: notes/SparkSQL联结操作.md (lines 1-188)
- External classes: org.apache.spark.sql.DataFrame, org.apache.spark.sql.RelationalGroupedDataset, org.apache.spark.sql.functions
Signature
// --- Aggregation ---
// Grouped aggregation
df.groupBy(cols: Column*): RelationalGroupedDataset
df.groupBy(col1: String, cols: String*): RelationalGroupedDataset
groupedData.agg(exprs: Column*): DataFrame
// Built-in aggregate functions (from org.apache.spark.sql.functions)
count(col: Column): Column
sum(col: Column): Column
avg(col: Column): Column
min(col: Column): Column
max(col: Column): Column
countDistinct(col: Column, cols: Column*): Column
collect_set(col: Column): Column
collect_list(col: Column): Column
// --- Joins ---
df.join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
df.join(right: DataFrame, usingColumns: Seq[String], joinType: String): DataFrame
df.crossJoin(right: DataFrame): DataFrame
Import
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| df | DataFrame | Yes | The primary DataFrame for aggregation or the left side of a join |
| cols | Column* or String* | Yes (groupBy) | Columns to group by before aggregation |
| exprs | Column* | Yes (agg) | Aggregate expressions (count, sum, avg, etc.) |
| right | DataFrame | Yes (join) | The right-side DataFrame to join with |
| joinExprs | Column | Yes (join) | Boolean column expression specifying the join condition |
| joinType | String | No (default: "inner") | Join type: "inner", "outer", "left_outer", "right_outer", "left_semi", "left_anti", "cross" |
Outputs
| Name | Type | Description |
|---|---|---|
| DataFrame | org.apache.spark.sql.DataFrame | Aggregated or joined result as a new DataFrame |
| RelationalGroupedDataset | org.apache.spark.sql.RelationalGroupedDataset | Intermediate grouped result (from groupBy) awaiting aggregation |
Usage Examples
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("Agg-and-Join-Examples")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// --- Sample data (emp and dept) ---
val empDF = Seq(
(1, "Alice", 10, 90000),
(2, "Bob", 20, 75000),
(3, "Carol", 10, 95000),
(4, "Dave", 20, 70000),
(5, "Eve", 30, 88000),
(6, "Frank", 10, 92000)
).toDF("empId", "name", "deptId", "salary")
val deptDF = Seq(
(10, "Engineering"),
(20, "Marketing"),
(30, "Sales"),
(40, "HR")
).toDF("deptId", "deptName")
// ============================================
// AGGREGATION EXAMPLES
// ============================================
// Simple whole-DataFrame aggregation
empDF.agg(
count("*").as("total_employees"),
avg("salary").as("avg_salary")
).show()
// Grouped aggregation by department
empDF.groupBy("deptId").agg(
count("*").as("emp_count"),
sum("salary").as("total_salary"),
avg("salary").as("avg_salary"),
min("salary").as("min_salary"),
max("salary").as("max_salary")
).show()
// countDistinct
empDF.agg(countDistinct("deptId").as("num_departments")).show()
// collect_set and collect_list
empDF.groupBy("deptId").agg(
collect_set("name").as("unique_names"),
collect_list("name").as("all_names")
).show(truncate = false)
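// agg() also has a Map-based overload: keys are column names, values are
// aggregate function names. Output column names are auto-generated,
// e.g. "avg(salary)" (shown here as an alternative; the Column-expression
// form above gives more control over naming).
empDF.groupBy("deptId")
  .agg(Map("salary" -> "avg", "empId" -> "count"))
  .show()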
// ============================================
// JOIN EXAMPLES
// ============================================
// Inner join
val innerJoin = empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "inner")
.select(empDF("name"), deptDF("deptName"), empDF("salary"))
innerJoin.show()
// Left outer join (all employees, even those without matching dept)
val leftJoin = empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "left_outer")
.select(empDF("name"), deptDF("deptName"))
leftJoin.show()
// Right outer join (all departments, even those without employees)
val rightJoin = empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "right_outer")
.select(empDF("name"), deptDF("deptName"))
rightJoin.show()
// Full outer join
val fullJoin = empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "outer")
.select(empDF("name"), deptDF("deptName"))
fullJoin.show()
// Left semi join (employees whose deptId exists in deptDF)
val semiJoin = empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "left_semi")
semiJoin.show()
// Left anti join (employees whose deptId does NOT exist in deptDF)
val antiJoin = empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "left_anti")
antiJoin.show()
// Cross join (Cartesian product)
val crossJoin = empDF.crossJoin(deptDF)
println(s"Cross join row count: ${crossJoin.count()}")
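// --- Join on a shared column name (usingColumns variant) ---
// Unlike the Column-expression joins above, which keep both deptId columns,
// the Seq[String] overload emits a single deptId column in the result.
val usingJoin = empDF.join(deptDF, Seq("deptId"), "inner")
usingJoin.show()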
// ============================================
// COMBINED: Join then Aggregate
// ============================================
empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "inner")
.groupBy(deptDF("deptName"))
.agg(
count("*").as("emp_count"),
avg(empDF("salary")).as("avg_salary")
)
.orderBy($"avg_salary".desc)
.show()