Spark SQL Grouped Aggregation

Data Preparation

The examples below assume an active SparkSession bound to the name spark, as provided by spark-shell.

import spark.implicits._

val simpleData = Seq(
  ("James", "Sales", "NY", 90000, 34, 10000),
  ("Michael", "Sales", "NY", 86000, 56, 20000),
  ("Robert", "Sales", "CA", 81000, 30, 23000),
  ("Maria", "Finance", "CA", 90000, 24, 23000),
  ("Raman", "Finance", "CA", 99000, 40, 24000),
  ("Scott", "Finance", "NY", 83000, 36, 19000),
  ("Jen", "Finance", "NY", 79000, 53, 15000),
  ("Jeff", "Marketing", "CA", 80000, 25, 18000),
  ("Kumar", "Marketing", "NY", 91000, 50, 21000)
)
val df = simpleData.toDF("employee_name", "department", "state", "salary", "age", "bonus")
df.show()
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
| James| Sales| NY| 90000| 34|10000|
| Michael| Sales| NY| 86000| 56|20000|
| Robert| Sales| CA| 81000| 30|23000|
| Maria| Finance| CA| 90000| 24|23000|
| Raman| Finance| CA| 99000| 40|24000|
| Scott| Finance| NY| 83000| 36|19000|
| Jen| Finance| NY| 79000| 53|15000|
| Jeff| Marketing| CA| 80000| 25|18000|
| Kumar| Marketing| NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+
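Before aggregating, it can be worth confirming the column types that toDF inferred from the tuples; a quick check:

// Inspect the inferred schema (strings and ints from the tuple elements)
df.printSchema()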

Grouped Aggregation

Calling groupBy on a DataFrame returns a RelationalGroupedDataset, which provides built-in single-column aggregations such as sum, count, min, max, avg, and mean.

df.groupBy("department").sum("salary").show(false)
+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|Sales     |257000     |
|Finance   |351000     |
|Marketing |171000     |
+----------+-----------+
df.groupBy("department").count().show(false)
+----------+-----+
|department|count|
+----------+-----+
|Sales     |3    |
|Finance   |4    |
|Marketing |2    |
+----------+-----+
df.groupBy("department").min("salary").show(false)
+----------+-----------+
|department|min(salary)|
+----------+-----------+
|Sales     |81000      |
|Finance   |79000      |
|Marketing |80000      |
+----------+-----------+
df.groupBy("department").max("salary").show(false)
+----------+-----------+
|department|max(salary)|
+----------+-----------+
|Sales     |90000      |
|Finance   |99000      |
|Marketing |91000      |
+----------+-----------+
df.groupBy("department").avg("salary").show(false)
+----------+-----------------+
|department|avg(salary)      |
+----------+-----------------+
|Sales     |85666.66666666667|
|Finance   |87750.0          |
|Marketing |85500.0          |
+----------+-----------------+
df.groupBy("department").mean("salary").show(false)
+----------+-----------------+
|department|avg(salary)      |
+----------+-----------------+
|Sales     |85666.66666666667|
|Finance   |87750.0          |
|Marketing |85500.0          |
+----------+-----------------+
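Note that mean is simply an alias for avg, which is why both calls produce a column named avg(salary). Since this is Spark SQL, the same aggregation can also be written as a SQL query against a temp view; a minimal sketch (the view name employees is illustrative):

// Register the DataFrame under a hypothetical view name and query it with SQL
df.createOrReplaceTempView("employees")
spark.sql(
  """SELECT department, SUM(salary) AS sum_salary
    |FROM employees
    |GROUP BY department""".stripMargin).show(false)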

Grouping by Multiple Columns

Passing several column names to groupBy produces one row per distinct combination of the grouped columns.

//GroupBy on multiple columns
df.groupBy("department", "state")
.sum("salary", "bonus")
.show(false)
+----------+-----+-----------+----------+
|department|state|sum(salary)|sum(bonus)|
+----------+-----+-----------+----------+
|Finance   |NY   |162000     |34000     |
|Marketing |NY   |91000      |21000     |
|Sales     |CA   |81000      |23000     |
|Marketing |CA   |80000      |18000     |
|Finance   |CA   |189000     |47000     |
|Sales     |NY   |176000     |30000     |
+----------+-----+-----------+----------+
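The row order of a grouped result is not guaranteed; if deterministic output matters (for example, when comparing runs), append an orderBy. A minimal sketch:

// Sort the grouped result so the output order is stable across runs
df.groupBy("department", "state")
  .sum("salary", "bonus")
  .orderBy("department", "state")
  .show(false)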

Running Multiple Aggregations at Once

The agg method computes several aggregate expressions in a single pass over the data; as gives the otherwise auto-generated result columns readable aliases.

import org.apache.spark.sql.functions._
df.groupBy("department")
.agg(
sum("salary").as("sum_salary"),
avg("salary").as("avg_salary"),
sum("bonus").as("sum_bonus"),
max("bonus").as("max_bonus"))
.show(false)
+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|Finance   |351000    |87750.0          |81000    |24000    |
|Marketing |171000    |85500.0          |39000    |21000    |
+----------+----------+-----------------+---------+---------+
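agg accepts any aggregate function from org.apache.spark.sql.functions, not just the ones shown above; a sketch using a few others (the column aliases are illustrative):

// Mix other built-in aggregates: distinct count, minimum, standard deviation
df.groupBy("department")
  .agg(
    countDistinct("state").as("num_states"),
    min("bonus").as("min_bonus"),
    stddev("salary").as("stddev_salary"))
  .show(false)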

Filtering Aggregated Results

Filtering on an aggregated column after agg is the DataFrame equivalent of SQL's HAVING clause.

import org.apache.spark.sql.functions._
df.groupBy("department")
.agg(
sum("salary").as("sum_salary"),
avg("salary").as("avg_salary"),
sum("bonus").as("sum_bonus"),
max("bonus").as("max_bonus"))
.where(col("sum_bonus") >= 50000)
.show(false)
+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|Finance   |351000    |87750.0          |81000    |24000    |
+----------+----------+-----------------+---------+---------+
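where is an alias for filter, so the same post-aggregation predicate can be written either way; a minimal equivalent:

// filter behaves identically to where on the aggregated result
df.groupBy("department")
  .agg(sum("bonus").as("sum_bonus"))
  .filter(col("sum_bonus") >= 50000)
  .show(false)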