SparkSql系列(8/25) 行列旋转

2,537次阅读
没有评论

共计 2087 个字符,预计需要花费 6 分钟才能阅读完成。

pivot 函数的功能是实现行列旋转,在 sql 里面也经常会遇到行列旋转,相应的操作大概就是对于行你可能需要解包拆成多列,多列变成一行

构建 DataFrame

val data = Seq(("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"),
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"),
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"),
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico"))

import spark.sqlContext.implicits._
val df = data.toDF("Product","Amount","Country")
df.show()

df 包含三列 产品 、数量和国家

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
| Banana|  1000|    USA|
|Carrots|  1500|    USA|
|  Beans|  1600|    USA|
| Orange|  2000|    USA|
| Orange|  2000|    USA|
| Banana|   400|  China|
|Carrots|  1200|  China|
|  Beans|  1500|  China|
| Orange|  4000|  China|
| Banana|  2000| Canada|
|Carrots|  2000| Canada|
|  Beans|  2000| Mexico|
+-------+-----+-------+

Pivot

pivot 函数旋转将行变成列,执行的是一个聚合的操作。比如上面的例子中,你现在想要统计每个产品在各个国家的数量,实际上是要将国家这个信息由行拓展到列上,这样每行就是统计 每一个产品在各个国家的存储量。

执行的操作如下:

  1. 分组:按照产品分组处理
  2. 选择需要行转列的对应的
  3. 执行的聚合操作:这里是sum
val pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.show()

在行转列的过程中,如果出现没有的数据,那么对应的会被置为 null ,可以使用 null 进行处理填充 0 操作

+-------+------+-----+------+----+
|Product|Canada|China|Mexico| USA|
+-------+------+-----+------+----+
| Orange|  null| 4000|  null|4000|
|  Beans|  null| 1500|  2000|1600|
| Banana|  2000|  400|  null|1000|
|Carrots|  2000| 1200|  null|1500|

Spark 2.0 中函数性能提升

在 1.x 版本执行这个操作算是一个消耗巨大的操作,在 2.0 做出了性能的优化,具体执行的方法可以参考下面的代码实现

val countries = Seq("USA","China","Canada","Mexico")
val pivotDF = df.groupBy("Product").pivot("Country", countries).sum("Amount")
pivotDF.show()

依据 Spark-13749 执行两步聚合达到同样的效果,有进一步的性能优化

val pivotDF = df.groupBy("Product","Country")
      .sum("Amount")
      .groupBy("Product")
      .pivot("Country")
      .sum("sum(Amount)")
pivotDF.show()

Unpivot

Unpivot 是一个逆操作,原来是将行转列,现在就是要实现列转行,类似一个stack操作

//unpivot
val unPivotDF = pivotDF.select($"Product",
expr("stack(3, 'Canada', Canada, 'China', China, 'Mexico', Mexico) as (Country,Total)"))
.where("Total is not null")
unPivotDF.show()
+-------+-------+-----+
|Product|Country|Total|
+-------+-------+-----+
| Orange|  China| 4000|
|  Beans|  China| 1500|
|  Beans| Mexico| 2000|
| Banana| Canada| 2000|
| Banana|  China|  400|
|Carrots| Canada| 2000|
|Carrots|  China| 1200|
+-------+-------+-----+
正文完
请博主喝杯咖啡吧!
post-qrcode
 
admin
版权声明:本站原创文章,由 admin 2021-08-26发表,共计2087字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)
验证码