异步和同步
根据 ?@async
下的文档,“@async
在一个 Task 中包含一个表达式。” 这意味着,对于任何属于其范围的内容,Julia 将启动此任务运行,但随后继续执行脚本中的下一步,而无需等待任务完成。因此,例如,如果没有宏,你将得到:
julia> @time sleep(2)
2.005766 seconds (13 allocations: 624 bytes)
但是使用宏,你得到:
julia> @time @async sleep(2)
0.000021 seconds (7 allocations: 657 bytes)
Task (waiting) @0x0000000112a65ba0
julia>
Julia 因此允许脚本继续(并且 @time
宏完全执行)而不等待任务(在这种情况下,睡眠两秒钟)完成。
相比之下,@sync
宏将“等到 @async
,@spawn
,@spawnat
和 @parallel
的所有动态封闭使用完成。” (根据 ?@sync
的文件)。因此,我们看到:
julia> @time @sync @async sleep(2)
2.002899 seconds (47 allocations: 2.986 KB)
Task (done) @0x0000000112bd2e00
在这个简单的例子中,没有必要将 @async
和 @sync
的单个实例包括在一起。但是,@sync
可能有用的地方是你将 @async
应用于多个操作,你希望允许所有操作立即启动而无需等待每个操作完成。
例如,假设我们有多个工作人员,我们想让他们每个人同时处理任务,然后从这些任务中获取结果。初始(但不正确)尝试可能是:
addprocs(2)
@time begin
a = cell(nworkers())
for (idx, pid) in enumerate(workers())
a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
## 4.011576 seconds (177 allocations: 9.734 KB)
这里的问题是循环等待每个 remotecall_fetch()
操作完成,即每个进程完成其工作(在这种情况下,睡眠 2 秒),然后继续开始下一个 remotecall_fetch()
操作。就实际情况而言,我们并没有从这里获得并行性的好处,因为我们的流程并没有同时完成他们的工作(即休眠)。
但是,我们可以通过使用 @async
和 @sync
宏的组合来纠正这个问题:
@time begin
a = cell(nworkers())
@sync for (idx, pid) in enumerate(workers())
@async a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
## 2.009416 seconds (274 allocations: 25.592 KB)
现在,如果我们将循环的每一步都计算为一个单独的操作,我们会看到在 @async
宏之前有两个单独的操作。宏允许每个启动,并且代码在每个完成之前继续(在这种情况下循环的下一步)。但是,使用 @sync
宏,其范围包含整个循环,意味着我们将不允许脚本继续经过该循环,直到 @async
之前的所有操作都已完成。
通过进一步调整上述示例,可以更清楚地了解这些宏的操作,以了解它在某些修改下如何发生变化。例如,假设我们只有 @async
没有 @sync
:
@time begin
a = cell(nworkers())
for (idx, pid) in enumerate(workers())
println("sending work to $pid")
@async a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
## 0.001429 seconds (27 allocations: 2.234 KB)
在这里,@async
宏允许我们在每个 remotecall_fetch()
操作完成执行之前继续循环。但是,无论好坏,我们都没有 @sync
宏来阻止代码继续通过这个循环,直到所有的 remotecall_fetch()
操作完成。
尽管如此,即使我们继续运行,每个 remotecall_fetch()
操作仍然会并行运行。我们可以看到,因为如果我们等待两秒钟,那么包含结果的数组 a 将包含:
sleep(2)
julia> a
2-element Array{Any,1}:
nothing
nothing
(nothing
元素是成功获取 sleep 函数结果的结果,该函数不返回任何值)
我们还可以看到两个 remotecall_fetch()
操作基本上同时开始,因为在它们之前的 print
命令也快速连续执行(这里没有显示这些命令的输出)。将此与下一个示例进行对比,其中 print
命令以彼此相差 2 秒的速度执行:
如果我们将 @async
宏放在整个循环上(而不仅仅是它的内部步骤),那么我们的脚本将立即继续,而不必等待 remotecall_fetch()
操作完成。但是,现在我们只允许脚本作为一个整体继续经过循环。我们不允许循环的每个单独步骤在前一个步骤完成之前启动。因此,与上面的示例不同,脚本在循环之后继续两秒后,results
阵列仍然具有一个元素 #undef
,表示第二个 remotecall_fetch()
操作仍然没有完成。
@time begin
a = cell(nworkers())
@async for (idx, pid) in enumerate(workers())
println("sending work to $pid")
a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
# 0.001279 seconds (328 allocations: 21.354 KB)
# Task (waiting) @0x0000000115ec9120
## This also allows us to continue to
sleep(2)
a
2-element Array{Any,1}:
nothing
#undef
并且,毫不奇怪,如果我们将 @sync
和 @async
放在彼此旁边,我们会得到每个 remotecall_fetch()
顺序运行(而不是同时运行),但我们不会继续执行代码,直到每个完成。换句话说,如果我们既没有宏,也就是说 sleep(2)
的行为与 @sync @async sleep(2)
基本相同,那么这基本上相当于
@time begin
a = cell(nworkers())
@sync @async for (idx, pid) in enumerate(workers())
a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
# 4.019500 seconds (4.20 k allocations: 216.964 KB)
# Task (done) @0x0000000115e52a10
另请注意,可以在 @async
宏的范围内进行更复杂的操作。该文档提供了一个示例,其中包含 @async
范围内的整个循环。
回想一下,同步宏的帮助声明它将“等到 @async
,@spawn
,@spawnat
和 @parallel
的所有动态封闭使用完成后”。出于完整的目的,重要的是如何在 @sync
和 @async
宏的范围内定义任务。考虑下面的例子,这是上面给出的一个例子的略微变化:
@time begin
a = cell(nworkers())
@sync for (idx, pid) in enumerate(workers())
@async a[idx] = remotecall(pid, sleep, 2)
end
end
## 0.172479 seconds (93.42 k allocations: 3.900 MB)
julia> a
2-element Array{Any,1}:
RemoteRef{Channel{Any}}(2,1,3)
RemoteRef{Channel{Any}}(3,1,4)
前面的示例大约需要 2 秒钟才能执行,表明这两个任务是并行运行的,并且脚本在继续之前等待每个任务完成其功能的执行。然而,这个例子的评估时间要低得多。原因是,为了 @sync
的目的,remotecall()
操作已经完成,一旦它发送了工作人员的工作。 (注意,结果数组 a,这里只包含 RemoteRef
对象类型,它们只表示某个特定进程正在发生某些事情,理论上可以在将来的某个时刻获取)。相比之下,remotecall_fetch()
操作只有在从工作人员获得其任务完成的消息时才完成。
因此,如果你正在寻找方法,以确保工人某些操作已经在脚本中移动上(如例如在讨论之前完成这篇文章 ),有必要仔细考虑为完整什么罪名,你怎么会测量然后在脚本中操作它。