Spark DataFrame supports essentially every join type you would expect from SQL, including INNER, FULL OUTER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS, and SELF joins.
- Join Syntax & Types
- Inner Join
- Full Outer Join
- Left Outer Join
- Right Outer Join
- Left Anti Join
- Left Semi Join
- Self Join
- Using SQL Expression
1. SQL Join Types & Syntax
The Dataset join method has the following overloads:
1) join(right: Dataset[_]): DataFrame
2) join(right: Dataset[_], usingColumn: String): DataFrame
3) join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
4) join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
5) join(right: Dataset[_], joinExprs: Column): DataFrame
6) join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
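A minimal sketch of how overloads 4) and 6) differ, assuming a local SparkSession and a trimmed copy of the emp/dept data introduced below (the object name `UsingColumnJoin` is hypothetical). With a join expression both key columns survive; with `usingColumns` the key must have the same name on both sides and appears only once in the output, so `emp_dept_id` is renamed to `dept_id` and cast to int first.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object UsingColumnJoin {
  // Local session for the sketch (getOrCreate reuses an existing one).
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("UsingColumnJoin")
    .getOrCreate()
  import spark.implicits._

  // Trimmed copy of the emp/dept data; emp_dept_id is a String, as in the article.
  lazy val empDF: DataFrame = Seq((1, "Smith", "10"), (2, "Rose", "20"), (6, "Brown", "50"))
    .toDF("emp_id", "name", "emp_dept_id")
  lazy val deptDF: DataFrame = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30))
    .toDF("dept_name", "dept_id")

  // Overload 6): join(right, joinExprs, joinType). Both key columns are kept.
  def withJoinExpr(): DataFrame =
    empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "inner")

  // Overload 4): join(right, usingColumns, joinType). The key needs the same name
  // on both sides and appears only once in the output.
  def withUsingColumns(): DataFrame = {
    val emp = empDF.select(col("emp_id"), col("name"),
      col("emp_dept_id").cast("int").as("dept_id"))
    emp.join(deptDF, Seq("dept_id"), "inner")
  }

  def main(args: Array[String]): Unit = {
    withJoinExpr().show(false)     // emp_id, name, emp_dept_id, dept_name, dept_id
    withUsingColumns().show(false) // dept_id, emp_id, name, dept_name
  }
}
```

For an inner join with `usingColumns`, the shared key column comes first, followed by the remaining left and right columns.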
Join type | Join string | Equivalent SQL join |
---|---|---|
Inner.sql | inner | INNER JOIN |
FullOuter.sql | outer, full, fullouter, full_outer | FULL OUTER JOIN |
LeftOuter.sql | left, leftouter, left_outer | LEFT JOIN |
RightOuter.sql | right, rightouter, right_outer | RIGHT JOIN |
Cross.sql | cross | CROSS JOIN |
LeftAnti.sql | anti, leftanti, left_anti | LEFT ANTI JOIN (Spark SQL) |
LeftSemi.sql | semi, leftsemi, left_semi | LEFT SEMI JOIN (Spark SQL) |
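These join types are defined in Catalyst's joinTypes.scala, in the org.apache.spark.sql.catalyst.plans package. You only need to import them if you want to use the JoinType objects themselves (for example to call Inner.sql), e.g.:
org.apache.spark.sql.catalyst.plans.{LeftOuter,Inner,....}
The join methods themselves take the plain join strings listed in the table.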
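To check the table, a small sketch that feeds join strings to Catalyst's `JoinType.apply` and prints the `sql` name of the resulting join type. Catalyst is Spark's internal API, so treat this as an illustration that may change between Spark versions; `JoinTypeNames` is a hypothetical name.

```scala
import org.apache.spark.sql.catalyst.plans.JoinType

object JoinTypeNames {
  // Catalyst's JoinType(...) turns a join string (case-insensitive, underscores
  // ignored) into a JoinType object such as LeftOuter; .sql is its SQL name.
  def sqlFor(joinString: String): String = JoinType(joinString).sql

  def main(args: Array[String]): Unit =
    Seq("inner", "outer", "full_outer", "left", "rightouter", "cross", "anti", "semi")
      .foreach(s => println(s"$s -> ${sqlFor(s)}"))
}
```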
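First, build two DataFrames, emp and dept. The emp_dept_id column of emp and the dept_id column of dept both hold the department ID, so they serve as the join key. (emp_dept_id is a string and dept_id an integer; Spark casts implicitly when comparing them.)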
val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
(2,"Rose",1,"2010","20","M",4000),
(3,"Williams",1,"2010","10","M",1000),
(4,"Jones",2,"2005","10","F",2000),
(5,"Brown",2,"2010","40","",-1),
(6,"Brown",2,"2010","50","",-1)
)
val empColumns = Seq("emp_id","name","superior_emp_id","year_joined",
"emp_dept_id","gender","salary")
import spark.sqlContext.implicits._
val empDF = emp.toDF(empColumns:_*)
empDF.show(false)
val dept = Seq(("Finance",10),
("Marketing",20),
("Sales",30),
("IT",40)
)
val deptColumns = Seq("dept_name","dept_id")
val deptDF = dept.toDF(deptColumns:_*)
deptDF.show(false)
Printing the two DataFrames gives:
Emp Dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1 |Smith |-1 |2018 |10 |M |3000 |
|2 |Rose |1 |2010 |20 |M |4000 |
|3 |Williams|1 |2010 |10 |M |1000 |
|4 |Jones |2 |2005 |10 |F |2000 |
|5 |Brown |2 |2010 |40 | |-1 |
|6 |Brown |2 |2010 |50 | |-1 |
+------+--------+---------------+-----------+-----------+------+------+
Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance |10 |
|Marketing|20 |
|Sales |30 |
|IT |40 |
+---------+-------+
2. Inner Join
Inner join is Spark's default join type: only rows that satisfy the join condition on both sides are kept.
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"inner")
.show(false)
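The result is shown below: the emp row with emp_dept_id 50 and the dept row with dept_id 30 have no match on the other side, so both are dropped.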
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1 |Smith |-1 |2018 |10 |M |3000 |Finance |10 |
|2 |Rose |1 |2010 |20 |M |4000 |Marketing|20 |
|3 |Williams|1 |2010 |10 |M |1000 |Finance |10 |
|4 |Jones |2 |2005 |10 |F |2000 |Finance |10 |
|5 |Brown |2 |2010 |40 | |-1 |IT |40 |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
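Because inner is the default, the overload `join(right, joinExprs)` without a join type string should return the same rows. A sketch, assuming a local SparkSession and only the columns needed from the data above (`InnerJoinDefault` is a hypothetical name):

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object InnerJoinDefault {
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("InnerJoinDefault")
    .getOrCreate()
  import spark.implicits._

  // Trimmed copy of the article's data (only the columns used here).
  lazy val empDF: DataFrame = Seq(
    (1, "Smith", "10"), (2, "Rose", "20"), (3, "Williams", "10"),
    (4, "Jones", "10"), (5, "Brown", "40"), (6, "Brown", "50")
  ).toDF("emp_id", "name", "emp_dept_id")
  lazy val deptDF: DataFrame = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40))
    .toDF("dept_name", "dept_id")
  lazy val cond: Column = empDF("emp_dept_id") === deptDF("dept_id")

  // Explicit "inner" join type.
  def explicitInner(): DataFrame = empDF.join(deptDF, cond, "inner")

  // join(right, joinExprs) without a join type string performs an inner join.
  def defaultJoin(): DataFrame = empDF.join(deptDF, cond)

  def main(args: Array[String]): Unit = {
    explicitInner().show(false)
    defaultJoin().show(false)
  }
}
```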
3. Full Outer Join
An outer join (also written full, fullouter, or full_outer) returns all rows from both DataFrames/Datasets; where the join expression doesn't match, the columns from the other side are null.
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"outer")
.show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"full")
.show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"fullouter")
.show(false)
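Result: null values appear, and compared with the inner join both previously dropped rows are back, so a full outer join keeps every row from both sides. For example, emp_dept_id 50 in emp has no matching row in dept, so the dept columns of that row are null; likewise, the Sales row (dept_id 30) has no matching employee, so its emp columns are null. The row order differs from the inner join because Spark does not guarantee output order.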
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|2 |Rose |1 |2010 |20 |M |4000 |Marketing|20 |
|5 |Brown |2 |2010 |40 | |-1 |IT |40 |
|1 |Smith |-1 |2018 |10 |M |3000 |Finance |10 |
|3 |Williams|1 |2010 |10 |M |1000 |Finance |10 |
|4 |Jones |2 |2005 |10 |F |2000 |Finance |10 |
|6 |Brown |2 |2010 |50 | |-1 |null |null |
|null |null |null |null |null |null |null |Sales |30 |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
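The four full outer join strings are aliases for the same join type. A sketch that runs each of them and compares the row counts, assuming a local SparkSession and a trimmed copy of the data (`FullOuterAliases` is a hypothetical name):

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object FullOuterAliases {
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("FullOuterAliases")
    .getOrCreate()
  import spark.implicits._

  // Trimmed copy of the article's data (only the columns used here).
  lazy val empDF: DataFrame = Seq(
    (1, "Smith", "10"), (2, "Rose", "20"), (3, "Williams", "10"),
    (4, "Jones", "10"), (5, "Brown", "40"), (6, "Brown", "50")
  ).toDF("emp_id", "name", "emp_dept_id")
  lazy val deptDF: DataFrame = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40))
    .toDF("dept_name", "dept_id")
  lazy val cond: Column = empDF("emp_dept_id") === deptDF("dept_id")

  val aliases: Seq[String] = Seq("outer", "full", "fullouter", "full_outer")

  // Every alias selects the same full outer join.
  def fullOuter(joinType: String): DataFrame = empDF.join(deptDF, cond, joinType)

  def main(args: Array[String]): Unit =
    aliases.foreach(jt => println(s"$jt: ${fullOuter(jt).count()} rows"))
}
```

Each alias yields 7 rows: the 5 matched rows, the unmatched employee in department 50, and the unmatched Sales department.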
4. Left Outer Join
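Compared with the full outer join, a left outer join keeps every row of the left table, filling the right-side columns with null where there is no match. If the left table has 1,000 rows, the result has at least 1,000 rows; it has exactly 1,000 only when each left row matches at most one right row, because a left row that matches several right rows appears once per match.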
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"left")
.show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftouter")
.show(false)
Result: the emp row with emp_dept_id 50 is still there, with null dept columns.
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1 |Smith |-1 |2018 |10 |M |3000 |Finance |10 |
|2 |Rose |1 |2010 |20 |M |4000 |Marketing|20 |
|3 |Williams|1 |2010 |10 |M |1000 |Finance |10 |
|4 |Jones |2 |2005 |10 |F |2000 |Finance |10 |
|5 |Brown |2 |2010 |40 | |-1 |IT |40 |
|6 |Brown |2 |2010 |50 | |-1 |null |null |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
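A sketch showing that a left join keeps every left row but can return more rows than the left table when the right-side key is not unique. With the article's data the count equals the emp count; after adding a hypothetical second department with dept_id 10 (the "Audit" row is made up for illustration), every employee in department 10 appears twice. Assumes a local SparkSession; `LeftJoinRowCount` is a hypothetical name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object LeftJoinRowCount {
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("LeftJoinRowCount")
    .getOrCreate()
  import spark.implicits._

  // Trimmed copy of the article's data (only the columns used here).
  lazy val empDF: DataFrame = Seq(
    (1, "Smith", "10"), (2, "Rose", "20"), (3, "Williams", "10"),
    (4, "Jones", "10"), (5, "Brown", "40"), (6, "Brown", "50")
  ).toDF("emp_id", "name", "emp_dept_id")
  lazy val deptDF: DataFrame = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40))
    .toDF("dept_name", "dept_id")

  // Hypothetical extra department reusing dept_id 10, so dept_id is no longer unique.
  lazy val deptWithDuplicateKey: DataFrame =
    deptDF.union(Seq(("Audit", 10)).toDF("dept_name", "dept_id"))

  def leftJoin(right: DataFrame): DataFrame =
    empDF.join(right, empDF("emp_dept_id") === right("dept_id"), "left")

  def main(args: Array[String]): Unit = {
    println(s"emp rows: ${empDF.count()}")
    println(s"left join, unique dept_id: ${leftJoin(deptDF).count()}")
    println(s"left join, duplicated dept_id 10: ${leftJoin(deptWithDuplicateKey).count()}")
  }
}
```

With the duplicate key, the three employees in department 10 each match two departments, so the left join returns 9 rows instead of 6.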
5. Right Outer Join
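This mirrors the left outer join: every row of the right table is kept, with null left-side columns where there is no match.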
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"right")
.show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"rightouter")
.show(false)
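Result: the Sales row (dept_id 30) is kept with null emp columns, while the emp row with emp_dept_id 50 is dropped: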
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4 |Jones |2 |2005 |10 |F |2000 |Finance |10 |
|3 |Williams|1 |2010 |10 |M |1000 |Finance |10 |
|1 |Smith |-1 |2018 |10 |M |3000 |Finance |10 |
|2 |Rose |1 |2010 |20 |M |4000 |Marketing|20 |
|null |null |null |null |null |null |null |Sales |30 |
|5 |Brown |2 |2010 |40 | |-1 |IT |40 |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
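A right outer join is a left outer join with the two sides swapped. A sketch that checks this on the data above, assuming a local SparkSession and a trimmed copy of the data (`RightAsSwappedLeft` is a hypothetical name); the swapped result is reordered to the same column order before the two are compared with `except`.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object RightAsSwappedLeft {
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("RightAsSwappedLeft")
    .getOrCreate()
  import spark.implicits._

  // Trimmed copy of the article's data (only the columns used here).
  lazy val empDF: DataFrame = Seq(
    (1, "Smith", "10"), (2, "Rose", "20"), (3, "Williams", "10"),
    (4, "Jones", "10"), (5, "Brown", "40"), (6, "Brown", "50")
  ).toDF("emp_id", "name", "emp_dept_id")
  lazy val deptDF: DataFrame = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40))
    .toDF("dept_name", "dept_id")
  lazy val cond: Column = empDF("emp_dept_id") === deptDF("dept_id")

  def rightJoin(): DataFrame = empDF.join(deptDF, cond, "right")

  // Same join with the sides swapped, columns reordered to match rightJoin().
  def swappedLeftJoin(): DataFrame =
    deptDF.join(empDF, cond, "left").select(rightJoin().columns.map(c => col(c)): _*)

  def main(args: Array[String]): Unit = {
    rightJoin().show(false)
    swappedLeftJoin().show(false)
  }
}
```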
6. Left Semi Join
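A left semi join is similar to an inner join, but it returns only the columns of the left table. It keeps the left rows that have at least one match in the right table, and returns each such row only once, even if it matches several right rows.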
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftsemi")
.show(false)
Result of the leftsemi join:
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1 |Smith |-1 |2018 |10 |M |3000 |
|2 |Rose |1 |2010 |20 |M |4000 |
|3 |Williams|1 |2010 |10 |M |1000 |
|4 |Jones |2 |2005 |10 |F |2000 |
|5 |Brown |2 |2010 |40 | |-1 |
+------+--------+---------------+-----------+-----------+------+------+
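Besides returning only the left columns, a left semi join never duplicates a left row. A sketch contrasting it with an inner join when the right-side key is not unique, using a hypothetical extra department ("Audit", 10) that is made up for illustration, and assuming a local SparkSession (`SemiVsInner` is a hypothetical name):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SemiVsInner {
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SemiVsInner")
    .getOrCreate()
  import spark.implicits._

  // Trimmed copy of the article's data (only the columns used here).
  lazy val empDF: DataFrame = Seq(
    (1, "Smith", "10"), (2, "Rose", "20"), (3, "Williams", "10"),
    (4, "Jones", "10"), (5, "Brown", "40"), (6, "Brown", "50")
  ).toDF("emp_id", "name", "emp_dept_id")
  lazy val deptDF: DataFrame = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40))
    .toDF("dept_name", "dept_id")

  // Hypothetical extra department reusing dept_id 10.
  lazy val deptWithDuplicateKey: DataFrame =
    deptDF.union(Seq(("Audit", 10)).toDF("dept_name", "dept_id"))

  def semi(right: DataFrame): DataFrame =
    empDF.join(right, empDF("emp_dept_id") === right("dept_id"), "leftsemi")

  def inner(right: DataFrame): DataFrame =
    empDF.join(right, empDF("emp_dept_id") === right("dept_id"), "inner")

  def main(args: Array[String]): Unit = {
    println(s"leftsemi: ${semi(deptWithDuplicateKey).count()} rows") // each emp row at most once
    println(s"inner:    ${inner(deptWithDuplicateKey).count()} rows") // department 10 rows doubled
  }
}
```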
7. Left Anti Join
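A left anti join does the exact opposite of a left semi join: instead of the left rows that have a match, it returns only the left rows that have no match in the right table, again with only the left table's columns.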
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftanti")
.show(false)
Output:
+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6 |Brown|2 |2010 |50 | |-1 |
+------+-----+---------------+-----------+-----------+------+------+
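A left anti join can also be expressed as a left outer join that keeps only the rows whose right-side key is null, projected back to the left columns. A sketch comparing the two, assuming a local SparkSession and a trimmed copy of the data (`AntiViaLeftJoin` is a hypothetical name):

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object AntiViaLeftJoin {
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("AntiViaLeftJoin")
    .getOrCreate()
  import spark.implicits._

  // Trimmed copy of the article's data (only the columns used here).
  lazy val empDF: DataFrame = Seq(
    (1, "Smith", "10"), (2, "Rose", "20"), (3, "Williams", "10"),
    (4, "Jones", "10"), (5, "Brown", "40"), (6, "Brown", "50")
  ).toDF("emp_id", "name", "emp_dept_id")
  lazy val deptDF: DataFrame = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40))
    .toDF("dept_name", "dept_id")
  lazy val cond: Column = empDF("emp_dept_id") === deptDF("dept_id")

  def leftAnti(): DataFrame = empDF.join(deptDF, cond, "leftanti")

  // Left join, keep rows without a matching dept (null right key), keep emp columns only.
  def viaLeftJoin(): DataFrame =
    empDF.join(deptDF, cond, "left")
      .filter(deptDF("dept_id").isNull)
      .select(empDF.columns.map(c => empDF(c)): _*)

  def main(args: Array[String]): Unit = {
    leftAnti().show(false)
    viaLeftJoin().show(false)
  }
}
```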
8. Self Join
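A self join joins a DataFrame with itself. Give each side an alias with as() so that the two copies can be told apart, then refer to their columns as emp1.column and emp2.column. The following example looks up each employee's superior: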
import org.apache.spark.sql.functions.col
empDF.as("emp1").join(empDF.as("emp2"),
col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
.select(col("emp1.emp_id"),col("emp1.name"),
col("emp2.emp_id").as("superior_emp_id"),
col("emp2.name").as("superior_emp_name"))
.show(false)
Output (Smith has no superior, superior_emp_id -1, so the inner join drops that row):
+------+--------+---------------+-----------------+
|emp_id|name |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2 |Rose |1 |Smith |
|3 |Williams|1 |Smith |
|4 |Jones |2 |Rose |
|5 |Brown |2 |Rose |
|6 |Brown |2 |Rose |
+------+--------+---------------+-----------------+
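To also keep employees without a superior, the same self join can be run as a left outer join. A sketch, assuming a local SparkSession and a trimmed copy of the data (`SelfJoinWithRoots` is a hypothetical name):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object SelfJoinWithRoots {
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SelfJoinWithRoots")
    .getOrCreate()
  import spark.implicits._

  // Trimmed copy of the article's data (only the columns used here).
  lazy val empDF: DataFrame = Seq(
    (1, "Smith", -1), (2, "Rose", 1), (3, "Williams", 1),
    (4, "Jones", 2), (5, "Brown", 2), (6, "Brown", 2)
  ).toDF("emp_id", "name", "superior_emp_id")

  // Same aliasing pattern as the article; joinType decides whether Smith is kept.
  def selfJoin(joinType: String): DataFrame =
    empDF.as("emp1").join(empDF.as("emp2"),
        col("emp1.superior_emp_id") === col("emp2.emp_id"), joinType)
      .select(col("emp1.emp_id"), col("emp1.name"),
        col("emp2.emp_id").as("superior_emp_id"),
        col("emp2.name").as("superior_emp_name"))

  def main(args: Array[String]): Unit = {
    selfJoin("inner").show(false) // 5 rows, no Smith
    selfJoin("left").show(false)  // 6 rows, Smith with null superior columns
  }
}
```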
9. Using SQL Expressions
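Before using SQL expressions, register the DataFrames as temporary views with createOrReplaceTempView(). Both queries below are inner joins, written once with a WHERE clause and once with INNER JOIN ... ON.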
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
//SQL JOIN
val joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
joinDF.show(false)
val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF2.show(false)
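Spark SQL also has keywords for the other join types, such as LEFT OUTER JOIN, FULL OUTER JOIN, LEFT SEMI JOIN and LEFT ANTI JOIN. A sketch that runs the same ON condition with each keyword and counts the rows, assuming a local SparkSession and a trimmed copy of the data (`SqlJoinKeywords` and `sqlJoinCount` are hypothetical names):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SqlJoinKeywords {
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SqlJoinKeywords")
    .getOrCreate()
  import spark.implicits._

  // Trimmed copy of the article's data (only the columns used here).
  lazy val empDF: DataFrame = Seq(
    (1, "Smith", "10"), (2, "Rose", "20"), (3, "Williams", "10"),
    (4, "Jones", "10"), (5, "Brown", "40"), (6, "Brown", "50")
  ).toDF("emp_id", "name", "emp_dept_id")
  lazy val deptDF: DataFrame = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40))
    .toDF("dept_name", "dept_id")

  // Runs the same ON condition with the given Spark SQL join keyword.
  def sqlJoinCount(joinKeyword: String): Long = {
    empDF.createOrReplaceTempView("EMP")
    deptDF.createOrReplaceTempView("DEPT")
    spark.sql(s"SELECT * FROM EMP e $joinKeyword DEPT d ON e.emp_dept_id = d.dept_id").count()
  }

  def main(args: Array[String]): Unit =
    Seq("INNER JOIN", "LEFT OUTER JOIN", "RIGHT OUTER JOIN", "FULL OUTER JOIN",
      "LEFT SEMI JOIN", "LEFT ANTI JOIN")
      .foreach(k => println(s"$k: ${sqlJoinCount(k)} rows"))
}
```

The counts match the DataFrame API examples: 5, 6, 6, 7, 5 and 1 rows respectively.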
10. Complete Code
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
object JoinExample extends App {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
(2,"Rose",1,"2010","20","M",4000),
(3,"Williams",1,"2010","10","M",1000),
(4,"Jones",2,"2005","10","F",2000),
(5,"Brown",2,"2010","40","",-1),
(6,"Brown",2,"2010","50","",-1)
)
val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary")
import spark.sqlContext.implicits._
val empDF = emp.toDF(empColumns:_*)
empDF.show(false)
val dept = Seq(("Finance",10),
("Marketing",20),
("Sales",30),
("IT",40)
)
val deptColumns = Seq("dept_name","dept_id")
val deptDF = dept.toDF(deptColumns:_*)
deptDF.show(false)
println("Inner join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"inner")
.show(false)
println("Outer join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"outer")
.show(false)
println("full join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"full")
.show(false)
println("fullouter join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"fullouter")
.show(false)
println("right join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"right")
.show(false)
println("rightouter join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"rightouter")
.show(false)
println("left join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"left")
.show(false)
println("leftouter join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftouter")
.show(false)
println("leftanti join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftanti")
.show(false)
println("leftsemi join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftsemi")
.show(false)
println("cross join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"cross")
.show(false)
println("Using crossJoin()")
empDF.crossJoin(deptDF).show(false)
println("self join")
empDF.as("emp1").join(empDF.as("emp2"),
col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
.select(col("emp1.emp_id"),col("emp1.name"),
col("emp2.emp_id").as("superior_emp_id"),
col("emp2.name").as("superior_emp_name"))
.show(false)
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
//SQL JOIN
val joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
joinDF.show(false)
val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF2.show(false)
}
Examples explained here are available at the GitHub project for reference.
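The complete listing also calls join(..., "cross") and crossJoin(). A sketch contrasting them, assuming a local SparkSession and a trimmed copy of the data (`CrossJoinDemo` is a hypothetical name): `crossJoin()` returns the Cartesian product (6 × 4 = 24 rows), whereas passing "cross" together with a join condition still applies the condition, so it returns the same 5 rows as the inner join.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object CrossJoinDemo {
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("CrossJoinDemo")
    .getOrCreate()
  import spark.implicits._

  // Trimmed copy of the article's data (only the columns used here).
  lazy val empDF: DataFrame = Seq(
    (1, "Smith", "10"), (2, "Rose", "20"), (3, "Williams", "10"),
    (4, "Jones", "10"), (5, "Brown", "40"), (6, "Brown", "50")
  ).toDF("emp_id", "name", "emp_dept_id")
  lazy val deptDF: DataFrame = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40))
    .toDF("dept_name", "dept_id")
  lazy val cond: Column = empDF("emp_dept_id") === deptDF("dept_id")

  // Cartesian product: every emp row paired with every dept row.
  def cartesian(): DataFrame = empDF.crossJoin(deptDF)

  // "cross" plus a join condition: the condition is still applied.
  def crossWithCondition(): DataFrame = empDF.join(deptDF, cond, "cross")

  def main(args: Array[String]): Unit = {
    println(s"crossJoin(): ${cartesian().count()} rows")
    println(s"\"cross\" with condition: ${crossWithCondition().count()} rows")
  }
}
```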