轉型與行動
Spark 使用懶惰評估 ; 這意味著它不會做任何工作,除非它真的必須這樣做。這種方法允許我們避免不必要的記憶體使用,從而使我們能夠處理大資料。
一個轉變是懶惰的評價,並在實際工作情況,當一個動作發生。
例:
In [1]: lines = sc.textFile(file) // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count() // an action occurred, let the party start!
Out[3]: 0 // no line with 'error', in this example
因此,在 [1] 中,我們告訴 Spark 將檔案讀入 RDD,名為 lines。星火聽到我們,對我們說:“是的,我會做到這一點”,但實際上它並沒有尚未讀取檔案。
在[2]中,我們過濾檔案的行,假設其內容包含錯誤的行,這些行在其開頭標有 error。因此,我們告訴 Spark 建立一個名為 errors 的新 RDD,它將具有 RDD lines 的元素,在其開始時有 error 這個詞。
現在,在 [3],我們要求火花計數的錯誤,即算 RDD 稱為元素的數量 errors 了。count() 是一個動作,它不會選擇 Spark,而是實際進行操作,這樣就可以找到 count() 的結果,這將是一個整數。
結果,當到達 [3] 時,實際上將執行 [1] 和 [2],即當我們到達 [3] 時,然後只有這樣:
-
該檔案將在
textFile()中讀取(因為[1]) -
lines將是filter()‘ed(因為[2]) -
count()將執行,因為[3]
除錯提示:由於 Spark 在到達 [3] 之前不會做任何實際的工作,所以重要的是要理解如果 [1] 和/或 [2] 中存在錯誤,它將不會出現,直到 [3] 中的動作觸發 Spark 實際執行工作。例如,如果檔案中的資料不支援我使用的 startsWith(),那麼 [2] 將被 Spark 正確接受並且不會引發任何錯誤,但是當提交 [3] 時,Spark 實際上會評估 [1] 和 [2],那麼只有這樣才能理解 [2] 的某些東西是不正確的併產生描述性的錯誤。
因此,執行 [3] 時可能會觸發錯誤,但這並不意味著錯誤必須位於 [3] 的語句中!
注意,在 [3] 之後,lines 和 errors 都不會儲存在記憶體中。它們將繼續僅作為一組處理指令存在。如果對這些 RDD 中的任何一個執行多個操作,spark 將多次讀取並過濾資料。為避免在單個 RDD 上執行多個操作時重複操作,使用 cache 將資料儲存到記憶體中通常很有用。
你可以在 Spark 文件中看到更多轉換/操作。