apache-spark 介紹
原型 :
aggregate(zeroValue, seqOp, combOp)
說明 :
aggregate()
允許你獲取 RDD 並生成與原始 RDD 中儲存的型別不同的單個值。
引數 :
zeroValue
:結果的初始化值,採用所需格式。seqOp
:你要應用於 RDD 記錄的操作。對分割槽中的每個記錄執行一次。combOp
:定義結果物件(每個分割槽一個)的組合方式。
示例 :
計算列表的總和和該列表的長度。將結果返回到一對
(sum, length)
中。
在 Spark shell 中,建立一個包含 4 個元素的列表,其中包含 2 個分割槽 :
listRDD = sc.parallelize([1,2,3,4], 2)
然後定義 seqOp :
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
然後定義 combOp :
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
然後彙總:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
第一個分割槽有子列表[1,2]。這將 seqOp 應用於該列表的每個元素,從而產生本地結果 - 一對 (sum, length)
將在本地反映結果,僅在第一個分割槽中。
local_result
初始化為 zeroValue
引數 aggregate()
提供。例如,(0,0)和 list_element
是列表的第一個元素:
0 + 1 = 1
0 + 1 = 1
區域性結果是(1,1),這意味著在僅處理第一個元素之後,和為 1,第一個分割槽的長度為 1。local_result
從(0,0)更新為(1,1)。
1 + 2 = 3
1 + 1 = 2
本地結果現在是(3,2),這將是第一個分割槽的最終結果,因為它們不是第一個分割槽的子列表中的其他元素。對第二個分割槽執行相同操作返回(7,2)。
將 combOp 應用於每個本地結果以形成最終的全域性結果:
(3,2) + (7,2) = (10, 4)
‘figure’中描述的示例:
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)