可变状态管理

邮箱处理器可用于以透明和线程安全的方式管理可变状态。让我们构建一个简单的计数器。

// Increment or decrement by one.
type CounterMessage =
    | Increment
    | Decrement

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // You can represent the processor's internal mutable state
        // as an immutable parameter to the inner loop function
        let rec innerLoop state = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            // In each call you use the current state to produce a new
            // value, which will be passed to the next call, so that
            // next message sees only the new value as its local state
            match message with
            | Increment ->
                let state' = state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop state'
            | Decrement ->
                let state' = state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop state'
        }
        // We pass the initialState to the first call to innerLoop
        innerLoop initialState)

// Let's pick an initial value and create the processor
let processor = createProcessor 10

现在让我们生成一些操作

processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)

你将看到以下日志

Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12

并发

由于邮箱处理器逐个处理消息并且没有交错,因此你还可以从多个线程生成消息,并且你将看不到丢失或重复操作的典型问题。除非你专门实现处理器,否则消息无法使用其他消息的旧状态。

let processor = createProcessor 0

[ async { processor.Post(Increment) }
  async { processor.Post(Increment) }
  async { processor.Post(Decrement) }
  async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously

所有消息都是从不同的线程发布的。消息发布到邮箱的顺序不确定,因此处理它们的顺序不是确定性的,但由于增量和减量的总数是平衡的,因此你将看到最终状态为 0,无论顺序如何以及从哪些线程发送消息。

真正的可变状态

在前面的示例中,我们仅通过传递递归循环参数来模拟可变状态,但邮箱处理器即使对于真正可变的状态也具有所有这些属性。当你保持较大的状态时,这很重要,并且由于性能原因,不可变性是不切实际的。

我们可以将我们的计数器重写为以下实现

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // In this case we represent the state as a mutable binding
        // local to this function. innerLoop will close over it and
        // change its value in each iteration instead of passing it around
        let mutable state = initialState

        let rec innerLoop () = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            match message with
            | Increment ->
                let state <- state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop ()
            | Decrement ->
                let state <- state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop ()
        }
        innerLoop ())

即使直接从多个线程修改计数器状态,这肯定不是线程安全的,你可以通过使用上一节中的并行消息 Posts 看到邮箱处理器一个接一个地处理消息而没有交错,因此每条消息都使用最新的价值。