异步和同步

根据 ?@async 下的文档,“@async 在一个 Task 中包含一个表达式。” 这意味着,对于任何属于其范围的内容,Julia 将启动此任务运行,但随后继续执行脚本中的下一步,而无需等待任务完成。因此,例如,如果没有宏,你将得到:

julia> @time sleep(2)
  2.005766 seconds (13 allocations: 624 bytes)

但是使用宏,你得到:

julia> @time @async sleep(2)
  0.000021 seconds (7 allocations: 657 bytes)
Task (waiting) @0x0000000112a65ba0

julia> 

Julia 因此允许脚本继续(并且 @time 宏完全执行)而不等待任务(在这种情况下,睡眠两秒钟)完成。

相比之下,@sync 宏将“等到 @async@spawn@spawnat@parallel 的所有动态封闭使用完成。” (根据 ?@sync 的文件)。因此,我们看到:

julia> @time @sync @async sleep(2)
  2.002899 seconds (47 allocations: 2.986 KB)
Task (done) @0x0000000112bd2e00

在这个简单的例子中,没有必要将 @async@sync 的单个实例包括在一起。但是,@sync 可能有用的地方是你将 @async 应用于多个操作,你希望允许所有操作立即启动而无需等待每个操作完成。

例如,假设我们有多个工作人员,我们想让他们每个人同时处理任务,然后从这些任务中获取结果。初始(但不正确)尝试可能是:

addprocs(2)
@time begin
    a = cell(nworkers())
    for (idx, pid) in enumerate(workers())
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 4.011576 seconds (177 allocations: 9.734 KB)

这里的问题是循环等待每个 remotecall_fetch() 操作完成,即每个进程完成其工作(在这种情况下,睡眠 2 秒),然后继续开始下一个 remotecall_fetch() 操作。就实际情况而言,我们并没有从这里获得并行性的好处,因为我们的流程并没有同时完成他们的工作(即休眠)。

但是,我们可以通过使用 @async@sync 宏的组合来纠正这个问题:

@time begin
    a = cell(nworkers())
    @sync for (idx, pid) in enumerate(workers())
        @async a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 2.009416 seconds (274 allocations: 25.592 KB)

现在,如果我们将循环的每一步都计算为一个单独的操作,我们会看到在 @async 宏之前有两个单独的操作。宏允许每个启动,并且代码在每个完成之前继续(在这种情况下循环的下一步)。但是,使用 @sync 宏,其范围包含整个循环,意味着我们将不允许脚本继续经过该循环,直到 @async 之前的所有操作都已完成。

通过进一步调整上述示例,可以更清楚地了解这些宏的操作,以了解它在某些修改下如何发生变化。例如,假设我们只有 @async 没有 @sync

@time begin
    a = cell(nworkers())
    for (idx, pid) in enumerate(workers())
        println("sending work to $pid")
        @async a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 0.001429 seconds (27 allocations: 2.234 KB)

在这里,@async 宏允许我们在每个 remotecall_fetch() 操作完成执行之前继续循环。但是,无论好坏,我们都没有 @sync 宏来阻止代码继续通过这个循环,直到所有的 remotecall_fetch() 操作完成。

尽管如此,即使我们继续运行,每个 remotecall_fetch() 操作仍然会并行运行。我们可以看到,因为如果我们等待两秒钟,那么包含结果的数组 a 将包含:

sleep(2)
julia> a
2-element Array{Any,1}:
 nothing
 nothing

nothing 元素是成功获取 sleep 函数结果的结果,该函数不返回任何值)

我们还可以看到两个 remotecall_fetch() 操作基本上同时开始,因为在它们之前的 print 命令也快速连续执行(这里没有显示这些命令的输出)。将此与下一个示例进行对比,其中 print 命令以彼此相差 2 秒的速度执行:

如果我们将 @async 宏放在整个循环上(而不仅仅是它的内部步骤),那么我们的脚本将立即继续,而不必等待 remotecall_fetch() 操作完成。但是,现在我们只允许脚本作为一个整体继续经过循环。我们不允许循环的每个单独步骤在前一个步骤完成之前启动。因此,与上面的示例不同,脚本在循环之后继续两秒后,results 阵列仍然具有一个元素 #undef,表示第二个 remotecall_fetch() 操作仍然没有完成。

@time begin
    a = cell(nworkers())
    @async for (idx, pid) in enumerate(workers())
        println("sending work to $pid")
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
# 0.001279 seconds (328 allocations: 21.354 KB)
# Task (waiting) @0x0000000115ec9120
## This also allows us to continue to

sleep(2)

a
2-element Array{Any,1}:
    nothing
 #undef    

并且,毫不奇怪,如果我们将 @sync@async 放在彼此旁边,我们会得到每个 remotecall_fetch() 顺序运行(而不是同时运行),但我们不会继续执行代码,直到每个完成。换句话说,如果我们既没有宏,也就是说 sleep(2) 的行为与 @sync @async sleep(2) 基本相同,那么这基本上相当于

@time begin
    a = cell(nworkers())
    @sync @async for (idx, pid) in enumerate(workers())
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
# 4.019500 seconds (4.20 k allocations: 216.964 KB)
# Task (done) @0x0000000115e52a10

另请注意,可以在 @async 宏的范围内进行更复杂的操作。该文档提供了一个示例,其中包含 @async 范围内的整个循环。

回想一下,同步宏的帮助声明它将“等到 @async@spawn@spawnat@parallel 的所有动态封闭使用完成后”。出于完整的目的,重要的是如何在 @sync@async 宏的范围内定义任务。考虑下面的例子,这是上面给出的一个例子的略微变化:

@time begin
    a = cell(nworkers())
    @sync for (idx, pid) in enumerate(workers())
        @async a[idx] = remotecall(pid, sleep, 2)
    end
end
## 0.172479 seconds (93.42 k allocations: 3.900 MB)

julia> a
2-element Array{Any,1}:
 RemoteRef{Channel{Any}}(2,1,3)
 RemoteRef{Channel{Any}}(3,1,4)

前面的示例大约需要 2 秒钟才能执行,表明这两个任务是并行运行的,并且脚本在继续之前等待每个任务完成其功能的执行。然而,这个例子的评估时间要低得多。原因是,为了 @sync 的目的,remotecall() 操作已经完成,一旦它发送了工作人员的工作。 (注意,结果数组 a,这里只包含 RemoteRef 对象类型,它们只表示某个特定进程正在发生某些事情,理论上可以在将来的某个时刻获取)。相比之下,remotecall_fetch() 操作只有在从工作人员获得其任务完成的消息时才完成

因此,如果你正在寻找方法,以确保工人某些操作已经在脚本中移动上(如例如在讨论之前完成这篇文章 ),有必要仔细考虑为完整什么罪名,你怎么会测量然后在脚本中操作它。