控制 Spark SQL Shuffle 分区

在 Apache Spark 中执行像 joincogroup 这样的随机操作时,很多数据都会通过网络传输。现在,控制发生 shuffle 的分区数可以通过 Spark SQL 中给出的配置来控制。该配置如下:

spark.sql.shuffle.partitions

使用此配置,我们可以控制 shuffle 操作的分区数。默认情况下,其值为 200。但是,如果我们有几 GB 的文件,200 个分区没有任何意义。因此,我们应该根据我们需要通过 Spark SQL 处理的数据量来更改它们。如下:

在这种情况下,我们有两个表加入 employeedepartment。两个表仅包含少量记录,但我们需要加入它们以了解每个员工的部门。所以,我们使用 Spark DataFrames 加入他们,如下所示:

val conf = new SparkConf().setAppName("sample").setMaster("local")
val sc = new SparkContext(conf)

val employee = sc.parallelize(List("Bob", "Alice")).toDF("name")
val department = sc.parallelize(List(("Bob", "Accounts"), ("Alice", "Sales"))).toDF("name", "department")

employeeDF.join(departmentDF, "employeeName").show()

现在,默认情况下,在连接时创建的分区数量是 200,这对于这么多数据来说当然太多了。

因此,我们可以更改此值,以便减少 shuffle 操作的数量。

val conf = new SparkConf().setAppName("sample").setMaster("local").set("spark.sql.shuffle.partitions", 2)
val sc = new SparkContext(conf)

val employee = sc.parallelize(List("Bob", "Alice")).toDF("name")
val department = sc.parallelize(List(("Bob", "Accounts"), ("Alice", "Sales"))).toDF("name", "department")

employeeDF.join(departmentDF, "employeeName").show()

现在,shuffle 分区的数量减少到只有 2,这不仅减少了混洗操作的数量,而且还减少了从 0.878505 s0.077847 s 加入 DataFrames 所需的时间。

因此,始终根据正在处理的数据配置随机操作的分区数。