转型与行动
Spark 使用懒惰评估 ; 这意味着它不会做任何工作,除非它真的必须这样做。这种方法允许我们避免不必要的内存使用,从而使我们能够处理大数据。
一个转变是懒惰的评价,并在实际工作情况,当一个动作发生。
例:
In [1]: lines = sc.textFile(file)        // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count()    // an action occurred, let the party start!
Out[3]: 0                              // no line with 'error', in this example
因此,在 [1] 中,我们告诉 Spark 将文件读入 RDD,名为 lines。星火听到我们,对我们说:“是的,我会做到这一点”,但实际上它并没有尚未读取文件。
在[2]中,我们过滤文件的行,假设其内容包含错误的行,这些行在其开头标有 error。因此,我们告诉 Spark 创建一个名为 errors 的新 RDD,它将具有 RDD lines 的元素,在其开始时有 error 这个词。
现在,在 [3],我们要求火花计数的错误,即算 RDD 称为元素的数量 errors 了。count() 是一个动作,它不会选择 Spark,而是实际进行操作,这样就可以找到 count() 的结果,这将是一个整数。
结果,当到达 [3] 时,实际上将执行 [1] 和 [2],即当我们到达 [3] 时,然后只有这样:
- 
该文件将在 textFile()中读取(因为[1])
- 
lines将是filter()‘ed(因为[2])
- 
count()将执行,因为[3]
调试提示:由于 Spark 在到达 [3] 之前不会做任何实际的工作,所以重要的是要理解如果 [1] 和/或 [2] 中存在错误,它将不会出现,直到 [3] 中的动作触发 Spark 实际执行工作。例如,如果文件中的数据不支持我使用的 startsWith(),那么 [2] 将被 Spark 正确接受并且不会引发任何错误,但是当提交 [3] 时,Spark 实际上会评估 [1] 和 [2],那么只有这样才能理解 [2] 的某些东西是不正确的并产生描述性的错误。
因此,执行 [3] 时可能会触发错误,但这并不意味着错误必须位于 [3] 的语句中!
注意,在 [3] 之后,lines 和 errors 都不会存储在内存中。它们将继续仅作为一组处理指令存在。如果对这些 RDD 中的任何一个执行多个操作,spark 将多次读取并过滤数据。为避免在单个 RDD 上执行多个操作时重复操作,使用 cache 将数据存储到内存中通常很有用。
你可以在 Spark 文档中看到更多转换/操作。