共计 5989 个字符,预计需要花费 15 分钟才能阅读完成。
Spark withColumn()是一个DataFrame函数,用于向DataFrame中添加新列,更改现有列的值,转换列的数据类型,从现有列派生新列。
Spark withColumn()语法和用法
- 向DataFrame添加新列
- 更改现有列的值
- 从现有列派生新列
- 更改列数据类型
- 添加,替换或更新多列
- 重命名列名
- 从DataFrame删除列
- 将列拆分为多列
SparkwithColumn()是DataFrame的转换函数,用于处理DataFrame上所有行或选定行的列值。
在执行诸如添加新列,更新现有列的值,从现有列派生新列等操作之后,withColumn() 函数将返回一个新的 DataFrame。
下面是withColumn()函数的语法。
withColumn(colName : String, col : Column) : DataFrame
colName:Stirng–指定要创建的新列。使用现有的列来更新值。
col:Column –列表达式。
由于withColumn() 是一个转换函数,它只有在调用action时才执行。
Spark withColumn() 方法在内部引入了一个投影。
Spark文档
首先,让我们创建一个要使用的DataFrame。
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType}
val data = Seq(Row(Row("James;","","Smith"),"36636","M","3000"),
Row(Row("Michael","Rose",""),"40288","M","4000"),
Row(Row("Robert","","Williams"),"42114","M","4000"),
Row(Row("Maria","Anne","Jones"),"39192","F","4000"),
Row(Row("Jen","Mary","Brown"),"","F","-1")
)
val schema = new StructType()
.add("name",new StructType()
.add("firstname",StringType)
.add("middlename",StringType)
.add("lastname",StringType))
.add("dob",StringType)
.add("gender",StringType)
.add("salary",StringType)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
1.向DataFrame添加一个新列
要创建新列,请将所需的列名传递给withColumn()转换函数的第一个参数。确保此新列尚未出现在DataFrame上(如果显示的话)会更新该列的值。在下面的代码片段中,lit()函数用于向DataFrame列添加常量值。我们还可以链接以添加多个列。
import org.apache.spark.sql.functions.lit
df.withColumn("Country", lit("USA"))
//chaining to operate on multiple columns
df.withColumn("Country", lit("USA"))
.withColumn("anotherColumn",lit("anotherValue"))
如果要处理几个列,则上述方法很好,但是当您要添加或更新多个列时,请勿使用withColumn()链接,因为这会导致性能问题,而应使用select()来更新多个列。
2.更改现有列的值
withColumn()DataFrame的Spark函数也可以用于更新现有列的值。为了更改该值,请将现有的列名作为第一个参数传递,将值分配为第二个列。请注意,第二个参数应为Columntype。
import org.apache.spark.sql.functions.col
df.withColumn("salary",col("salary")*100)
此代码段将“ salary”的值乘以100,并将其值更新回“ salary”列。
3.从现有列派生新列
要创建新列,请使用您希望新列使用的名称指定第一个参数,并通过对现有列进行操作来使用第二个参数来分配值。
df.withColumn("CopiedColumn",col("salary")* -1)
此代码段通过将“工资”列乘以值-1来创建新列“ CopiedColumn”。
4.更改列数据类型
通过在DataFrame上使用Spark withColumn并在列上使用强制转换功能,我们可以更改DataFrame列的数据类型。下面的语句将“工资”列的数据类型从字符串更改为整数。
df.withColumn("salary",col("salary").cast("Integer"))
5.添加,替换或更新多个列
当您想在Spark DataFrame中添加,替换或更新多列时,建议不要链接withColumn()函数,因为这会导致性能问题,并建议在DataFrame上创建临时视图后使用select()
df2.createOrReplaceTempView("PERSON")
spark.sql("SELECT salary*100 as salary, salary*-1 as CopiedColumn, 'USA' as country FROM PERSON").show()
6.重命名列名
尽管6,7和8中的示例未使用withColumn()函数,但我仍然想解释如何重命名,删除和拆分列,因为这些对您很有用。
要重命名现有列,请在DataFrame上使用“ withColumnRenamed ”功能。
df.withColumnRenamed("gender","sex")
7.放置一列
使用drop()函数从DataFrame中删除特定的列。
df.drop("CopiedColumn")
8.将列拆分为多列
尽管此示例未使用withColumn()函数,但我仍然觉得用转换函数将一个DataFrame列拆分为多个列还是很好的解释map()。
import spark.implicits._
val columns = Seq("name","address")
val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"),
("Maria, Garcia","3456 Walnut st, Newark, NJ, 94732"))
var dfFromData = spark.createDataFrame(data).toDF(columns:_*)
dfFromData.printSchema()
val newDF = dfFromData.map(f=>{
val nameSplit = f.getAs[String](0).split(",")
val addSplit = f.getAs[String](1).split(",")
(nameSplit(0),nameSplit(1),addSplit(0),addSplit(1),addSplit(2),addSplit(3))
})
val finalDF = newDF.toDF("First Name","Last Name",
"Address Line1","City","State","zipCode")
finalDF.printSchema()
finalDF.show(false)
此代码段将“名称”列拆分为“名字”,“姓氏”,并将“地址”列拆分为“地址行1”,“城市”,“州”和“邮政编码”。产量低于产出:
root
|-- First Name: string (nullable = true)
|-- Last Name: string (nullable = true)
|-- Address Line1: string (nullable = true)
|-- City: string (nullable = true)
|-- State: string (nullable = true)
|-- zipCode: string (nullable = true)
+----------+---------+--------------+-------+-----+-------+
|First Name|Last Name|Address Line1 |City |State|zipCode|
+----------+---------+--------------+-------+-----+-------+
|Robert | Smith |1 Main st | Newark| NJ | 92537 |
|Maria | Garcia |3456 Walnut st| Newark| NJ | 94732 |
+----------+---------+--------------+-------+-----+-------+
注意:请注意,所有这些函数在应用函数后都将返回新的DataFrame,而不是更新DataFrame。
Spark withColumn完整示例
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.functions._
object WithColumn {
def main(args:Array[String]):Unit= {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
val dataRows = Seq(Row(Row("James;","","Smith"),"36636","M","3000"),
Row(Row("Michael","Rose",""),"40288","M","4000"),
Row(Row("Robert","","Williams"),"42114","M","4000"),
Row(Row("Maria","Anne","Jones"),"39192","F","4000"),
Row(Row("Jen","Mary","Brown"),"","F","-1")
)
val schema = new StructType()
.add("name",new StructType()
.add("firstname",StringType)
.add("middlename",StringType)
.add("lastname",StringType))
.add("dob",StringType)
.add("gender",StringType)
.add("salary",StringType)
val df2 = spark.createDataFrame(spark.sparkContext.parallelize(dataRows),schema)
//Change the column data type
df2.withColumn("salary",df2("salary").cast("Integer"))
//Derive a new column from existing
val df4=df2.withColumn("CopiedColumn",df2("salary")* -1)
//Transforming existing column
val df5 = df2.withColumn("salary",df2("salary")*100)
//You can also chain withColumn to change multiple columns
//Renaming a column.
val df3=df2.withColumnRenamed("gender","sex")
df3.printSchema()
//Droping a column
val df6=df4.drop("CopiedColumn")
println(df6.columns.contains("CopiedColumn"))
//Adding a literal value
df2.withColumn("Country", lit("USA")).printSchema()
//Retrieving
df2.show(false)
df2.select("name").show(false)
df2.select("name.firstname").show(false)
df2.select("name.*").show(false)
import spark.implicits._
val columns = Seq("name","address")
val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"), ("Maria, Garcia","3456 Walnut st, Newark, NJ, 94732"))
var dfFromData = spark.createDataFrame(data).toDF(columns:_*)
dfFromData.printSchema()
val newDF = dfFromData.map(f=>{
val nameSplit = f.getAs[String](0).split(",")
val addSplit = f.getAs[String](1).split(",")
(nameSplit(0),nameSplit(1),addSplit(0),addSplit(1),addSplit(2),addSplit(3))
})
val finalDF = newDF.toDF("First Name","Last Name","Address Line1","City","State","zipCode")
finalDF.printSchema()
finalDF.show(false)
df2.createOrReplaceTempView("PERSON")
spark.sql("SELECT salary*100 as salary, salary*-1 as CopiedColumn, 'USA' as country FROM PERSON").show()
}
}
拆分列哪里要是数据量很大的话,通过这样索引是不是难以实现
@ManchiYY 如果拆分列很大,那么我觉得在一开始设计表的时候就应该考虑这个问题
val result = indata.select(“city_name”,explode(split(“city_rst_info”,”/t”))as “info”)这个方法可能会简单一点
@ManchiYY 使用explode方法比较优雅,文章中例子使用get方式获取是想通过map的方式来表达,你提到的这个方法也是很赞的
@admin 最近遇到一个问题2022.2.22这种以点分隔的时间字符串使用to_date()函数转换不成日期类型,不知道为咋办了
@ManchiYY to_date(col(“time”),”yyyy.M.dd”) 需要你构建一个pattern去识别
@admin 能请问一下你用的spark是那个版本的吗?我的to_date()函数只能接收一个参数
@ManchiYY 2.x
如果你对本站的文章有疑问可以邮件给我,点击该站点公告就可以发送邮件