apache-spark 介紹

原型

aggregate(zeroValue, seqOp, combOp)

說明

aggregate() 允許你獲取 RDD 並生成與原始 RDD 中儲存的型別不同的單個值。

引數

  1. zeroValue:結果的初始化值,採用所需格式。
  2. seqOp:你要應用於 RDD 記錄的操作。對分割槽中的每個記錄執行一次。
  3. combOp:定義結果物件(每個分割槽一個)的組合方式。

示例

計算列表的總和和該列表的長度。將結果返回到一對 (sum, length) 中。

在 Spark shell 中,建立一個包含 4 個元素的列表,其中包含 2 個分割槽

listRDD = sc.parallelize([1,2,3,4], 2)

然後定義 seqOp

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

然後定義 combOp

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

然後彙總:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

第一個分割槽有子列表[1,2]。這將 seqOp 應用於該列表的每個元素,從而產生本地結果 - 一對 (sum, length) 將在本地反映結果,僅在第一個分割槽中。

local_result 初始化為 zeroValue 引數 aggregate() 提供。例如,(0,0)和 list_element 是列表的第一個元素:

0 + 1 = 1
0 + 1 = 1

區域性結果是(1,1),這意味著在處理第一個元素之後,和為 1,第一個分割槽的長度為 1。local_result 從(0,0)更新為(1,1)。

1 + 2 = 3
1 + 1 = 2

本地結果現在是(3,2),這將是第一個分割槽的最終結果,因為它們不是第一個分割槽的子列表中的其他元素。對第二個分割槽執行相同操作返回(7,2)。

將 combOp 應用於每個本地結果以形成最終的全域性結果:

(3,2) + (7,2) = (10, 4)

‘figure’中描述的示例:

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)