Basic concepts
Spark map() and mapPartitions() are both transformation operations in Spark and are among the functions used most often in day-to-day development, but there are differences between the two.
- map() – applies the transformation to each row, one record at a time.
- mapPartitions() – produces the same end result as map(), but the data is processed partition by partition, with the operation invoked once per partition.
So why do both exist?
These two functions came up when the neighbouring team shared their Spark optimization tips, and the distinction matters in everyday development. Suppose you need to write data to Redis: if you use map, every single record opens its own Redis connection to write its data. With a large dataset this puts heavy pressure on Redis, the number of connections spikes, and if that Redis instance is serving production traffic it may well affect the live business. mapPartitions alleviates the problem: within each partition we can create a single connection and use it for all of that partition's data.
This is one of the standard Spark code optimizations; foreachPartition, which follows the same idea, is touched on later as well.
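To make this concrete, here is a minimal sketch of the Redis scenario described above. df stands for any DataFrame being written out, and RedisClient with its connect()/set()/close() calls is a hypothetical stand-in for a real client library; the only point being illustrated is where the connection gets created. The sketch goes through .rdd simply because a plain side-effecting write is easiest to express there.
import org.apache.spark.sql.Row
// Anti-pattern: a connection is opened and closed for every single record
df.rdd.foreach { row: Row =>
  val conn = RedisClient.connect("redis-host", 6379) // hypothetical client
  conn.set(row.getString(0), row.getString(1))
  conn.close()
}
// Better: one connection per partition, reused for every record in that partition
df.rdd.foreachPartition { rows: Iterator[Row] =>
  val conn = RedisClient.connect("redis-host", 6379) // opened once per partition
  rows.foreach(row => conn.set(row.getString(0), row.getString(1)))
  conn.close()
}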
Building the sample data
val structureData = Seq(
Row("James","","Smith","36636","NewYork",3100),
Row("Michael","Rose","","40288","California",4300),
Row("Robert","","Williams","42114","Florida",1400),
Row("Maria","Anne","Jones","39192","Florida",5500),
Row("Jen","Mary","Brown","34561","NewYork",3000)
)
val structureSchema = new StructType()
.add("firstname",StringType)
.add("middlename",StringType)
.add("lastname",StringType)
.add("id",StringType)
.add("location",StringType)
.add("salary",IntegerType)
val df2 = spark.createDataFrame(
spark.sparkContext.parallelize(structureData),structureSchema)
df2.printSchema()
df2.show(false)
Yields the output below.
root
|-- firstname: string (nullable = true)
|-- middlename: string (nullable = true)
|-- lastname: string (nullable = true)
|-- id: string (nullable = true)
|-- location: string (nullable = true)
|-- salary: integer (nullable = true)
+---------+----------+--------+-----+----------+------+
|firstname|middlename|lastname|id |location |salary|
+---------+----------+--------+-----+----------+------+
|James | |Smith |36636|NewYork |3100 |
|Michael |Rose | |40288|California|4300 |
|Robert | |Williams|42114|Florida |1400 |
|Maria |Anne |Jones |39192|Florida |5500 |
|Jen |Mary |Brown |34561|NewYork |3000 |
+---------+----------+--------+-----+----------+------+
Next, build a combine function whose job is simply to concatenate several columns into a single string.
class Util extends Serializable {
def combine(fname:String,mname:String,lname:String):String = {
fname+","+mname+","+lname
}
}
Spark map() operation
Since map() is purely a transformation, the number of records before and after is generally unchanged (one output record per input record), but the schema can change; for example, you can derive a new column.
Syntax:
1) map[U](func : scala.Function1[T, U])(implicit evidence$6 : org.apache.spark.sql.Encoder[U])
: org.apache.spark.sql.Dataset[U]
2) map[U](func : org.apache.spark.api.java.function.MapFunction[T, U], encoder : org.apache.spark.sql.Encoder[U])
: org.apache.spark.sql.Dataset[U]
Calling map() on a DataFrame does not return a DataFrame; it returns a Dataset typed by whatever the mapping function produces (here a Dataset of (String, String, Int) tuples). If you want a DataFrame back, you have to call toDF().
import spark.implicits._
val df3 = df2.map(row=>{
// This initialization happens for every record.
// If it is a heavy initialization, such as opening a database connection,
// it degrades performance.
val util = new Util()
val fullName = util.combine(row.getString(0),row.getString(1),row.getString(2))
(fullName, row.getString(3),row.getInt(5))
})
val df3Map = df3.toDF("fullName","id","salary")
df3Map.printSchema()
df3Map.show(false)
Yields the output below.
root
|-- fullName: string (nullable = true)
|-- id: string (nullable = true)
|-- salary: integer (nullable = false)
+----------------+-----+------+
|fullName |id |salary|
+----------------+-----+------+
|James,,Smith |36636|3100 |
|Michael,Rose, |40288|4300 |
|Robert,,Williams|42114|1400 |
|Maria,Anne,Jones|39192|5500 |
|Jen,Mary,Brown |34561|3000 |
+----------------+-----+------+
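A side note on the toDF() step above: if the mapping function returns a case class instead of a tuple, the resulting Dataset already carries the column names and toDF() is no longer needed for renaming. A minimal sketch of that variant follows; the FullNameRecord case class is introduced here purely for illustration.
// Define the case class at top level (or as a member of a top-level object)
// so that Spark can derive an Encoder for it via spark.implicits._
case class FullNameRecord(fullName: String, id: String, salary: Int)
val df3cc = df2.map(row => {
  val util = new Util()
  FullNameRecord(
    util.combine(row.getString(0), row.getString(1), row.getString(2)),
    row.getString(3),
    row.getInt(5))
})
df3cc.printSchema() // columns are already named fullName, id and salary
df3cc.show(false)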
mapPartitions()
The advantage of mapPartitions() was covered above, together with the Redis connection example: anything expensive to set up can be initialized once per partition instead of once per record.
Syntax:
1) mapPartitions[U](func : scala.Function1[scala.Iterator[T], scala.Iterator[U]])(implicit evidence$7 : org.apache.spark.sql.Encoder[U])
: org.apache.spark.sql.Dataset[U]
2) mapPartitions[U](f : org.apache.spark.api.java.function.MapPartitionsFunction[T, U], encoder : org.apache.spark.sql.Encoder[U])
: org.apache.spark.sql.Dataset[U]
val df4 = df2.mapPartitions(iterator => {
// Do the heavy initialization here
// e.g. database connections etc.
val util = new Util()
val res = iterator.map(row=>{
val fullName = util.combine(row.getString(0),row.getString(1),row.getString(2))
(fullName, row.getString(3),row.getInt(5))
})
res
})
val df4part = df4.toDF("fullName","id","salary")
df4part.printSchema()
df4part.show(false)
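The printed schema and table are identical to the map() output above, since the per-row logic is the same. One caveat when the per-partition initialization is a real resource such as a Redis connection: the iterator returned by iterator.map() is lazy, so closing the connection immediately after the map call would close it before any row is processed. A hedged sketch of one common workaround, again with a hypothetical RedisClient, materializes the partition before releasing the resource:
val df5 = df2.mapPartitions(iterator => {
  val conn = RedisClient.connect("redis-host", 6379) // hypothetical client, one connection per partition
  // toList forces the lazy iterator so every row is written while the
  // connection is still open (note: this buffers the whole partition in memory)
  val rows = iterator.map(row => {
    conn.set(row.getString(3), row.getInt(5).toString) // side effect per row
    (row.getString(0), row.getString(3), row.getInt(5))
  }).toList
  conn.close()
  rows.iterator
})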
Complete code
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType, ArrayType, MapType}

// Helper class defined earlier in the post
class Util extends Serializable {
  def combine(fname: String, mname: String, lname: String): String = {
    fname + "," + mname + "," + lname
  }
}

object MapTransformation extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[5]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  val structureData = Seq(
    Row("James", "", "Smith", "36636", "NewYork", 3100),
    Row("Michael", "Rose", "", "40288", "California", 4300),
    Row("Robert", "", "Williams", "42114", "Florida", 1400),
    Row("Maria", "Anne", "Jones", "39192", "Florida", 5500),
    Row("Jen", "Mary", "Brown", "34561", "NewYork", 3000)
  )

  val structureSchema = new StructType()
    .add("firstname", StringType)
    .add("middlename", StringType)
    .add("lastname", StringType)
    .add("id", StringType)
    .add("location", StringType)
    .add("salary", IntegerType)

  val df2 = spark.createDataFrame(
    spark.sparkContext.parallelize(structureData), structureSchema)
  df2.printSchema()
  df2.show(false)

  import spark.implicits._
  val util = new Util()
  val df3 = df2.map(row => {
    val fullName = util.combine(row.getString(0), row.getString(1), row.getString(2))
    (fullName, row.getString(3), row.getInt(5))
  })
  val df3Map = df3.toDF("fullName", "id", "salary")
  df3Map.printSchema()
  df3Map.show(false)

  val df4 = df2.mapPartitions(iterator => {
    val util = new Util()
    val res = iterator.map(row => {
      val fullName = util.combine(row.getString(0), row.getString(1), row.getString(2))
      (fullName, row.getString(3), row.getInt(5))
    })
    res
  })
  val df4part = df4.toDF("fullName", "id", "salary")
  df4part.printSchema()
  df4part.show(false)
}