在 Scala 中创建 DataFrame
有许多方法可以创建 DataFrame。它们可以从本地列表,分布式 RDD 或从数据源读取创建。
使用 toDF
通过导入 spark sql implicits,可以从本地 Seq,Array 或 RDD 创建 DataFrame,只要内容属于 Product 子类型(元组和 case 类是 Product 子类型的众所周知的示例)。例如:
import sqlContext.implicits._
val df = Seq(
(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")
使用 createDataFrame
另一种选择是使用 SQLcontext 中的 createDataFrame
方法。此选项还允许使用 toDF
创建 Product 子类型的本地列表或 RDD,但不会在同一步骤中设置列的名称。例如:
val df1 = sqlContext.createDataFrame(Seq(
(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))
此外,这种方法允许从 Row
实例的 RDD 创建,只要传递 schema
参数来定义生成的 DataFrame 模式。例:
import org.apache.spark.sql.types._
val schema = StructType(List(
StructField("integer_column", IntegerType, nullable = false),
StructField("string_column", StringType, nullable = true),
StructField("date_column", DateType, nullable = true)
))
val rdd = sc.parallelize(Seq(
Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))
val df = sqlContext.createDataFrame(rdd, schema)
从消息来源阅读
也许最常见的创建 DataFrame 的方法来自数据源。可以从 hdfs 中的镶木地板文件创建它,例如:
val df = sqlContext.read.parquet("hdfs:/path/to/file")