共计 3141 个字符,预计需要花费 8 分钟才能阅读完成。
Spark 采样就是从大量的数据中获取少量的数据,获取的方法可以依据某种策略,得到的数据用于分析,企图使用少量数据的分析结果代替全局。这也算是采样的一个重要点,当然对于做模型训练的也有类似的操作,比如当负样本过于稀疏的时候也会考虑使用负样本采样的方案来减少数据,这样模型的学习也会按照正常的迭代更新优化。
无论是 RDD还是DataFrame,都有相应的采样API。
1. Spark DataFrame 采样
Spark DataFrame sample()
这个方法重载了好几种,根据你传参的内容可以使用不同效果的函数。
1.1 语法
sample(fraction : scala.Double, seed : scala.Long) : Dataset[T]
sample(fraction : scala.Double) : Dataset[T]
sample(withReplacement : scala.Boolean, fraction : scala.Double, seed : scala.Long) : Dataset[T]
sample(withReplacement : scala.Boolean, fraction : scala.Double) : Dataset[T]
参数
fraction
– 采样的比例系数, 范围是 [0.0, 1.0].
seed
– 采样的随机种子,这个也是随机采样,就像你生成随机数据一样,也是需要随机种子
withReplacement
– 采样完之后是否直接覆盖原来的数据,一般情况下是false,不做替换,也就是返回一个新的数据,原来的数据不变。
1.2 DataFrame 采样的例子
val spark = SparkSession.builder() .master("local[1]").appName("SparkByExample")
.getOrCreate();
val df=spark.range(100)
println(df.sample(0.1).collect().mkString(","))
//Output: 7,8,27,36,40,48,51,52,63,76,85,88
上面的例子是从100个数里抽取·10 个数据,但是返回了12个数据,这个也是采样的时候要注意的地方,他并不是完全返回准确的数据量。
下面的例子是传入不同的随机种子观察采样的结果。
println(df.sample(0.1,123).collect().mkString(","))
//Output: 36,37,41,43,56,66,69,75,83
println(df.sample(0.1,123).collect().mkString(","))
//Output: 36,37,41,43,56,66,69,75,83
println(df.sample(0.1,456).collect().mkString(","))
//Output: 19,21,42,48,49,50,75,80
设置withReplacement
为 true,这就会直接修改原来的数据
println(df.sample(true,0.3,123).collect().mkString(",")) //with Duplicates
//Output: 0,5,9,11,14,14,16,17,21,29,33,41,42,52,52,54,58,65,65,71,76,79,85,96
println(df.sample(0.3,123).collect().mkString(",")) // No duplicates
//Output: 0,4,17,19,24,25,26,36,37,41,43,44,53,56,66,68,69,70,71,75,76,78,83,84,88,94,96,97,98
2. Spark 分层采样
主要使用的API是 sampleBy()
2.1 sampleBy() 语法
sampleBy[T](col : _root_.scala.Predef.String, fractions : _root_.scala.Predef.Map[T, scala.Double], seed : scala.Long) : DataFrame
sampleBy[T](col : _root_.scala.Predef.String, fractions : java.util.Map[T, java.lang.Double], seed : scala.Long) : DataFrame
sampleBy[T](col : org.apache.spark.sql.Column, fractions : _root_.scala.Predef.Map[T, scala.Double], seed : scala.Long) : DataFrame
sampleBy[T](col : org.apache.spark.sql.Column, fractions : java.util.Map[T, java.lang.Double], seed : scala.Long) : DataFrame
2.2 sampleBy() 例子
println(df.stat.sampleBy("id", Map(""->0.1),123).collect().mkString(","))
//Output: 6,13,17,19,78
3. Spark RDD 采样
Spark RDD also provides sample()
函数,同时有提供了另外一个API takeSample()
,但是这个API 是一个Action,返回一个数组,类似 collect操作,如果你返回的数据量过于庞大,那么也会对driver而言出现OOM的情况
3.1 RDD sample() 语法
基本上可以参考之前的操作,都是差不多的,直接看下面的示例。
3.2 RDD sample() 例子
val rdd = spark.sparkContext.range(0,100)
println(rdd.sample(false,0.1,0).collect().mkString(","))
//Output: 1,20,29,42,53,62,63,70,82,87
println(rdd.sample(true,0.3,123).collect().mkString(","))
//Output: 1,4,21,30,32,32,32,33,42,45,46,52,53,54,55,58,58,66,66,68,70,70,74,86,92,96,98,99
3.3 RDD takeSample() 语法
Below is a syntax of RDD takeSample()
signature.
takeSample(withReplacement : scala.Boolean, num : scala.Int, seed : scala.Long) : scala.Array[T]
3.4 RDD takeSample() 例子
println(rdd.takeSample(false,10,0).mkString(","))
//Output: 62,30,27,29,21,16,86,7,20,91
println(rdd.takeSample(true,30,123).mkString(","))
//Output: 85,49,61,16,90,5,33,98,89,38,89,29,5,48,24,60,41,33,13,40,14,33,56,95,40,48,61,36,82,9
总结
无论是 DataFrame还是rdd,他们的采样api sample基本上都是一样的,对于rdd有个额外的action api takesample
,注意在使用的时候数据量不要过大。