無序訊息處理
你可以使用 Scan
或 TryScan
方法查詢佇列中的特定訊息,並處理它們,而不管它們之前有多少訊息。這兩種方法按照它們到達的順序檢視佇列中的訊息,並查詢指定的訊息(直到可選的超時)。如果沒有這樣的訊息,TryScan
將返回 None,而 Scan
將一直等待,直到此訊息到達或操作超時。
讓我們在實踐中看到它。我們希望處理器儘可能地處理 RegularOperations
,但是只要有 PriorityOperation
,就應該儘快處理它,無論佇列中有多少其他 RegularOperations
。
type Message =
| RegularOperation of string
| PriorityOperation of string
let processor = MailboxProcessor<Message>.Start(fun inbox ->
let rec innerLoop () = async {
let! priorityData = inbox.TryScan(fun msg ->
// If there is a PriorityOperation, retrieve its data.
match msg with
| PriorityOperation data -> Some data
| _ -> None)
match priorityData with
| Some data ->
// Process the data from PriorityOperation.
| None ->
// No PriorityOperation was in the queue at the time, so
// let's fall back to processing all possible messages
let! message = inbox.Receive()
match message with
| RegularOperation data ->
// We have free time, let's process the RegularOperation.
| PriorityOperation data ->
// We did scan the queue, but it might have been empty
// so it is possible that in the meantime a producer
// posted a new message and it is a PriorityOperation.
// And never forget to process next messages.
innerLoop ()
}
innerLoop())