共计 3576 个字符,预计需要花费 9 分钟才能阅读完成。
最近手受伤了,加上有不少事情焦头烂额有一段时间没有更新,但是想着还是把spark sql的基础给写完,算是有始有终。
简介
Spark SQL UDF (a.k.a User Defined Function) 顾明思议就是自定义的函数在spark中运行,那么这些都是用户根据业务实际的需求来操作,在spark中也提供了一些基本的函数处理方法,比如sum之类的,但是很多时候满足不了实际的需求,所以衍生出自定义函数。但是始终存在的一个问题就是自定义函数实现的高效性,毕竟spark自己集成的都是经过测试的,自己写的不一定优化的那么好,所以这块也是值得去注意的地方。
以前还是sql boy 的时候,也会用到udf,将udf用到sql语句当中去。所以提到 spark udf 其实没有那么的陌生。
为什么需要 UDF?
UDF 的诞生还是来自于大家实际的需求,集成的API满足不了大众的需求,开发出UDF 每一个开发者都可以自己去定义。
import spark.implicits._
val columns = Seq("Seqno","Quote")
val data = Seq(("1", "Be the change that you wish to see in the world"),
("2", "Everyone thinks of changing the world, but no one thinks of changing himself."),
("3", "The purpose of our lives is to be happy.")
)
val df = data.toDF(columns:_*)
df.show(false)
+-----+-----------------------------------------------------------------------------+
|Seqno|Quote |
+-----+-----------------------------------------------------------------------------+
|1 |Be the change that you wish to see in the world |
|2 |Everyone thinks of changing the world, but no one thinks of changing himself.|
|3 |The purpose of our lives is to be happy. |
+-----+-----------------------------------------------------------------------------+
udf 函数
开始定义一个udf函数,这个跟正常的函数定义是差不多,下面实现的函数方法是将第一个字符串变为大写,然后跟后面的字符串拼接在一起。
val convertCase = (strQuote:String) => {
val arr = strQuote.split(" ")
arr.map(f=> f.substring(0,1).toUpperCase + f.substring(1,f.length)).mkString(" ")
}
现在定义好函数之后还不能直接用在的 DataFrame上,想要正常的使用需要经过一个注册的过程。
注册
首先你得引入这个包, org.apache.spark.sql.functions.udf
package.
val convertUDF = udf(convertCase)
经过上述步骤的注册就可以正常使用了。
//Using with DataFrame
df.select(col("Seqno"),
convertUDF(col("Quote")).as("Quote") ).show(false)
+-----+-----------------------------------------------------------------------------+
|Seqno|Quote |
+-----+-----------------------------------------------------------------------------+
|1 |Be The Change That You Wish To See In The World |
|2 |Everyone Thinks Of Changing The World, But No One Thinks Of Changing Himself.|
|3 |The Purpose Of Our Lives Is To Be Happy. |
+-----+-----------------------------------------------------------------------------+
注册并且在SQL语句中调用
对于dataframe的取数其实有两种方法,第一就是上面介绍的方法,使用select等内置函数方法,还有一种就是像写sql语句那样,但是这次的注册方式又不太一样了。spark.udf.register()
用这个来注册.
// Using it on SQL
spark.udf.register("convertUDF", convertCase)
df.createOrReplaceTempView("QUOTE_TABLE")
spark.sql("select Seqno, convertUDF(Quote) from QUOTE_TABLE").show(false)
null 检查
UDF’s 异常检查机制没有做的那么好,所以如果列里面出现null还是需要自己的cover,做下处理。否则就会报类似下面的错误,当然下面的举例是跟之前的udf挂钩。
Exception in thread "main" org.apache.spark.SparkException: Failed to execute user defined function(anonfun1: (string) => string)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1066)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:152)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:92)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelationanonfunapply24anonfunapplyOrElse23.apply(Optimizer.scala:1364)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelationanonfunapply24anonfunapplyOrElse23.apply(Optimizer.scala:1364)
at scala.collection.TraversableLikeanonfunmap1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLikeanonfunmap1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLikeclass.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$anonfunapply24.applyOrElse(Optimizer.scala:1364)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelationanonfunapply$24.applyOrElse(Optimizer.scala:1359)
总结
当你在集成的function里找不到你想要的时候,此时此刻你需要UDF。