可变状态管理
邮箱处理器可用于以透明和线程安全的方式管理可变状态。让我们构建一个简单的计数器。
// Increment or decrement by one.
type CounterMessage =
| Increment
| Decrement
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// You can represent the processor's internal mutable state
// as an immutable parameter to the inner loop function
let rec innerLoop state = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
// In each call you use the current state to produce a new
// value, which will be passed to the next call, so that
// next message sees only the new value as its local state
match message with
| Increment ->
let state' = state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop state'
| Decrement ->
let state' = state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop state'
}
// We pass the initialState to the first call to innerLoop
innerLoop initialState)
// Let's pick an initial value and create the processor
let processor = createProcessor 10
现在让我们生成一些操作
processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)
你将看到以下日志
Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
并发
由于邮箱处理器逐个处理消息并且没有交错,因此你还可以从多个线程生成消息,并且你将看不到丢失或重复操作的典型问题。除非你专门实现处理器,否则消息无法使用其他消息的旧状态。
let processor = createProcessor 0
[ async { processor.Post(Increment) }
async { processor.Post(Increment) }
async { processor.Post(Decrement) }
async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously
所有消息都是从不同的线程发布的。消息发布到邮箱的顺序不确定,因此处理它们的顺序不是确定性的,但由于增量和减量的总数是平衡的,因此你将看到最终状态为 0,无论顺序如何以及从哪些线程发送消息。
真正的可变状态
在前面的示例中,我们仅通过传递递归循环参数来模拟可变状态,但邮箱处理器即使对于真正可变的状态也具有所有这些属性。当你保持较大的状态时,这很重要,并且由于性能原因,不可变性是不切实际的。
我们可以将我们的计数器重写为以下实现
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// In this case we represent the state as a mutable binding
// local to this function. innerLoop will close over it and
// change its value in each iteration instead of passing it around
let mutable state = initialState
let rec innerLoop () = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
match message with
| Increment ->
let state <- state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop ()
| Decrement ->
let state <- state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop ()
}
innerLoop ())
即使直接从多个线程修改计数器状态,这肯定不是线程安全的,你可以通过使用上一节中的并行消息 Posts 看到邮箱处理器一个接一个地处理消息而没有交错,因此每条消息都使用最新的价值。