共计 4977 个字符,预计需要花费 13 分钟才能阅读完成。
博主日常在工作中使用spark sql进行数据分析比较多,常见的还会借助hive。从Tfrecord里面读数据出来为DataSet,如果你会使用pandas 那么它跟 pandas 的形式很像,那么你操作起来会更贱容易上手。
在Spark中,使用 createDataFrame() 和 toDF() 方法创建一个 DataFrame,使用这些方法,您可以从已经存在的RDD,DataFrame,Dataset,List,Seq数据对象中创建一个Spark DataFrame,在这里我将用Scala示例进行说明。 。
您还可以从不同的来源(例如Text,CSV,JSON,XML,Parquet,Avro,ORC,二进制文件,RDBMS表,Hive,HBase等)创建一个DataFrame 。
DataFrame是组织为命名列的分布式数据集合。从概念上讲,它等效于关系数据库中的表或R / Python中的数据框,但是在后台进行了更丰富的优化。可以从多种来源构造DataFrame,例如:结构化数据文件,Hive中的表,外部数据库或现有的RDD。
val spark:SparkSession = SparkSession.builder() .master("local[1]").appName("SparkByExamples.com") .getOrCreate() import spark.implicits._ val columns = Seq("language","users_count") val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
-数据块
- Spark从RDD创建DataFrame
- 从列表和Seq集合创建DataFrame
- 从CSV文件创建Spark DataFrame
- 从TXT文件创建
- 从JSON文件创建
- 从XML文件创建
- 从HIVE创造
- 从RDBMS数据库创建表
- 从HBase表创建
- 其他来源(Avro,镶木地板等)
首先,让我们在示例中(例如,当需要使用时.toDF() function
)导入所需的spark隐式数据,并为示例创建数据。
1. Spark从RDD创建DataFrame
创建Spark DataFrame的一种简单方法是从现有的RDD中进行。首先,让我们通过调用parallelize()从集合Seq创建一个RDD。
我将在下面的所有示例中使用该rdd对象。
val rdd = spark.sparkContext.parallelize(data)
1.1使用toDF()函数
一旦有了RDD,就可以使用toDF()
它在Spark中创建DataFrame。默认情况下,它创建的列名称分别为“ _1”和“ _2”,因为每一行有两列。
val dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
由于RDD是无模式的,没有列名和数据类型,因此从RDD转换为DataFrame会为您提供默认的列名,如_1
,_2等,数据类型则为String。printSchema就是将DataFrame列和数据类型等打印出来。就是表格式。
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
toDF()
还有另一个签名来分配列名,该签名为列名采用了可变数量的参数,如下所示。
产量低于产出。记住这里我们只是分配了列名,但它仍然将所有数据类型都当作字符串。
默认情况下,这些列的数据类型分配给String。我们可以通过提供模式来更改此行为–我们可以在其中为每个字段/列指定列名,数据类型和可为null。
1.2从SparkSession使用Spark createDataFrame
createDataFrame()从SparkSession使用是另一种创建方法,它使用rdd对象作为参数。并与toDF()链接以为列指定名称。
val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)
1.3在行类型中使用createDataFrame()
createDataFrame()具有另一个签名,该签名采用RDD [Row]类型和列名称的架构作为参数。要首先使用它,我们需要将“ rdd”对象从转换为RDD[T],RDD[Row]并使用StructType和StructField定义模式。这种方式相当于定义好每条数据,但是也要提前定义好数据的格式也就是Schema。
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.Row
val schema = StructType( Array(
StructField("language", StringType,true),
StructField("language", StringType,true)
))
val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2))
val dfFromRDD3 = spark.createDataFrame(rowRDD,schema)
2.从列表和序列集合创建Spark DataFrame
在本部分中,我们将看到几种通过collectionSeq[T]或创建Spark DataFrame的方法List[T]。这些示例与我们在上一节中使用RDD看到的示例相似,但是我们使用“数据”对象而不是“ rdd”对象。
2.1在List或Seq集合上使用toDF()
toDF()在集合(Seq,List)对象上创建一个DataFrame。确保导入import spark.implicits._以使用toDF()
val dfFromData1 = data.toDF()
2.2从SparkSession使用createDataFrame()
调用createDataFrame()fromSparkSession是另一种创建方法,它使用集合对象(Seq或List)作为参数。并与toDF()链接以为列指定名称。
//From Data (USING createDataFrame)
var dfFromData2 = spark.createDataFrame(data).toDF(columns:_*)
2.3在行类型中使用createDataFrame()
createDataFrame()在Spark中具有另一个签名,该签名将列类型的行类型和模式的集合作为参数。要首先使用它,我们需要将“数据”对象从Seq [T]转换为Seq [Row]。
//From Data (USING createDataFrame and Adding schema using StructType)
val data = Seq(Row("Java", "20000"),
Row("Python", "100000"),
Row("Scala", "3000"))
var dfFromData3 = spark.createDataFrame(rowData,schema)
以下 3-6都是从文件中读取相应的数据然后生成DataFrame
3.从CSV创建Spark DataFrame
在以上所有示例中,您已经学习了Spark通过RDD和数据收集对象创建DataFrame。实时这些功能较少使用。在本节和以下各节中,您将学习如何从数据源(如CSV,文本,JSON,Avro等)创建DataFrame。
默认情况下,Spark提供了一个API以读取定界符文件,例如逗号,管道,制表符分隔的文件,并且还提供了几种处理标头,不包含标头,双引号,数据类型等的选项。
有关详细示例,请参阅从CSV文件创建DataFrame。
val df2 = spark.read.csv("/src/resources/file.csv")
4.从文本创建(TXT)文件
在这里,将看到如何从TXT文件创建。
val df2 = spark.read
.text("/src/resources/file.txt")
5.从JSON文件创建
在这里,将看到如何从JSON文件创建。
val df2 = spark.read
.json("/src/resources/file.json")
6.从XML文件创建
要通过解析XML创建DataFrame,我们应该使用”com.databricks.spark.xml”Databricks的DataSource spark-xml API。
val df = spark.read
.format("com.databricks.spark.xml")
.option("rowTag", "person")
.xml("src/main/resources/persons.xml")
7.从HDFS读取
val df =spark.read.format("tfrecords").load("hdfs://****/data/test_tf/all/2020-03-26/12")
7.从Hive创建
处理大数据大部分会从hive或者hdfs读取
val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark.sparkContext)
val hiveDF = hiveContext.sql(“select * from emp”)
8. Spark从RDBMS数据库创建DataFrame
8.1)从Mysql表
确保您的pom.xml文件或类路径中的MySQL jar都具有MySQL库作为依赖项。
val df_mysql = spark.read.format(“jdbc”)
.option(“url”, “jdbc:mysql://localhost:port/db”)
.option(“driver”, “com.mysql.jdbc.Driver”)
.option(“dbtable”, “tablename”)
.option(“user”, “user”)
.option(“password”, “password”)
.load()
8.2从DB2表
确保在pom.xml文件或类路径中的DB2 jar中将DB2库作为依赖项。
val df_db2 = spark.read.format(“jdbc”)
.option(“url”, “jdbc:db2://localhost:50000/dbname”)
.option(“driver”, “com.ibm.db2.jcc.DB2Driver”)
.option(“dbtable”, “tablename”)
.option(“user”, “user”)
.option(“password”, “password”)
.load()
同样,我们可以从大多数关系数据库中在Spark中创建DataFrame,在这里我没有介绍,我将把它留给您研究。
9.从HBase表创建DataFrame
要从HBase表创建Spark DataFrame,我们应该使用Spark HBase连接器中定义的DataSource 。例如,使用org.apache.spark.sql.execution.datasources.hbaseHortonworks的DataSource“ ”或使用org.apache.hadoop.hbase.sparkspark HBase连接器的“ ”。
val hbaseDF = sparkSession.read
.options(Map(HBaseTableCatalog.tableCatalog -> catalog))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
从HBase表生成DataFrame中解释的详细示例
10.其他来源(Avro,Parquet,Kafka)
我们还可以从Avro,Parquet,HBase创建DataFrame并从Kafka读取数据,这在下面的文章中已经进行了解释,我建议您在有空的时候阅读这些内容。