可變狀態管理
郵箱處理器可用於以透明和執行緒安全的方式管理可變狀態。讓我們構建一個簡單的計數器。
// Increment or decrement by one.
type CounterMessage =
| Increment
| Decrement
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// You can represent the processor's internal mutable state
// as an immutable parameter to the inner loop function
let rec innerLoop state = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
// In each call you use the current state to produce a new
// value, which will be passed to the next call, so that
// next message sees only the new value as its local state
match message with
| Increment ->
let state' = state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop state'
| Decrement ->
let state' = state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop state'
}
// We pass the initialState to the first call to innerLoop
innerLoop initialState)
// Let's pick an initial value and create the processor
let processor = createProcessor 10
現在讓我們生成一些操作
processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)
你將看到以下日誌
Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
併發
由於郵箱處理器逐個處理訊息並且沒有交錯,因此你還可以從多個執行緒生成訊息,並且你將看不到丟失或重複操作的典型問題。除非你專門實現處理器,否則訊息無法使用其他訊息的舊狀態。
let processor = createProcessor 0
[ async { processor.Post(Increment) }
async { processor.Post(Increment) }
async { processor.Post(Decrement) }
async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously
所有訊息都是從不同的執行緒釋出的。訊息釋出到郵箱的順序不確定,因此處理它們的順序不是確定性的,但由於增量和減量的總數是平衡的,因此你將看到最終狀態為 0,無論順序如何以及從哪些執行緒傳送訊息。
真正的可變狀態
在前面的示例中,我們僅通過傳遞遞迴迴圈引數來模擬可變狀態,但郵箱處理器即使對於真正可變的狀態也具有所有這些屬性。當你保持較大的狀態時,這很重要,並且由於效能原因,不可變性是不切實際的。
我們可以將我們的計數器重寫為以下實現
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// In this case we represent the state as a mutable binding
// local to this function. innerLoop will close over it and
// change its value in each iteration instead of passing it around
let mutable state = initialState
let rec innerLoop () = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
match message with
| Increment ->
let state <- state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop ()
| Decrement ->
let state <- state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop ()
}
innerLoop ())
即使直接從多個執行緒修改計數器狀態,這肯定不是執行緒安全的,你可以通過使用上一節中的並行訊息 Posts 看到郵箱處理器一個接一個地處理訊息而沒有交錯,因此每條訊息都使用最新的價值。