將資料載入到 DataFrame 中
在 Spark(scala)
中,我們可以通過幾種不同的方式將資料匯入 DataFrame,每種方式用於不同的用例。
從 CSV 建立 DataFrame
將資料載入到 DataFrame 的最簡單方法是從 CSV 檔案載入資料。這方面的一個例子(摘自官方檔案 )是:
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true") // Automatically infer data types
.load("cars.csv")
隱式地從 RDD 建立 DataFrame
我們在 spark 應用程式中經常使用 RDD 中的資料,但需要將其轉換為 DataFrame。最簡單的方法是使用 .toDF()
RDD 函式,它將隱式確定 DataFrame 的資料型別:
val data = List(
("John", "Smith", 30),
("Jane", "Doe", 25)
)
val rdd = sc.parallelize(data)
val df = rdd.toDF("firstname", "surname", "age")
從 RDD 顯式建立 DataFrame
在某些情況下,使用 .toDF()
方法不是最好的主意,因為我們需要明確定義 DataFrame 的模式。這可以使用包含 StructField 陣列的 StructType 來實現。
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val data = List(
Array("John", "Smith", 30),
Array("Jane", "Doe", 25)
)
val rdd = sc.parallelize(data)
val schema = StructType(
Array(
StructField("firstname", StringType, true),
StructField("surname", StringType, false),
StructField("age", IntegerType, true)
)
)
val rowRDD = rdd.map(arr => Row(arr : _*))
val df = sqlContext.createDataFrame(rowRDD, schema)