非同步和同步

根據 ?@async 下的文件,“@async 在一個 Task 中包含一個表示式。” 這意味著,對於任何屬於其範圍的內容,Julia 將啟動此任務執行,但隨後繼續執行指令碼中的下一步,而無需等待任務完成。因此,例如,如果沒有巨集,你將得到:

julia> @time sleep(2)
  2.005766 seconds (13 allocations: 624 bytes)

但是使用巨集,你得到:

julia> @time @async sleep(2)
  0.000021 seconds (7 allocations: 657 bytes)
Task (waiting) @0x0000000112a65ba0

julia> 

Julia 因此允許指令碼繼續(並且 @time 巨集完全執行)而不等待任務(在這種情況下,睡眠兩秒鐘)完成。

相比之下,@sync 巨集將“等到 @async@spawn@spawnat@parallel 的所有動態封閉使用完成。” (根據 ?@sync 的檔案)。因此,我們看到:

julia> @time @sync @async sleep(2)
  2.002899 seconds (47 allocations: 2.986 KB)
Task (done) @0x0000000112bd2e00

在這個簡單的例子中,沒有必要將 @async@sync 的單個例項包括在一起。但是,@sync 可能有用的地方是你將 @async 應用於多個操作,你希望允許所有操作立即啟動而無需等待每個操作完成。

例如,假設我們有多個工作人員,我們想讓他們每個人同時處理任務,然後從這些任務中獲取結果。初始(但不正確)嘗試可能是:

addprocs(2)
@time begin
    a = cell(nworkers())
    for (idx, pid) in enumerate(workers())
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 4.011576 seconds (177 allocations: 9.734 KB)

這裡的問題是迴圈等待每個 remotecall_fetch() 操作完成,即每個程序完成其工作(在這種情況下,睡眠 2 秒),然後繼續開始下一個 remotecall_fetch() 操作。就實際情況而言,我們並沒有從這裡獲得並行性的好處,因為我們的流程並沒有同時完成他們的工作(即休眠)。

但是,我們可以通過使用 @async@sync 巨集的組合來糾正這個問題:

@time begin
    a = cell(nworkers())
    @sync for (idx, pid) in enumerate(workers())
        @async a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 2.009416 seconds (274 allocations: 25.592 KB)

現在,如果我們將迴圈的每一步都計算為一個單獨的操作,我們會看到在 @async 巨集之前有兩個單獨的操作。巨集允許每個啟動,並且程式碼在每個完成之前繼續(在這種情況下迴圈的下一步)。但是,使用 @sync 巨集,其範圍包含整個迴圈,意味著我們將不允許指令碼繼續經過該迴圈,直到 @async 之前的所有操作都已完成。

通過進一步調整上述示例,可以更清楚地瞭解這些巨集的操作,以瞭解它在某些修改下如何發生變化。例如,假設我們只有 @async 沒有 @sync

@time begin
    a = cell(nworkers())
    for (idx, pid) in enumerate(workers())
        println("sending work to $pid")
        @async a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 0.001429 seconds (27 allocations: 2.234 KB)

在這裡,@async 巨集允許我們在每個 remotecall_fetch() 操作完成執行之前繼續迴圈。但是,無論好壞,我們都沒有 @sync 巨集來阻止程式碼繼續通過這個迴圈,直到所有的 remotecall_fetch() 操作完成。

儘管如此,即使我們繼續執行,每個 remotecall_fetch() 操作仍然會並行執行。我們可以看到,因為如果我們等待兩秒鐘,那麼包含結果的陣列 a 將包含:

sleep(2)
julia> a
2-element Array{Any,1}:
 nothing
 nothing

nothing 元素是成功獲取 sleep 函式結果的結果,該函式不返回任何值)

我們還可以看到兩個 remotecall_fetch() 操作基本上同時開始,因為在它們之前的 print 命令也快速連續執行(這裡沒有顯示這些命令的輸出)。將此與下一個示例進行對比,其中 print 命令以彼此相差 2 秒的速度執行:

如果我們將 @async 巨集放在整個迴圈上(而不僅僅是它的內部步驟),那麼我們的指令碼將立即繼續,而不必等待 remotecall_fetch() 操作完成。但是,現在我們只允許指令碼作為一個整體繼續經過迴圈。我們不允許迴圈的每個單獨步驟在前一個步驟完成之前啟動。因此,與上面的示例不同,指令碼在迴圈之後繼續兩秒後,results 陣列仍然具有一個元素 #undef,表示第二個 remotecall_fetch() 操作仍然沒有完成。

@time begin
    a = cell(nworkers())
    @async for (idx, pid) in enumerate(workers())
        println("sending work to $pid")
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
# 0.001279 seconds (328 allocations: 21.354 KB)
# Task (waiting) @0x0000000115ec9120
## This also allows us to continue to

sleep(2)

a
2-element Array{Any,1}:
    nothing
 #undef    

並且,毫不奇怪,如果我們將 @sync@async 放在彼此旁邊,我們會得到每個 remotecall_fetch() 順序執行(而不是同時執行),但我們不會繼續執行程式碼,直到每個完成。換句話說,如果我們既沒有巨集,也就是說 sleep(2) 的行為與 @sync @async sleep(2) 基本相同,那麼這基本上相當於

@time begin
    a = cell(nworkers())
    @sync @async for (idx, pid) in enumerate(workers())
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
# 4.019500 seconds (4.20 k allocations: 216.964 KB)
# Task (done) @0x0000000115e52a10

另請注意,可以在 @async 巨集的範圍內進行更復雜的操作。該文件提供了一個示例,其中包含 @async 範圍內的整個迴圈。

回想一下,同步巨集的幫助宣告它將“等到 @async@spawn@spawnat@parallel 的所有動態封閉使用完成後”。出於完整的目的,重要的是如何在 @sync@async 巨集的範圍內定義任務。考慮下面的例子,這是上面給出的一個例子的略微變化:

@time begin
    a = cell(nworkers())
    @sync for (idx, pid) in enumerate(workers())
        @async a[idx] = remotecall(pid, sleep, 2)
    end
end
## 0.172479 seconds (93.42 k allocations: 3.900 MB)

julia> a
2-element Array{Any,1}:
 RemoteRef{Channel{Any}}(2,1,3)
 RemoteRef{Channel{Any}}(3,1,4)

前面的示例大約需要 2 秒鐘才能執行,表明這兩個任務是並行執行的,並且指令碼在繼續之前等待每個任務完成其功能的執行。然而,這個例子的評估時間要低得多。原因是,為了 @sync 的目的,remotecall() 操作已經完成,一旦它傳送了工作人員的工作。 (注意,結果陣列 a,這裡只包含 RemoteRef 物件型別,它們只表示某個特定程序正在發生某些事情,理論上可以在將來的某個時刻獲取)。相比之下,remotecall_fetch() 操作只有在從工作人員獲得其任務完成的訊息時才完成

因此,如果你正在尋找方法,以確保工人某些操作已經在指令碼中移動上(如例如在討論之前完成這篇文章 ),有必要仔細考慮為完整什麼罪名,你怎麼會測量然後在指令碼中操作它。