控制 Spark SQL Shuffle 分割槽

在 Apache Spark 中執行像 joincogroup 這樣的隨機操作時,很多資料都會通過網路傳輸。現在,控制發生 shuffle 的分割槽數可以通過 Spark SQL 中給出的配置來控制。該配置如下:

spark.sql.shuffle.partitions

使用此配置,我們可以控制 shuffle 操作的分割槽數。預設情況下,其值為 200。但是,如果我們有幾 GB 的檔案,200 個分割槽沒有任何意義。因此,我們應該根據我們需要通過 Spark SQL 處理的資料量來更改它們。如下:

在這種情況下,我們有兩個表加入 employeedepartment。兩個表僅包含少量記錄,但我們需要加入它們以瞭解每個員工的部門。所以,我們使用 Spark DataFrames 加入他們,如下所示:

val conf = new SparkConf().setAppName("sample").setMaster("local")
val sc = new SparkContext(conf)

val employee = sc.parallelize(List("Bob", "Alice")).toDF("name")
val department = sc.parallelize(List(("Bob", "Accounts"), ("Alice", "Sales"))).toDF("name", "department")

employeeDF.join(departmentDF, "employeeName").show()

現在,預設情況下,在連線時建立的分割槽數量是 200,這對於這麼多資料來說當然太多了。

因此,我們可以更改此值,以便減少 shuffle 操作的數量。

val conf = new SparkConf().setAppName("sample").setMaster("local").set("spark.sql.shuffle.partitions", 2)
val sc = new SparkContext(conf)

val employee = sc.parallelize(List("Bob", "Alice")).toDF("name")
val department = sc.parallelize(List(("Bob", "Accounts"), ("Alice", "Sales"))).toDF("name", "department")

employeeDF.join(departmentDF, "employeeName").show()

現在,shuffle 分割槽的數量減少到只有 2,這不僅減少了混洗操作的數量,而且還減少了從 0.878505 s0.077847 s 加入 DataFrames 所需的時間。

因此,始終根據正在處理的資料配置隨機操作的分割槽數。