RabbitMQ Message Publishing and Consuming with Pika Library

  • Share this:

Code introduction


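This code defines two main functions: one publishes a message to a RabbitMQ exchange, and the other consumes messages from a queue bound to that exchange. Three small helpers supply a random exchange type, a random routing key, and the message text. All interaction with RabbitMQ goes through the Pika library.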


Technology Stack : Python, RabbitMQ, and the Pika client library. The functions cover publishing messages to, and consuming messages from, a message queue system.

Code Type : Python functions

Code Difficulty : Intermediate


                
                    
import random
from pika import BlockingConnection, ConnectionParameters, BasicProperties, BasicDeliver

def random_exchange_type():
    """Pick an exchange type name at random.

    Returns:
        One of 'fanout', 'direct', 'topic' or 'headers'.
    """
    candidates = ('fanout', 'direct', 'topic', 'headers')
    return random.choice(candidates)

def random_routing_key():
    """Build a random routing key of ten lowercase ASCII letters.

    Letters are drawn with replacement, so repeats are possible.
    """
    alphabet = 'abcdefghijklmnopqrstuvwxyz'
    letters = random.choices(alphabet, k=10)
    return ''.join(letters)

def random_message():
    """Return the message text to publish.

    Despite the name, the text is a fixed greeting rather than random.
    """
    greeting = 'Hello, RabbitMQ!'
    return greeting

def publish_message_to_exchange(connection, exchange_name, routing_key, exchange_type=None):
    """Declare an exchange and publish one greeting message to it.

    Args:
        connection: An open pika ``BlockingConnection`` (or any object whose
            ``channel()`` returns a compatible channel).
        exchange_name: Name of the exchange to declare and publish to.
        routing_key: Routing key passed to ``basic_publish``.
        exchange_type: Exchange type to declare. When omitted, a type is
            picked by ``random_exchange_type()`` as before. Pass an explicit
            type whenever the exchange may already exist: the broker rejects
            re-declaring an existing exchange with a different type.
    """
    if exchange_type is None:
        exchange_type = random_exchange_type()

    channel = connection.channel()
    try:
        channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type)
        message = random_message()
        # The routing key belongs to basic_publish, not to the message
        # properties: BasicProperties has no ``routing_key`` field and raises
        # TypeError when given one.
        properties = BasicProperties(content_type='text/plain')
        channel.basic_publish(
            exchange=exchange_name,
            routing_key=routing_key,
            body=message,
            properties=properties,
        )
    finally:
        # A new channel is opened on every call; close it so repeated
        # publishes do not pile up channels on the connection. The broker may
        # already have closed it (e.g. after a failed declare).
        if channel.is_open:
            channel.close()

    print(f"Message '{message}' sent to exchange '{exchange_name}' with routing key '{routing_key}'.")

def consume_messages_from_exchange(connection, exchange_name, queue_name, routing_key, exchange_type=None):
    """Bind a queue to an exchange and print every message delivered to it.

    Blocks in ``start_consuming()`` until interrupted with Ctrl-C (or until
    the channel/connection fails). Messages are consumed with
    ``auto_ack=True``, so they are not acknowledged explicitly.

    Args:
        connection: An open pika ``BlockingConnection`` (or any object whose
            ``channel()`` returns a compatible channel).
        exchange_name: Name of the exchange to declare and bind to.
        queue_name: Name of the queue to declare, bind and consume from.
        routing_key: Binding key used when binding the queue to the exchange.
        exchange_type: Exchange type to declare. When omitted, a type is
            picked by ``random_exchange_type()`` as before. Pass the same type
            the publisher used: the broker rejects re-declaring an existing
            exchange with a different type.
    """
    if exchange_type is None:
        exchange_type = random_exchange_type()

    channel = connection.channel()
    try:
        channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type)
        channel.queue_declare(queue=queue_name)
        channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=routing_key)
        print(f"Waiting for messages in queue '{queue_name}' with routing key '{routing_key}'.")

        def callback(ch, method, properties, body):
            # pika hands the body over as bytes; decode it so the output shows
            # the text itself instead of a b'...' repr. Undecodable bytes are
            # replaced rather than raising inside the consumer loop.
            if isinstance(body, bytes):
                text = body.decode('utf-8', errors='replace')
            else:
                text = body
            print(f"Received message '{text}' from queue '{queue_name}' with routing key '{routing_key}'.")

        channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
        print(f"Started consuming messages from queue '{queue_name}' with routing key '{routing_key}'.")

        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            print("Interrupted")
            channel.stop_consuming()
    finally:
        # Release the channel once consuming ends; skip if the broker or a
        # connection failure has already closed it.
        if channel.is_open:
            channel.close()

# JSON representation of the function and its details