groupBy
Groups rows that share the same value in the specified column(s); it is an aggregation operation.
Syntax:
groupBy(col1: scala.Predef.String, cols: scala.Predef.String*): org.apache.spark.sql.RelationalGroupedDataset
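Besides this String-based signature, Dataset also provides a Column-based overload, groupBy(cols: Column*). A minimal sketch, assuming the df DataFrame built below:
// Same grouping expressed with Column objects instead of column names
import org.apache.spark.sql.functions.col
df.groupBy(col("department")).sum("salary")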
Calling groupBy() returns a RelationalGroupedDataset object, which provides the common aggregation functions:
count() - count the rows in each group
mean() - mean
max() - maximum
min() - minimum
sum() - sum
avg() - average
agg() - perform several aggregations at once
pivot() - pivot a column's values into separate columns (a brief sketch follows this list)
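pivot() is listed above but not demonstrated later in this post. A minimal sketch of the idea, assuming the df DataFrame constructed in the next section:
// Turn each distinct value of state into its own column, summing salary per department
df.groupBy("department")
  .pivot("state")
  .sum("salary")
  .show(false)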
Build the Data & DataFrame
import spark.implicits._
val simpleData = Seq(("James","Sales","NY",90000,34,10000),
("Michael","Sales","NY",86000,56,20000),
("Robert","Sales","CA",81000,30,23000),
("Maria","Finance","CA",90000,24,23000),
("Raman","Finance","CA",99000,40,24000),
("Scott","Finance","NY",83000,36,19000),
("Jen","Finance","NY",79000,53,15000),
("Jeff","Marketing","CA",80000,25,18000),
("Kumar","Marketing","NY",91000,50,21000)
)
val df = simpleData.toDF("employee_name","department","state","salary","age","bonus")
df.show()
Output:
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
| James| Sales| NY| 90000| 34|10000|
| Michael| Sales| NY| 86000| 56|20000|
| Robert| Sales| CA| 81000| 30|23000|
| Maria| Finance| CA| 90000| 24|23000|
| Raman| Finance| CA| 99000| 40|24000|
| Scott| Finance| NY| 83000| 36|19000|
| Jen| Finance| NY| 79000| 53|15000|
| Jeff| Marketing| CA| 80000| 25|18000|
| Kumar| Marketing| NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+
Aggregation operations
Aggregate on the department column to compute the total salary for each department:
df.groupBy("department").sum("salary").show(false)
+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|Sales |257000 |
|Finance |351000 |
|Marketing |171000 |
+----------+-----------+
Similarly, use count() to count the rows in each group:
df.groupBy("department").count()
Compute the minimum with min():
df.groupBy("department").min("salary")
Compute the maximum with max():
df.groupBy("department").max("salary")
Compute the average with avg():
df.groupBy("department").avg("salary")
Compute the mean with mean():
df.groupBy("department").mean("salary")
Multiple aggregations on grouped data
In short, this means aggregating several columns at once, for example the total salary, the average salary, or the sum of some other column. The example below is the simple case: a single aggregation function applied to multiple columns.
//GroupBy on multiple columns
df.groupBy("department","state")
.sum("salary","bonus")
.show(false)
This yields the following output:
+----------+-----+-----------+----------+
|department|state|sum(salary)|sum(bonus)|
+----------+-----+-----------+----------+
|Finance |NY |162000 |34000 |
|Marketing |NY |91000 |21000 |
|Sales |CA |81000 |23000 |
|Marketing |CA |80000 |18000 |
|Finance |CA |189000 |47000 |
|Sales |NY |176000 |30000 |
+----------+-----+-----------+----------+
Similarly, to run several different aggregations over multiple columns we need agg().
Using agg
Before using multiple aggregation functions, import the relevant package: import org.apache.spark.sql.functions._
agg() comes up frequently in everyday statistics. After aggregating, you can also rename the result columns; otherwise Spark's default output column names are rather unwieldy.
import org.apache.spark.sql.functions._
df.groupBy("department")
.agg(
sum("salary").as("sum_salary"),
avg("salary").as("avg_salary"),
sum("bonus").as("sum_bonus"),
max("bonus").as("max_bonus"))
.show(false)
The code above calls sum(), avg(), and max(), renaming each result with as():
+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales |257000 |85666.66666666667|53000 |23000 |
|Finance |351000 |87750.0 |81000 |24000 |
|Marketing |171000 |85500.0 |39000 |21000 |
+----------+----------+-----------------+---------+---------+
Filtering with filter
On a DataFrame we can use either where() or filter() to keep only the groups we want; a filter() variant is sketched after the output below.
df.groupBy("department")
.agg(
sum("salary").as("sum_salary"),
avg("salary").as("avg_salary"),
sum("bonus").as("sum_bonus"),
max("bonus").as("max_bonus"))
.where(col("sum_bonus") >= 50000)
.show(false)
This keeps only the groups where sum_bonus >= 50000:
+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales |257000 |85666.66666666667|53000 |23000 |
|Finance |351000 |87750.0 |81000 |24000 |
+----------+----------+-----------------+---------+---------+
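For completeness, the same condition can be expressed with filter() instead of where(). A minimal sketch on the same df:
// filter() and where() are equivalent; both take a Column condition
df.groupBy("department")
  .agg(
    sum("salary").as("sum_salary"),
    sum("bonus").as("sum_bonus"))
  .filter(col("sum_bonus") >= 50000)
  .show(false)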
Complete code
package com.sparkbyexamples.spark.dataframe
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object GroupbyExample extends App {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val simpleData = Seq(("James","Sales","NY",90000,34,10000),
("Michael","Sales","NY",86000,56,20000),
("Robert","Sales","CA",81000,30,23000),
("Maria","Finance","CA",90000,24,23000),
("Raman","Finance","CA",99000,40,24000),
("Scott","Finance","NY",83000,36,19000),
("Jen","Finance","NY",79000,53,15000),
("Jeff","Marketing","CA",80000,25,18000),
("Kumar","Marketing","NY",91000,50,21000)
)
val df = simpleData.toDF("employee_name","department","state","salary","age","bonus")
df.show()
//Group By on single column
df.groupBy("department").count().show(false)
df.groupBy("department").avg("salary").show(false)
df.groupBy("department").sum("salary").show(false)
df.groupBy("department").min("salary").show(false)
df.groupBy("department").max("salary").show(false)
df.groupBy("department").mean("salary").show(false)
//GroupBy on multiple columns
df.groupBy("department","state")
.sum("salary","bonus")
.show(false)
df.groupBy("department","state")
.avg("salary","bonus")
.show(false)
df.groupBy("department","state")
.max("salary","bonus")
.show(false)
df.groupBy("department","state")
.min("salary","bonus")
.show(false)
df.groupBy("department","state")
.mean("salary","bonus")
.show(false)
//using agg function
df.groupBy("department")
.agg(
sum("salary").as("sum_salary"),
avg("salary").as("avg_salary"),
sum("bonus").as("sum_bonus"),
max("bonus").as("max_bonus"))
.show(false)
//Running Filter on the aggregated result
df.groupBy("department")
.agg(
sum("salary").as("sum_salary"),
avg("salary").as("avg_salary"),
sum("bonus").as("sum_bonus"),
stddev("bonus").as("stddev_bonus"))
.where(col("sum_bonus") > 50000)
.show(false)
}