如何在 RabbitMQ 中建立延遲佇列
首先,我們需要設定兩個基本通道,一個用於主佇列,另一個用於延遲佇列。在最後的示例中,我新增了一些不需要的附加標誌,但使程式碼更可靠; 如 confirm delivery
,delivery_mode
和 durable
。你可以在 RabbitMQ 手冊中找到有關這些的更多資訊。
在我們設定了通道後,我們新增了一個繫結到主通道,我們可以使用它來將訊息從延遲通道傳送到主佇列。
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
接下來,我們需要配置延遲通道,以便在訊息到期後將訊息轉發到主佇列。
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000,
'x-dead-letter-exchange': 'amq.direct',
'x-dead-letter-routing-key': 'hello'
})
-
x-message-ttl (訊息 - 生存時間)
這通常用於在特定持續時間後自動刪除佇列中的舊訊息,但通過新增兩個可選引數,我們可以更改此行為,而是使用此引數以毫秒為單位確定訊息在延遲佇列中保留多長時間。
-
此變數允許我們在訊息過期後將訊息傳輸到其他佇列,而不是完全刪除它的預設行為。
-
此變數確定用於將訊息從 hello_delay 傳輸到 hello 佇列的 Exchange。
釋出到延遲佇列
當我們完成設定所有基本 Pika 引數後,你只需使用基本釋出將訊息傳送到延遲佇列。
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mod': 2})
執行完指令碼後,你應該會在 RabbitMQ 管理模組中看到以下佇列。
例
from amqpstorm import Connection
connection = Connection('127.0.0.1', 'guest', 'guest')
# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)
# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()
# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mode': 2})
print("[x] Sent")