如何在 RabbitMQ 中创建延迟队列
首先,我们需要设置两个基本通道,一个用于主队列,另一个用于延迟队列。在最后的示例中,我添加了一些不需要的附加标志,但使代码更可靠; 如 confirm delivery
,delivery_mode
和 durable
。你可以在 RabbitMQ 手册中找到有关这些的更多信息。
在我们设置了通道后,我们添加了一个绑定到主通道,我们可以使用它来将消息从延迟通道发送到主队列。
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
接下来,我们需要配置延迟通道,以便在消息到期后将消息转发到主队列。
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000,
'x-dead-letter-exchange': 'amq.direct',
'x-dead-letter-routing-key': 'hello'
})
-
x-message-ttl (消息 - 生存时间)
这通常用于在特定持续时间后自动删除队列中的旧消息,但通过添加两个可选参数,我们可以更改此行为,而是使用此参数以毫秒为单位确定消息在延迟队列中保留多长时间。
-
此变量允许我们在消息过期后将消息传输到其他队列,而不是完全删除它的默认行为。
-
此变量确定用于将消息从 hello_delay 传输到 hello 队列的 Exchange。
发布到延迟队列
当我们完成设置所有基本 Pika 参数后,你只需使用基本发布将消息发送到延迟队列。
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mod': 2})
执行完脚本后,你应该会在 RabbitMQ 管理模块中看到以下队列。
例
from amqpstorm import Connection
connection = Connection('127.0.0.1', 'guest', 'guest')
# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)
# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()
# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mode': 2})
print("[x] Sent")