无序消息处理
你可以使用 Scan
或 TryScan
方法查找队列中的特定消息,并处理它们,而不管它们之前有多少消息。这两种方法按照它们到达的顺序查看队列中的消息,并查找指定的消息(直到可选的超时)。如果没有这样的消息,TryScan
将返回 None,而 Scan
将一直等待,直到此消息到达或操作超时。
让我们在实践中看到它。我们希望处理器尽可能地处理 RegularOperations
,但是只要有 PriorityOperation
,就应该尽快处理它,无论队列中有多少其他 RegularOperations
。
type Message =
| RegularOperation of string
| PriorityOperation of string
let processor = MailboxProcessor<Message>.Start(fun inbox ->
let rec innerLoop () = async {
let! priorityData = inbox.TryScan(fun msg ->
// If there is a PriorityOperation, retrieve its data.
match msg with
| PriorityOperation data -> Some data
| _ -> None)
match priorityData with
| Some data ->
// Process the data from PriorityOperation.
| None ->
// No PriorityOperation was in the queue at the time, so
// let's fall back to processing all possible messages
let! message = inbox.Receive()
match message with
| RegularOperation data ->
// We have free time, let's process the RegularOperation.
| PriorityOperation data ->
// We did scan the queue, but it might have been empty
// so it is possible that in the meantime a producer
// posted a new message and it is a PriorityOperation.
// And never forget to process next messages.
innerLoop ()
}
innerLoop())