转型与行动
Spark 使用懒惰评估 ; 这意味着它不会做任何工作,除非它真的必须这样做。这种方法允许我们避免不必要的内存使用,从而使我们能够处理大数据。
一个转变是懒惰的评价,并在实际工作情况,当一个动作发生。
例:
In [1]: lines = sc.textFile(file) // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count() // an action occurred, let the party start!
Out[3]: 0 // no line with 'error', in this example
因此,在 [1]
中,我们告诉 Spark 将文件读入 RDD,名为 lines
。星火听到我们,对我们说:“是的,我会做到这一点”,但实际上它并没有尚未读取文件。
在[2]中,我们过滤文件的行,假设其内容包含错误的行,这些行在其开头标有 error
。因此,我们告诉 Spark 创建一个名为 errors
的新 RDD,它将具有 RDD lines
的元素,在其开始时有 error
这个词。
现在,在 [3]
,我们要求火花计数的错误,即算 RDD 称为元素的数量 errors
了。count()
是一个动作,它不会选择 Spark,而是实际进行操作,这样就可以找到 count()
的结果,这将是一个整数。
结果,当到达 [3]
时,实际上将执行 [1]
和 [2]
,即当我们到达 [3]
时,然后只有这样:
-
该文件将在
textFile()
中读取(因为[1]
) -
lines
将是filter()
‘ed(因为[2]
) -
count()
将执行,因为[3]
调试提示:由于 Spark 在到达 [3]
之前不会做任何实际的工作,所以重要的是要理解如果 [1]
和/或 [2]
中存在错误,它将不会出现,直到 [3]
中的动作触发 Spark 实际执行工作。例如,如果文件中的数据不支持我使用的 startsWith()
,那么 [2]
将被 Spark 正确接受并且不会引发任何错误,但是当提交 [3]
时,Spark 实际上会评估 [1]
和 [2]
,那么只有这样才能理解 [2]
的某些东西是不正确的并产生描述性的错误。
因此,执行 [3]
时可能会触发错误,但这并不意味着错误必须位于 [3]
的语句中!
注意,在 [3]
之后,lines
和 errors
都不会存储在内存中。它们将继续仅作为一组处理指令存在。如果对这些 RDD 中的任何一个执行多个操作,spark 将多次读取并过滤数据。为避免在单个 RDD 上执行多个操作时重复操作,使用 cache
将数据存储到内存中通常很有用。
你可以在 Spark 文档中看到更多转换/操作。