如何使用 RabbitMQ 中的消息
首先导入库。
from amqpstorm import Connection
在使用消息时,我们首先需要定义一个函数来处理传入的消息。这可以是任何可调用的函数,并且必须采用消息对象或消息元组(取决于 start_consuming
中定义的 to_tuple
参数)。
除了处理来自传入消息的数据外,我们还必须确认或拒绝该消息。这很重要,因为我们需要让 RabbitMQ 知道我们正确接收并处理了消息。
def on_message(message):
"""This function is called on message received.
:param message: Delivered message.
:return:
"""
print("Message:", message.body)
# Acknowledge that we handled the message without any issues.
message.ack()
# Reject the message.
# message.reject()
# Reject the message, and put it back in the queue.
# message.reject(requeue=True)
接下来,我们需要建立与 RabbitMQ 服务器的连接。
connection = Connection('127.0.0.1', 'guest', 'guest')
之后我们需要建立一个频道。每个连接可以有多个通道,通常在执行多线程任务时,建议(但不要求)每个线程有一个。
channel = connection.channel()
设置好频道后,我们需要让 RabbitMQ 知道我们要开始使用消息。在这种情况下,我们将使用我们之前定义的 on_message
函数来处理所有消耗的消息。
我们将在 RabbitMQ 服务器上监听的队列将是 simple_queue
,我们也告诉 RabbitMQ,一旦完成它们,我们将确认所有传入的消息。
channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)
最后,我们需要启动 IO 循环来开始处理 RabbitMQ 服务器提供的消息。
channel.start_consuming(to_tuple=False)