如何使用 RabbitMQ 中的訊息
首先匯入庫。
from amqpstorm import Connection
在使用訊息時,我們首先需要定義一個函式來處理傳入的訊息。這可以是任何可呼叫的函式,並且必須採用訊息物件或訊息元組(取決於 start_consuming
中定義的 to_tuple
引數)。
除了處理來自傳入訊息的資料外,我們還必須確認或拒絕該訊息。這很重要,因為我們需要讓 RabbitMQ 知道我們正確接收並處理了訊息。
def on_message(message):
"""This function is called on message received.
:param message: Delivered message.
:return:
"""
print("Message:", message.body)
# Acknowledge that we handled the message without any issues.
message.ack()
# Reject the message.
# message.reject()
# Reject the message, and put it back in the queue.
# message.reject(requeue=True)
接下來,我們需要建立與 RabbitMQ 伺服器的連線。
connection = Connection('127.0.0.1', 'guest', 'guest')
之後我們需要建立一個頻道。每個連線可以有多個通道,通常在執行多執行緒任務時,建議(但不要求)每個執行緒有一個。
channel = connection.channel()
設定好頻道後,我們需要讓 RabbitMQ 知道我們要開始使用訊息。在這種情況下,我們將使用我們之前定義的 on_message
函式來處理所有消耗的訊息。
我們將在 RabbitMQ 伺服器上監聽的佇列將是 simple_queue
,我們也告訴 RabbitMQ,一旦完成它們,我們將確認所有傳入的訊息。
channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)
最後,我們需要啟動 IO 迴圈來開始處理 RabbitMQ 伺服器提供的訊息。
channel.start_consuming(to_tuple=False)