
Implementation:Heibaiying BigData Notes Spark Agg and Join API

From Leeroopedia


Knowledge Sources
Domains Data_Analysis, Big_Data
Last Updated 2026-02-10 10:00 GMT

Overview

A concrete API, provided by Apache Spark, for performing aggregations and joins on structured data.

Description

The aggregation and join APIs in Spark SQL provide methods for computing summary statistics over groups and combining DataFrames on key relationships. The BigData-Notes repository documents aggregation functions in the dedicated aggregation guide and join operations in the join operations guide, using emp (employees) and dept (departments) sample DataFrames throughout.

Aggregation API: After calling groupBy(), the resulting RelationalGroupedDataset exposes the agg() method, which accepts one or more aggregate expressions built from functions in org.apache.spark.sql.functions.

Join API: The join() method on DataFrame accepts another DataFrame, a join condition (Column expression), and an optional join type string. Spark supports inner, outer, left_outer, right_outer, left_semi, left_anti, and cross joins.

Usage

Use the aggregation API to compute counts, sums, averages, minimums, maximums, distinct counts, and collection operations grouped by one or more key columns. Use the join API to combine related DataFrames (e.g., enriching employee records with department names). The two operations are often combined in analytical queries.

Code Reference

Source Location

  • Aggregation source: notes/SparkSQL常用聚合函数.md ("Spark SQL Common Aggregate Functions", lines 1-342)
  • Join source: notes/SparkSQL联结操作.md ("Spark SQL Join Operations", lines 1-188)
  • External classes: org.apache.spark.sql.DataFrame, org.apache.spark.sql.RelationalGroupedDataset, org.apache.spark.sql.functions

Signature

// --- Aggregation ---
// Whole-DataFrame aggregation
df.agg(expr: Column, exprs: Column*): DataFrame

// Grouped aggregation
df.groupBy(cols: Column*): RelationalGroupedDataset
df.groupBy(col1: String, cols: String*): RelationalGroupedDataset
groupedData.agg(expr: Column, exprs: Column*): DataFrame

// Built-in aggregate functions (from org.apache.spark.sql.functions)
count(col: Column): Column
sum(col: Column): Column
avg(col: Column): Column
min(col: Column): Column
max(col: Column): Column
countDistinct(col: Column, cols: Column*): Column
collect_set(col: Column): Column
collect_list(col: Column): Column

// --- Joins ---
df.join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
df.join(right: DataFrame, usingColumns: Seq[String], joinType: String): DataFrame
df.crossJoin(right: DataFrame): DataFrame

Import

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

I/O Contract

Inputs

Name Type Required Description
df DataFrame Yes The primary DataFrame for aggregation or the left side of a join
cols Column* or String* Yes (groupBy) Columns to group by before aggregation
exprs Column* Yes (agg) Aggregate expressions (count, sum, avg, etc.)
right DataFrame Yes (join) The right-side DataFrame to join with
joinExprs Column Yes (join) Boolean column expression specifying the join condition
joinType String No (default: "inner") Join type: "inner", "outer", "left_outer", "right_outer", "left_semi", "left_anti", "cross"

Outputs

Name Type Description
DataFrame org.apache.spark.sql.DataFrame Aggregated or joined result as a new DataFrame
RelationalGroupedDataset org.apache.spark.sql.RelationalGroupedDataset Intermediate grouped result (from groupBy) awaiting aggregation

Usage Examples

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("Agg-and-Join-Examples")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// --- Sample data (emp and dept) ---
val empDF = Seq(
  (1, "Alice", 10, 90000),
  (2, "Bob", 20, 75000),
  (3, "Carol", 10, 95000),
  (4, "Dave", 20, 70000),
  (5, "Eve", 30, 88000),
  (6, "Frank", 10, 92000)
).toDF("empId", "name", "deptId", "salary")

val deptDF = Seq(
  (10, "Engineering"),
  (20, "Marketing"),
  (30, "Sales"),
  (40, "HR")
).toDF("deptId", "deptName")

// ============================================
// AGGREGATION EXAMPLES
// ============================================

// Simple whole-DataFrame aggregation
empDF.agg(
  count("*").as("total_employees"),
  avg("salary").as("avg_salary")
).show()

// Grouped aggregation by department
empDF.groupBy("deptId").agg(
  count("*").as("emp_count"),
  sum("salary").as("total_salary"),
  avg("salary").as("avg_salary"),
  min("salary").as("min_salary"),
  max("salary").as("max_salary")
).show()

// countDistinct
empDF.agg(countDistinct("deptId").as("num_departments")).show()
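
// countDistinct over multiple columns (a sketch, reusing empDF from above):
// counts distinct (deptId, name) combinations rather than single-column values.
empDF.agg(countDistinct("deptId", "name").as("distinct_pairs")).show()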

// collect_set and collect_list
empDF.groupBy("deptId").agg(
  collect_set("name").as("unique_names"),
  collect_list("name").as("all_names")
).show(truncate = false)
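
// Map-based agg (a sketch, reusing empDF): keys are column names, values are
// aggregate function names. Handy when the aggregation list is built at runtime.
// Result columns are auto-named, e.g. "avg(salary)".
empDF.groupBy("deptId")
  .agg(Map("salary" -> "avg", "empId" -> "count"))
  .show()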

// ============================================
// JOIN EXAMPLES
// ============================================

// Inner join
val innerJoin = empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "inner")
  .select(empDF("name"), deptDF("deptName"), empDF("salary"))
innerJoin.show()

// Left outer join (all employees, even those without matching dept)
val leftJoin = empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "left_outer")
  .select(empDF("name"), deptDF("deptName"))
leftJoin.show()

// Right outer join (all departments, even those without employees)
val rightJoin = empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "right_outer")
  .select(empDF("name"), deptDF("deptName"))
rightJoin.show()

// Full outer join
val fullJoin = empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "outer")
  .select(empDF("name"), deptDF("deptName"))
fullJoin.show()

// Left semi join (employees whose deptId exists in deptDF)
val semiJoin = empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "left_semi")
semiJoin.show()

// Left anti join (employees whose deptId does NOT exist in deptDF)
val antiJoin = empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "left_anti")
antiJoin.show()

// Cross join (Cartesian product)
val crossJoin = empDF.crossJoin(deptDF)
println(s"Cross join row count: ${crossJoin.count()}")
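
// Join on a shared column name (a sketch, reusing empDF and deptDF): the
// usingColumns variant keeps a single copy of the deptId key, avoiding the
// duplicated column produced by the expression-based joins above.
val joinedOnName = empDF.join(deptDF, Seq("deptId"), "inner")
joinedOnName.show()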

// ============================================
// COMBINED: Join then Aggregate
// ============================================
empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "inner")
  .groupBy(deptDF("deptName"))
  .agg(
    count("*").as("emp_count"),
    avg(empDF("salary")).as("avg_salary")
  )
  .orderBy($"avg_salary".desc)
  .show()

Related Pages

Implements Principle

Requires Environment
