共计 3086 个字符,预计需要花费 8 分钟才能阅读完成。
在SparkwithColumnRenamed()中用于重命名一列或多个DataFrame列名。取决于DataFrame架构,重命名列可能变得简单到复杂,尤其是当列嵌套有struct类型时,它会变得复杂。
在本文中,我将说明如何使用多个用例来重命名DataFrame列,例如重命名所选的多个列,嵌套的struct列,所有带有Scala示例的列。
- 使用withColumnRenamed –重命名Spark DataFrame列名
- 与withColumnRenamed一起使用–重命名多个列
- 使用StructType –重命名Spark DataFrame上的嵌套列
- 使用选择–重命名嵌套的列
- 使用withColumn –重命名嵌套的列
- 使用col()函数–动态重命名所有或多个列
- 使用toDF()–重命名所有或多个列
首先,让我们为示例创建数据,我们将数据转换为Spark DataFrame时使用Row类。
val data = Seq(Row(Row("James ","","Smith"),"36636","M",3000),
Row(Row("Michael ","Rose",""),"40288","M",4000),
Row(Row("Robert ","","Williams"),"42114","M",4000),
Row(Row("Maria ","Anne","Jones"),"39192","F",4000),
Row(Row("Jen","Mary","Brown"),"","F",-1)
)
我们的基本架构具有嵌套结构。
val schema = new StructType()
.add("name",new StructType()
.add("firstname",StringType)
.add("middlename",StringType)
.add("lastname",StringType))
.add("dob",StringType)
.add("gender",StringType)
.add("salary",IntegerType)
让我们通过使用parallelize创建DataFrame并提供上述架构。
val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
df.printSchema()
以下是我们的架构结构。我不在这里打印数据,因为我们的示例没有必要。该模式具有嵌套结构。
1.使用Spark withColumnRenamed –重命名DataFrame列名
SparkwithColumnRenamed()在DataFrame上具有更改列名称的功能。这是最直接的方法;该函数有两个参数;第一个是您现有的列名,第二个是您想要的新列名。返回的是新的DataFrame。
句法:
def withColumnRenamed(existingName: String, newName: String): DataFrame
existingName –您要更改的现有列名
newName –列的新名称
返回具有重命名列的新DataFrame(Dataset [Row])。如果架构不包含,则此操作不可操作existingName。
范例:
df.withColumnRenamed("dob","DateOfBirth")
.printSchema()
上面的示例将spark DataFrame上的列名从“ dob”更改为“ DateOfBirth”。请注意,该withColumnRenamed函数返回一个新的DataFrame,并且不会修改当前的DataFrame。
2.使用withColumnRenamed –重命名多个列
要更改多个列名,我们应该withColumnRenamed按如下所示链接函数。
val df2 = df.withColumnRenamed("dob","DateOfBirth")
.withColumnRenamed("salary","salary_amount")
df2.printSchema()
重命名dob和salary列后,这将创建一个新的DataFrame“ df2”。
3.使用Spark StructType –重命名Dataframe中的嵌套列
更改嵌套数据上的列名并非一帆风顺,我们可以通过使用StructType使用新的DataFrame列创建一个新的架构,并使用如下所示的cast函数来使用它来实现。
val schema2 = new StructType()
.add("fname",StringType)
.add("middlename",StringType)
.add("lname",StringType)
df.select(col("name").cast(schema2),
col("dob"),
col("gender"),
col("salary"))
.printSchema()
该语句在名称结构中将firstname重命名为fname,将lastname重命名为lname。
重命名多列
4.使用选择–重命名嵌套元素。
让我们看看通过将结构转置为平面来更改嵌套列的另一种方法。
df.select(col("name.firstname").as("fname"),
col("name.middlename").as("mname"),
col("name.lastname").as("lname"),
col("dob"),col("gender"),col("salary"))
.printSchema()
Spark重命名嵌套列
5.使用Spark DataFrame withColumn –重命名嵌套的列
当您在Spark DatFrame上嵌套了列并且想要重命名它时,请使用withColumn数据框对象从现有对象创建新列,我们将需要删除现有列。下面的示例从“ name.firstname”创建一个“ fname”列,并删除“ name”列
val df4 = df.withColumn("fname",col("name.firstname"))
.withColumn("mname",col("name.middlename"))
.withColumn("lname",col("name.lastname"))
.drop("name")
df4.printSchema()
6.使用col()函数–动态重命名所有或多个列
更改数据框上所有列名称的另一种方法是使用col()函数。
val old_columns = Seq("dob","gender","salary","fname","mname","lname")
val new_columns = Seq("DateOfBirth","Sex","salary","firstName","middleName","lastName")
val columnsList = old_columns.zip(new_columns).map(f=>{col(f._1).as(f._2)})
val df5 = df4.select(columnsList:_*)
df5.printSchema()
7.使用toDF()–更改Spark DataFrame中的所有列
当我们的数据具有平面结构(无嵌套)时,请使用toDF()新的架构来更改所有列名。
val newColumns = Seq("newCol1","newCol2","newCol3")
val df = df.toDF(newColumns:_*)