在 Scala 中创建 DataFrame

有许多方法可以创建 DataFrame。它们可以从本地列表,分布式 RDD 或从数据源读取创建。

使用 toDF

通过导入 spark sql implicits,可以从本地 Seq,Array 或 RDD 创建 DataFrame,只要内容属于 Product 子类型(元组和 case 类是 Product 子类型的众所周知的示例)。例如:

import sqlContext.implicits._
val df = Seq(
  (1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")

使用 createDataFrame

另一种选择是使用 SQLcontext 中的 createDataFrame 方法。此选项还允许使用 toDF 创建 Product 子类型的本地列表或 RDD,但不会在同一步骤中设置列的名称。例如:

val df1 = sqlContext.createDataFrame(Seq(
  (1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))

此外,这种方法允许从 Row 实例的 RDD 创建,只要传递 schema 参数来定义生成的 DataFrame 模式。例:

import org.apache.spark.sql.types._
val schema = StructType(List(
    StructField("integer_column", IntegerType, nullable = false),
    StructField("string_column", StringType, nullable = true),
    StructField("date_column", DateType, nullable = true)
))

val rdd = sc.parallelize(Seq(
  Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))

val df = sqlContext.createDataFrame(rdd, schema)

从消息来源阅读

也许最常见的创建 DataFrame 的方法来自数据源。可以从 hdfs 中的镶木地板文件创建它,例如:

val df = sqlContext.read.parquet("hdfs:/path/to/file")