轉型與行動
Spark 使用懶惰評估 ; 這意味著它不會做任何工作,除非它真的必須這樣做。這種方法允許我們避免不必要的記憶體使用,從而使我們能夠處理大資料。
一個轉變是懶惰的評價,並在實際工作情況,當一個動作發生。
例:
In [1]: lines = sc.textFile(file) // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count() // an action occurred, let the party start!
Out[3]: 0 // no line with 'error', in this example
因此,在 [1]
中,我們告訴 Spark 將檔案讀入 RDD,名為 lines
。星火聽到我們,對我們說:“是的,我會做到這一點”,但實際上它並沒有尚未讀取檔案。
在[2]中,我們過濾檔案的行,假設其內容包含錯誤的行,這些行在其開頭標有 error
。因此,我們告訴 Spark 建立一個名為 errors
的新 RDD,它將具有 RDD lines
的元素,在其開始時有 error
這個詞。
現在,在 [3]
,我們要求火花計數的錯誤,即算 RDD 稱為元素的數量 errors
了。count()
是一個動作,它不會選擇 Spark,而是實際進行操作,這樣就可以找到 count()
的結果,這將是一個整數。
結果,當到達 [3]
時,實際上將執行 [1]
和 [2]
,即當我們到達 [3]
時,然後只有這樣:
-
該檔案將在
textFile()
中讀取(因為[1]
) -
lines
將是filter()
‘ed(因為[2]
) -
count()
將執行,因為[3]
除錯提示:由於 Spark 在到達 [3]
之前不會做任何實際的工作,所以重要的是要理解如果 [1]
和/或 [2]
中存在錯誤,它將不會出現,直到 [3]
中的動作觸發 Spark 實際執行工作。例如,如果檔案中的資料不支援我使用的 startsWith()
,那麼 [2]
將被 Spark 正確接受並且不會引發任何錯誤,但是當提交 [3]
時,Spark 實際上會評估 [1]
和 [2]
,那麼只有這樣才能理解 [2]
的某些東西是不正確的併產生描述性的錯誤。
因此,執行 [3]
時可能會觸發錯誤,但這並不意味著錯誤必須位於 [3]
的語句中!
注意,在 [3]
之後,lines
和 errors
都不會儲存在記憶體中。它們將繼續僅作為一組處理指令存在。如果對這些 RDD 中的任何一個執行多個操作,spark 將多次讀取並過濾資料。為避免在單個 RDD 上執行多個操作時重複操作,使用 cache
將資料儲存到記憶體中通常很有用。
你可以在 Spark 文件中看到更多轉換/操作。