Principle: Heibaiying BigData Notes - Spark Aggregation and Join
| Knowledge Sources | |
|---|---|
| Domains | Data_Analysis, Big_Data |
| Last Updated | 2026-02-10 10:00 GMT |
Overview
Aggregations compute summary statistics over groups of rows, and joins combine DataFrames based on key relationships, together forming the core analytical operations in Spark SQL.
Description
Aggregations and joins are the two most fundamental analytical operations in structured data processing. They correspond directly to the GROUP BY and JOIN clauses in standard SQL and are essential for any non-trivial data analysis.
Aggregations
Aggregations reduce multiple rows into summary values. Spark SQL supports:
- Simple aggregations -- applied to the entire DataFrame (e.g., df.count(), df.agg(max("salary")))
- Grouped aggregations -- applied per group after a groupBy() (e.g., count per department, average salary per role)
- Multi-column aggregations -- computing several aggregate expressions in a single agg() call
- Window aggregations -- computing aggregates over a sliding or partitioned window without collapsing rows
Common aggregate functions include count(), sum(), avg() (or mean()), min(), max(), countDistinct(), collect_set(), and collect_list().
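The window-aggregation case is the least SQL-like of the four, so a short sketch helps. The snippet below assumes a hypothetical empDF with "department" and "salary" columns; the point is that over(Window.partitionBy(...)) adds a per-group aggregate to every row instead of collapsing the group:

```scala
// Sketch: window aggregation keeps every row while attaching a per-group aggregate.
// Assumes a DataFrame empDF with "department" and "salary" columns.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val byDept = Window.partitionBy("department")

val withDeptStats = empDF
  .withColumn("dept_avg_salary", avg("salary").over(byDept))
  .withColumn("dept_max_salary", max("salary").over(byDept))
// Unlike groupBy().agg(), the row count is unchanged:
// each employee row now also carries its department's aggregates.
```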
Joins
Joins combine rows from two DataFrames where a key condition is satisfied. Spark SQL supports all standard join types:
| Join Type | Description | Keeps Unmatched Rows From |
|---|---|---|
| inner | Rows that match in both DataFrames | Neither (only matching rows) |
| outer (full outer) | All rows from both DataFrames | Both sides (nulls for non-matches) |
| left_outer | All rows from the left DataFrame | Left side |
| right_outer | All rows from the right DataFrame | Right side |
| left_semi | Left rows where a match exists in the right | Left side (no right columns included) |
| left_anti | Left rows where no match exists in the right | Left side (no right columns included) |
| cross | Cartesian product of both DataFrames | Both sides (every combination) |
Usage
Aggregations and joins are used in virtually every analytical workflow:
- Reporting and dashboards -- computing KPIs (totals, averages, counts) grouped by dimensions (time, region, category)
- Data enrichment -- joining a fact table with dimension tables to add descriptive attributes
- Deduplication and lookup -- using semi joins to find matching records or anti joins to find missing ones
- Feature engineering -- aggregating historical data per entity for machine learning pipelines
- Data quality checks -- counting nulls, checking referential integrity via joins
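As one concrete instance of the data-quality use case, a referential-integrity check can be written as a left anti join. This is a sketch assuming the same hypothetical empDF and deptDF used elsewhere in this note, where empDF("deptId") should reference deptDF("deptId"):

```scala
// Sketch: find employees whose deptId has no matching department row.
val orphans = empDF.join(deptDF, empDF("deptId") === deptDF("deptId"), "left_anti")

// A count of 0 means every employee references an existing department.
val orphanCount = orphans.count()
```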
Theoretical Basis
Aggregation Mechanics
Grouped aggregation in Spark follows a map-side partial aggregation strategy (similar to combiners in MapReduce):
- Partial aggregation: each input partition computes local partial aggregates for its group key(s) on the map side
- Shuffle: partial results are repartitioned by the groupBy key(s) via a hash- or sort-based shuffle, so all partials for a key land in the same partition
- Final aggregation: after the shuffle, the partial results for each key are merged into final aggregate values
```scala
// Grouped aggregation with several aggregate expressions in one agg() call
import org.apache.spark.sql.functions._

val result = empDF
  .groupBy("department")
  .agg(
    count("*").as("emp_count"),
    avg("salary").as("avg_salary"),
    max("salary").as("max_salary"),
    countDistinct("role").as("distinct_roles"),
    collect_set("name").as("employee_names")
  )
```
Join Mechanics
Spark chooses a physical join strategy based on data size and configuration:
- Broadcast Hash Join -- when one side is small enough to fit in memory (controlled by spark.sql.autoBroadcastJoinThreshold, default 10MB). The small DataFrame is broadcast to all executors.
- Sort-Merge Join -- the default for large-large joins. Both sides are sorted by the join key and merged.
- Shuffle Hash Join -- both sides are hash-partitioned by the join key, then probed.
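When one side is known to be small but exceeds the automatic threshold, the broadcast strategy can be requested explicitly. A minimal sketch, assuming the same empDF/deptDF pair:

```scala
// Sketch: forcing a Broadcast Hash Join with an explicit hint.
// Useful when deptDF is known to be small but is larger than
// spark.sql.autoBroadcastJoinThreshold.
import org.apache.spark.sql.functions.broadcast

val enriched = empDF.join(broadcast(deptDF), Seq("deptId"), "inner")
```

Using Seq("deptId") instead of an explicit column equality also drops the duplicate join column from the result.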
```scala
// The standard join types, expressed against a shared key column deptId
val joinExpr = empDF("deptId") === deptDF("deptId")

val inner   = empDF.join(deptDF, joinExpr, "inner")
val leftOut = empDF.join(deptDF, joinExpr, "left_outer")
val semi    = empDF.join(deptDF, joinExpr, "left_semi")
val anti    = empDF.join(deptDF, joinExpr, "left_anti")
val cross   = empDF.crossJoin(deptDF)
```
Performance considerations:
- Broadcast joins are dramatically faster for small-large joins because they eliminate the shuffle on the large side
- Skewed join keys (where one key value has disproportionately many rows) can cause performance degradation; Spark 3.x adaptive query execution (AQE) can mitigate this
- Cross joins produce N x M rows and should be used with extreme caution
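The AQE mitigation mentioned above is configuration-driven. A sketch of the relevant settings, assuming an active SparkSession named spark (both flags default to true in recent Spark 3.x releases):

```scala
// Sketch: enabling adaptive query execution and its skew-join mitigation (Spark 3.x).
spark.conf.set("spark.sql.adaptive.enabled", "true")           // on by default since Spark 3.2
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")  // split skewed partitions at join time
```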