apache-spark 介绍

原型

aggregate(zeroValue, seqOp, combOp)

说明

aggregate() 允许你获取 RDD 并生成与原始 RDD 中存储的类型不同的单个值。

参数

  1. zeroValue:结果的初始化值,采用所需格式。
  2. seqOp:你要应用于 RDD 记录的操作。对分区中的每个记录运行一次。
  3. combOp:定义结果对象(每个分区一个)的组合方式。

示例

计算列表的总和和该列表的长度。将结果返回到一对 (sum, length) 中。

在 Spark shell 中,创建一个包含 4 个元素的列表,其中包含 2 个分区

listRDD = sc.parallelize([1,2,3,4], 2)

然后定义 seqOp

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

然后定义 combOp

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

然后汇总:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

第一个分区有子列表[1,2]。这将 seqOp 应用于该列表的每个元素,从而产生本地结果 - 一对 (sum, length) 将在本地反映结果,仅在第一个分区中。

local_result 初始化为 zeroValue 参数 aggregate() 提供。例如,(0,0)和 list_element 是列表的第一个元素:

0 + 1 = 1
0 + 1 = 1

局部结果是(1,1),这意味着在处理第一个元素之后,和为 1,第一个分区的长度为 1。local_result 从(0,0)更新为(1,1)。

1 + 2 = 3
1 + 1 = 2

本地结果现在是(3,2),这将是第一个分区的最终结果,因为它们不是第一个分区的子列表中的其他元素。对第二个分区执行相同操作返回(7,2)。

将 combOp 应用于每个本地结果以形成最终的全局结果:

(3,2) + (7,2) = (10, 4)

‘figure’中描述的示例:

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)