apache-spark 介绍
原型 :
aggregate(zeroValue, seqOp, combOp)
说明 :
aggregate()
允许你获取 RDD 并生成与原始 RDD 中存储的类型不同的单个值。
参数 :
zeroValue
:结果的初始化值,采用所需格式。seqOp
:你要应用于 RDD 记录的操作。对分区中的每个记录运行一次。combOp
:定义结果对象(每个分区一个)的组合方式。
示例 :
计算列表的总和和该列表的长度。将结果返回到一对
(sum, length)
中。
在 Spark shell 中,创建一个包含 4 个元素的列表,其中包含 2 个分区 :
listRDD = sc.parallelize([1,2,3,4], 2)
然后定义 seqOp :
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
然后定义 combOp :
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
然后汇总:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
第一个分区有子列表[1,2]。这将 seqOp 应用于该列表的每个元素,从而产生本地结果 - 一对 (sum, length)
将在本地反映结果,仅在第一个分区中。
local_result
初始化为 zeroValue
参数 aggregate()
提供。例如,(0,0)和 list_element
是列表的第一个元素:
0 + 1 = 1
0 + 1 = 1
局部结果是(1,1),这意味着在仅处理第一个元素之后,和为 1,第一个分区的长度为 1。local_result
从(0,0)更新为(1,1)。
1 + 2 = 3
1 + 1 = 2
本地结果现在是(3,2),这将是第一个分区的最终结果,因为它们不是第一个分区的子列表中的其他元素。对第二个分区执行相同操作返回(7,2)。
将 combOp 应用于每个本地结果以形成最终的全局结果:
(3,2) + (7,2) = (10, 4)
‘figure’中描述的示例:
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)