Union operates on DataFrames that share the same schema, merging multiple DataFrames into one.
Note: the merge does not remove duplicate rows; if you need deduplication, refer to the deduplication methods covered previously (one approach is also shown below).
First, create the data:
import spark.implicits._

val simpleData = Seq(
  ("James", "Sales", "NY", 90000, 34, 10000),
  ("Michael", "Sales", "NY", 86000, 56, 20000),
  ("Robert", "Sales", "CA", 81000, 30, 23000),
  ("Maria", "Finance", "CA", 90000, 24, 23000)
)
val df = simpleData.toDF("employee_name", "department", "state", "salary", "age", "bonus")
df.printSchema()
df.show()
The data looks like this:
root
|-- employee_name: string (nullable = true)
|-- department: string (nullable = true)
|-- state: string (nullable = true)
|-- salary: integer (nullable = false)
|-- age: integer (nullable = false)
|-- bonus: integer (nullable = false)
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
| James| Sales| NY| 90000| 34|10000|
| Michael| Sales| NY| 86000| 56|20000|
| Robert| Sales| CA| 81000| 30|23000|
| Maria| Finance| CA| 90000| 24|23000|
+-------------+----------+-----+------+---+-----+
Next, create a second DataFrame with the same schema:
val simpleData2 = Seq(
  ("James", "Sales", "NY", 90000, 34, 10000),
  ("Maria", "Finance", "CA", 90000, 24, 23000),
  ("Jen", "Finance", "NY", 79000, 53, 15000),
  ("Jeff", "Marketing", "CA", 80000, 25, 18000),
  ("Kumar", "Marketing", "NY", 91000, 50, 21000)
)
val df2 = simpleData2.toDF("employee_name", "department", "state", "salary", "age", "bonus")
Its contents are as follows:
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James |Sales |NY |90000 |34 |10000|
|Maria |Finance |CA |90000 |24 |23000|
|Jen |Finance |NY |79000 |53 |15000|
|Jeff |Marketing |CA |80000 |25 |18000|
|Kumar |Marketing |NY |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
Union
The operation is simple: just call the union function.
val df3 = df.union(df2)
df3.show(false)
The result is as follows:
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James |Sales |NY |90000 |34 |10000|
|Michael |Sales |NY |86000 |56 |20000|
|Robert |Sales |CA |81000 |30 |23000|
|Maria |Finance |CA |90000 |24 |23000|
|James |Sales |NY |90000 |34 |10000|
|Maria |Finance |CA |90000 |24 |23000|
|Jen |Finance |NY |79000 |53 |15000|
|Jeff |Marketing |CA |80000 |25 |18000|
|Kumar |Marketing |NY |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
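One caveat worth noting: union resolves columns by position, not by name, so two DataFrames with the same columns in a different order are combined positionally. Spark also provides unionByName (available since 2.3), which matches columns by name. A minimal sketch, where dfReordered is a hypothetical reordering of df2 built purely for illustration:

// dfReordered: same columns as df2, but with department and state swapped
val dfReordered = df2.select("employee_name", "state", "department", "salary", "age", "bonus")
// union would silently put state values into the department column (both are strings);
// unionByName pairs columns by name, so the data lines up correctly
val dfByName = df.unionByName(dfReordered)
dfByName.show(false)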
Deduplicating after the union
The simplest way is to use distinct; to deduplicate on specific columns, use dropDuplicates instead (see the sketch after the output below).
val df5 = df.union(df2).distinct()
df5.show(false)
The output is as follows:
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James |Sales |NY |90000 |34 |10000|
|Maria |Finance |CA |90000 |24 |23000|
|Jeff |Marketing |CA |80000 |25 |18000|
|Jen |Finance |NY |79000 |53 |15000|
|Kumar |Marketing |NY |91000 |50 |21000|
|Michael |Sales |NY |86000 |56 |20000|
|Robert |Sales |CA |81000 |30 |23000|
+-------------+----------+-----+------+---+-----+
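As noted above, column-specific deduplication uses dropDuplicates rather than distinct. A minimal sketch, assuming employee_name as the dedup key (the key choice and the variable name df6 are illustrative, and Spark does not guarantee which of the duplicate rows survives):

// keep one row per employee_name; which duplicate row survives is not guaranteed
val df6 = df.union(df2).dropDuplicates("employee_name")
df6.show(false)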
Complete example
import org.apache.spark.sql.SparkSession

object UnionExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._

  val simpleData = Seq(
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000)
  )
  val df = simpleData.toDF("employee_name", "department", "state", "salary", "age", "bonus")
  df.printSchema()
  df.show()

  val simpleData2 = Seq(
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "CA", 80000, 25, 18000),
    ("Kumar", "Marketing", "NY", 91000, 50, 21000)
  )
  val df2 = simpleData2.toDF("employee_name", "department", "state", "salary", "age", "bonus")
  df2.show(false)

  // union keeps duplicate rows
  val df3 = df.union(df2)
  df3.show(false)
  // remove duplicates after the union
  df3.distinct().show(false)

  // unionAll has been a deprecated alias of union since Spark 2.0; both keep duplicates
  val df4 = df.unionAll(df2)
  df4.show(false)
}