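Spark collect() and collectAsList() are actions that retrieve all elements of an RDD/DataFrame/Dataset (from all nodes) to the driver node. Use collect() only on small datasets, usually after operations such as filter(), groupBy() or count() have reduced the data, so that the driver does not run out of memory (OOM). If the data is large, do not run this operation.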
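A minimal sketch of the "reduce first, then collect" pattern described above, assuming a local SparkSession. The object name FilterThenCollect, the sample rows and the 3500 threshold are made up for illustration.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical example object; the rows and the 3500 threshold are illustrative only
object FilterThenCollect {
  def run(spark: SparkSession): (Array[Row], Long) = {
    import spark.implicits._
    val df = Seq(("James", 3000), ("Michael", 4000), ("Jen", -1)).toDF("firstname", "salary")
    // filter() is evaluated on the executors, so only the matching rows reach the driver
    val highEarners: Array[Row] = df.filter(col("salary") > 3500).collect()
    // count() is also an action, but it returns one Long instead of every row
    val total: Long = df.count()
    (highEarners, total)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("FilterThenCollect").getOrCreate()
    val (rows, total) = run(spark)
    rows.foreach(r => println(r.getString(0))) // prints Michael
    println(total)                             // prints 3
    spark.stop()
  }
}
```

The driver only ever holds the filtered rows and a single count, rather than the whole dataset.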
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()

val data = Seq(Row(Row("James ","","Smith"),"36636","M",3000),
  Row(Row("Michael ","Rose",""),"40288","M",4000),
  Row(Row("Robert ","","Williams"),"42114","M",4000),
  Row(Row("Maria ","Anne","Jones"),"39192","F",4000),
  Row(Row("Jen","Mary","Brown"),"","F",-1)
)

// Nested schema: "name" is a struct with three string fields
val schema = new StructType()
  .add("name",new StructType()
    .add("firstname",StringType)
    .add("middlename",StringType)
    .add("lastname",StringType))
  .add("id",StringType)
  .add("gender",StringType)
  .add("salary",IntegerType)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
df.printSchema()
df.show(false)
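printSchema() prints the schema of the DataFrame (column names, types and nullability) as a tree.
show() displays the first 20 rows of the DataFrame (not a random sample); passing false disables the truncation of values longer than 20 characters.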
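A small sketch of both calls, assuming a local SparkSession and a cut-down two-column DataFrame (the object name SchemaAndShow and its rows are hypothetical). It uses schema.treeString, which holds the same tree text that printSchema() prints, and the show(numRows, truncate) overload to limit the output to two rows.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical example object with a cut-down version of the article's columns
object SchemaAndShow {
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(("36636", 3000), ("40288", 4000), ("42114", 4000)).toDF("id", "salary")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("SchemaAndShow").getOrCreate()
    val df = sample(spark)
    // schema.treeString is the tree text that printSchema() writes to stdout
    print(df.schema.treeString)
    // show(numRows, truncate): only the first 2 rows, long values not truncated
    df.show(2, false)
    spark.stop()
  }
}
```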
root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
+---------------------+-----+------+------+
|name                 |id   |gender|salary|
+---------------------+-----+------+------+
|[James , , Smith]    |36636|M     |3000  |
|[Michael , Rose, ]   |40288|M     |4000  |
|[Robert , , Williams]|42114|M     |4000  |
|[Maria , Anne, Jones]|39192|F     |4000  |
|[Jen, Mary, Brown]   |     |F     |-1    |
+---------------------+-----+------+------+
Using collect() and collectAsList()
collect() returns the rows as an Array[Row], while collectAsList() returns them as a java.util.List[Row].
Syntax:
collect() : scala.Array[T]
collectAsList() : java.util.List[T]
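To make the difference in return types concrete, here is a sketch assuming a local SparkSession and two hypothetical sample rows (the object name CollectTypes is made up). The Java list is read with its own size() and get() methods, so no collection converters are needed.

```scala
import org.apache.spark.sql.{Row, SparkSession}

// Hypothetical example object; two sample rows are enough to compare the result types
object CollectTypes {
  def run(spark: SparkSession): (Array[Row], java.util.List[Row]) = {
    import spark.implicits._
    val df = Seq(("James", 3000), ("Michael", 4000)).toDF("firstname", "salary")
    val asArray: Array[Row] = df.collect()                   // Scala array
    val asJavaList: java.util.List[Row] = df.collectAsList() // Java list
    (asArray, asJavaList)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("CollectTypes").getOrCreate()
    val (asArray, asJavaList) = run(spark)
    println(asArray.length)                 // prints 2
    println(asJavaList.size())              // prints 2
    println(asJavaList.get(1).getString(0)) // prints Michael
    spark.stop()
  }
}
```

Both calls are actions, so calling them one after the other as above runs two Spark jobs over the same data.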
collect() example
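// collectAsList() returns the rows as a java.util.List[Row]
val colList = df.collectAsList()
// collect() returns the rows as an Array[Row] on the driver
val colData = df.collect()
colData.foreach { row =>
  val salary = row.getInt(3) // Index starts from zero; salary is the 4th column
  println(salary)
}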
df.collect() retrieves all elements of the DataFrame to the driver as an Array[Row]. The loop above then reads the salary field (index 3) of each row and prints it to the console:
3000
4000
4000
4000
-1
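If you want plain Scala values instead of Row objects, one option is to map over the collected array. This is a sketch assuming a simplified two-column DataFrame and a hypothetical object name; getAs[Int]("salary") reads the field by name rather than by index, and the second method shows converting the selected column to a Dataset[Int] before collecting.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example object; a simplified two-column DataFrame stands in for the article's df
object CollectSalaries {
  def salaries(spark: SparkSession): Array[Int] = {
    import spark.implicits._
    val df = Seq(("James", 3000), ("Michael", 4000), ("Jen", -1)).toDF("firstname", "salary")
    // Read each field by name instead of by position
    df.collect().map(row => row.getAs[Int]("salary"))
  }

  def salariesTyped(spark: SparkSession): Array[Int] = {
    import spark.implicits._
    val df = Seq(("James", 3000), ("Michael", 4000), ("Jen", -1)).toDF("firstname", "salary")
    // Alternative: keep one column, view it as Dataset[Int], then collect plain Ints
    df.select("salary").as[Int].collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("CollectSalaries").getOrCreate()
    println(salaries(spark).mkString(","))      // prints 3000,4000,-1
    println(salariesTyped(spark).mkString(",")) // prints 3000,4000,-1
    spark.stop()
  }
}
```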
Parsing a Row
To read fields nested inside a struct column, first use getStruct() to get the struct itself as a Row:
//Retrieving data from Struct column
colData.foreach { row =>
  val salary = row.getInt(3)
  val fullName: Row = row.getStruct(0) //Index starts from zero
  val firstName = fullName.getString(0) //In struct row, again index starts from zero
  val middleName = fullName.get(1).toString
  val lastName = fullName.getAs[String]("lastname")
  println(firstName + "," + middleName + "," + lastName + "," + salary)
}
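getInt() reads a field as an Int, getString() reads a field as a String, get() returns the field as Any (hence the toString call), and getAs[String]("lastname") looks up a field by its name and returns it as the requested type, here a String.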
James ,,Smith,3000
Michael ,Rose,,4000
Robert ,,Williams,4000
Maria ,Anne,Jones,4000
Jen,Mary,Brown,-1
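One caveat with get(1).toString: it throws a NullPointerException if the field is null. The article's data has no nulls, so the sketch below assumes a hypothetical second row with a null middle name and checks isNullAt() before reading the value. The object name and the "<none>" placeholder are illustrative.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Hypothetical example object; the second row deliberately has a null middle name
object NullSafeStruct {
  val schema: StructType = new StructType()
    .add("name", new StructType()
      .add("firstname", StringType)
      .add("middlename", StringType)
      .add("lastname", StringType))
    .add("salary", IntegerType)

  def middleNames(spark: SparkSession): Array[String] = {
    val data = Seq(Row(Row("James ", "", "Smith"), 3000), Row(Row("Robert ", null, "Williams"), 4000))
    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
    df.collect().map { row =>
      val fullName: Row = row.getStruct(0)
      // fullName.get(1).toString would throw a NullPointerException on the null value,
      // so check isNullAt(1) before reading the field
      if (fullName.isNullAt(1)) "<none>" else fullName.getString(1)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("NullSafeStruct").getOrCreate()
    middleNames(spark).foreach(println)
    spark.stop()
  }
}
```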
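Calling collect() directly returns the data of every column. If you only need some of the columns, select() them first and then collect, so that less data is sent to the driver: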
val dataCollect = df.select("name").collect()
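select() can also reach into a struct with dot notation, so only a single nested field travels to the driver. A sketch assuming a trimmed copy of the article's schema and a hypothetical object name:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical example object; the schema is a trimmed copy of the article's nested schema
object SelectNested {
  val schema: StructType = new StructType()
    .add("name", new StructType()
      .add("firstname", StringType)
      .add("middlename", StringType)
      .add("lastname", StringType))
    .add("id", StringType)

  def firstNames(spark: SparkSession): Array[String] = {
    val data = Seq(Row(Row("James ", "", "Smith"), "36636"), Row(Row("Michael ", "Rose", ""), "40288"))
    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
    // Only the nested firstname field is collected, not the whole row
    df.select("name.firstname").collect().map(_.getString(0))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("SelectNested").getOrCreate()
    firstNames(spark).foreach(println)
    spark.stop()
  }
}
```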
When to use collect()
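As mentioned above, this operation brings all of the data to the driver node, so even a moderately large dataset can cause an OOM error. In general, use it only on small datasets, for example to print a few results after a count() or another aggregation, and avoid it otherwise.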
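One way to apply this advice defensively is to check the row count before collecting. The helper below is a hypothetical sketch, not a Spark API, and the 1000-row default is an arbitrary example value rather than a Spark setting.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Hypothetical helper; the default limit is an arbitrary example value, not a Spark setting
object GuardedCollect {
  val DefaultMaxRows: Long = 1000L

  // Runs count() first and only collects when the result is small enough
  def collectIfSmall(df: DataFrame, maxRows: Long = DefaultMaxRows): Option[Array[Row]] =
    if (df.count() <= maxRows) Some(df.collect()) else None

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("GuardedCollect").getOrCreate()
    val df = spark.range(5).toDF("id")
    collectIfSmall(df, 10L) match {
      case Some(rows) => rows.foreach(println)
      case None       => println("Too many rows to collect")
    }
    spark.stop()
  }
}
```

Note that count() and collect() are two separate actions, so without caching the DataFrame is computed twice.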
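collect() vs select()
select() is a transformation: it lazily returns a new DataFrame and does not run a Spark job by itself. collect() is an action: it triggers execution of the plan and returns the resulting rows to the driver.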
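A short sketch of the distinction, assuming a local SparkSession and hypothetical sample data: the select() call only produces another DataFrame, while collect() produces an Array[Row] on the driver.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Hypothetical example object; the data is illustrative only
object TransformVsAction {
  def run(spark: SparkSession): (DataFrame, Array[Row]) = {
    import spark.implicits._
    val df = Seq(("James", 3000), ("Michael", 4000)).toDF("firstname", "salary")
    // Transformation: returns a new DataFrame describing the query; no Spark job runs here
    val selected: DataFrame = df.select("firstname")
    // Action: executes the plan and brings the rows back to the driver
    val rows: Array[Row] = selected.collect()
    (selected, rows)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("TransformVsAction").getOrCreate()
    val (selected, rows) = run(spark)
    println(selected.columns.mkString(",")) // prints firstname
    rows.foreach(r => println(r.getString(0)))
    spark.stop()
  }
}
```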
Complete example
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object CollectExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  val data = Seq(Row(Row("James ","","Smith"),"36636","M",3000),
    Row(Row("Michael ","Rose",""),"40288","M",4000),
    Row(Row("Robert ","","Williams"),"42114","M",4000),
    Row(Row("Maria ","Anne","Jones"),"39192","F",4000),
    Row(Row("Jen","Mary","Brown"),"","F",-1)
  )

  val schema = new StructType()
    .add("name",new StructType()
      .add("firstname",StringType)
      .add("middlename",StringType)
      .add("lastname",StringType))
    .add("id",StringType)
    .add("gender",StringType)
    .add("salary",IntegerType)

  val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
  df.printSchema()
  df.show(false)

  val colData = df.collect()
  colData.foreach { row =>
    val salary = row.getInt(3) //Index starts from zero
    println(salary)
  }

  //Retrieving data from Struct column
  colData.foreach { row =>
    val salary = row.getInt(3)
    val fullName: Row = row.getStruct(0) //Index starts from zero
    val firstName = fullName.getString(0) //In struct row, again index starts from zero
    val middleName = fullName.get(1).toString
    val lastName = fullName.getAs[String]("lastname")
    println(firstName + "," + middleName + "," + lastName + "," + salary)
  }
}