如何使用 RabbitMQ 中的訊息

首先匯入庫。

from amqpstorm import Connection

在使用訊息時,我們首先需要定義一個函式來處理傳入的訊息。這可以是任何可呼叫的函式,並且必須採用訊息物件或訊息元組(取決於 start_consuming 中定義的 to_tuple 引數)。

除了處理來自傳入訊息的資料外,我們還必須確認或拒絕該訊息。這很重要,因為我們需要讓 RabbitMQ 知道我們正確接收並處理了訊息。

def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # Acknowledge that we handled the message without any issues.
    message.ack()

    # Reject the message.
    # message.reject()

    # Reject the message, and put it back in the queue.
    # message.reject(requeue=True)

接下來,我們需要建立與 RabbitMQ 伺服器的連線。

connection = Connection('127.0.0.1', 'guest', 'guest')

之後我們需要建立一個頻道。每個連線可以有多個通道,通常在執行多執行緒任務時,建議(但不要求)每個執行緒有一個。

channel = connection.channel()

設定好頻道後,我們需要讓 RabbitMQ 知道我們要開始使用訊息。在這種情況下,我們將使用我們之前定義的 on_message 函式來處理所有消耗的訊息。

我們將在 RabbitMQ 伺服器上監聽的佇列將是 simple_queue,我們也告訴 RabbitMQ,一旦完成它們,我們將確認所有傳入的訊息。

channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

最後,我們需要啟動 IO 迴圈來開始處理 RabbitMQ 伺服器提供的訊息。

channel.start_consuming(to_tuple=False)