将数据加载到 DataFrame 中
在 Spark(scala)
中,我们可以通过几种不同的方式将数据导入 DataFrame,每种方式用于不同的用例。
从 CSV 创建 DataFrame
将数据加载到 DataFrame 的最简单方法是从 CSV 文件加载数据。这方面的一个例子(摘自官方文件 )是:
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true") // Automatically infer data types
.load("cars.csv")
隐式地从 RDD 创建 DataFrame
我们在 spark 应用程序中经常使用 RDD 中的数据,但需要将其转换为 DataFrame。最简单的方法是使用 .toDF()
RDD 函数,它将隐式确定 DataFrame 的数据类型:
val data = List(
("John", "Smith", 30),
("Jane", "Doe", 25)
)
val rdd = sc.parallelize(data)
val df = rdd.toDF("firstname", "surname", "age")
从 RDD 显式创建 DataFrame
在某些情况下,使用 .toDF()
方法不是最好的主意,因为我们需要明确定义 DataFrame 的模式。这可以使用包含 StructField 数组的 StructType 来实现。
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val data = List(
Array("John", "Smith", 30),
Array("Jane", "Doe", 25)
)
val rdd = sc.parallelize(data)
val schema = StructType(
Array(
StructField("firstname", StringType, true),
StructField("surname", StringType, false),
StructField("age", IntegerType, true)
)
)
val rowRDD = rdd.map(arr => Row(arr : _*))
val df = sqlContext.createDataFrame(rowRDD, schema)