共计 1311 个字符,预计需要花费 4 分钟才能阅读完成。
简介
主要介绍两种遍历数据的方法 foreach
和foreachPartition
,二者之间存在的差异跟之前介绍map
和mapPartition
相同,所以你如果有些写Database的操作,那么还是建议是foreachPartition
。下面会介绍在 DataFrame 和 Rdd 上面的操作示例,这两个 API 基本上使用方法都差不多,区别在之前也描述过了。Rdd和DataFrame操作其实也是一样的。
DataFrame 使用 foreachPartition()
语法
foreachPartition(f : scala.Function1[scala.Iterator[T], scala.Unit]) : scala.Unit
示例
In this example, to make it simple we just print the DataFrame to console.
// foreachPartition DataFrame
val df = spark.createDataFrame(data).toDF("Product","Amount","Country")
df.foreachPartition(partition => {
// 如果你的数据需要写database,在这里执行初始化
partition.foreach(fun=>{
// 处理完的数据插入到 database
})
// 此处你也可以执行 batch操作,那么就不需要在上面的foreach 里执行操作了
})
DataFrame 使用 foreach()
val longAcc = spark.sparkContext.longAccumulator("SumAccumulator")
df.foreach(f=> {
longAcc.add(f.getInt(1))
})
println("Accumulator value:"+longAcc.value)
RDD 使用 foreachPartition()
语法
foreachPartition(f : scala.Function1[scala.Iterator[T], scala.Unit]) : scala.Unit
示例
// foreachPartition DataFrame
val rdd = spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9))
rdd.foreachPartition(partition => {
//Initialize any database connection
partition.foreach(fun=>{
//apply the function
})
})
RDD 使用 foreach()
//rdd accumulator
val rdd2 = spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9))
val longAcc2 = spark.sparkContext.longAccumulator("SumAccumulator2")
rdd .foreach(f=> {
longAcc2.add(f)
})
println("Accumulator value:"+longAcc2.value)
正文完
请博主喝杯咖啡吧!