将数据加载到 DataFrame 中

Spark(scala) 中,我们可以通过几种不同的方式将数据导入 DataFrame,每种方式用于不同的用例。

从 CSV 创建 DataFrame

将数据加载到 DataFrame 的最简单方法是从 CSV 文件加载数据。这方面的一个例子(摘自官方文件 )是:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .option("inferSchema", "true") // Automatically infer data types
    .load("cars.csv")

隐式地从 RDD 创建 DataFrame

我们在 spark 应用程序中经常使用 RDD 中的数据,但需要将其转换为 DataFrame。最简单的方法是使用 .toDF() RDD 函数,它将隐式确定 DataFrame 的数据类型:

val data = List(
   ("John", "Smith", 30), 
   ("Jane", "Doe", 25)
)

val rdd = sc.parallelize(data)

val df = rdd.toDF("firstname", "surname", "age")

从 RDD 显式创建 DataFrame

在某些情况下,使用 .toDF() 方法不是最好的主意,因为我们需要明确定义 DataFrame 的模式。这可以使用包含 StructField 数组的 StructType 来实现。

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val data = List(
   Array("John", "Smith", 30), 
   Array("Jane", "Doe", 25)
)

val rdd = sc.parallelize(data)

val schema = StructType(
   Array(
      StructField("firstname", StringType,  true),
      StructField("surname",   StringType,  false),
      StructField("age",       IntegerType, true)
   )
)

val rowRDD = rdd.map(arr => Row(arr : _*))

val df = sqlContext.createDataFrame(rowRDD, schema)