Message queue RabbitMQ

1. Basic concepts

RabbitMQ is an open source AMQP implementation. The server side is written in Erlang language and supports multiple clients, such as: Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, STOMP, etc., and supports AJAX. It is used to store and forward messages in a distributed system, and performs well in terms of ease of use, scalability, and high availability.

To install RabbitMQ, you need to install the Erlang environment and configure environment variables first. After installation, enter the sbin directory of RabbitMQ and run the command to activate the console interface. address   The account password is guest.

rabbitmq-plugins enable rabbitmq_management

Two, users

  1. Super administrator (administrator): can log in to the management console, can view all information, and can operate users and policies.
  2. Monitor: You can log in to the management console, and at the same time you can view the relevant information of the rabbitmq node (number of processes, memory usage, disk usage, etc.)
  3. policy Maker (policymaker): You can log in to the management console and manage policies at the same time. But you cannot view the relevant information of the node (the part marked by the red box in the figure above).
  4. Ordinary manager (management): can only log in to the management console, cannot see node information, and cannot manage policies.
  5. Others: Unable to log in to the management console, usually ordinary producers and consumers.

3. Working mode

There are five main working modes of RabbitMQ, namely:

  1. Simple mode (hello world)
  2. Work queue mode (work queue)
  3. Publish/subscribe mode (publish/subscribe)
  4. Routing mode (routing)
  5. theme mode (topic)

  Import dependencies:

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>3.4.1</version>
</dependency>

        Tools:

public class ConnectionUtil {

    public static Connection getConnection() throws Exception {
        //Define a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //Set service address
        factory.setHost("localhost");
        //port
        factory.setPort(5672);
        //Set account information, user name, password, vhost
        factory.setVirtualHost("vhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // Get connection via factory
        Connection connection = factory.newConnection();
        return connection;
    }
}

1. Simple mode (hello world):

//send Message
public static void main(String[] argv) throws Exception {
        // Get the connection and mq channel
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel from a connection
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare("hello", false, false, false, null);

        // Message content
        String message = "Hello World!";
        channel.basicPublish("", "hello", null, message.getBytes());

        //close channel and connection
        channel.close();
        connection.close();
}

//receive message
public static void main(String[] argv) throws Exception {
        // Get the connection and mq channel
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel from a connection
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare("hello", false, false, false, null);

        // Define the consumer of the queue
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // listening queue
        channel.basicConsume("hello", true, consumer);

        // get news
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
        }
}

2. Work queue mode (work queue): Multiple consumers consume messages from the same queue.

//receive message
public static void main(String[] argv) throws Exception {
        // Get the connection and mq channel
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare("hello", false, false, false, null);

        // At the same time, the server will only send one message to the consumer, otherwise MQ will send all requests to all consumers on average
        channel.basicQos(1);

        // Define the consumer of the queue
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Listen to the queue, false means that the completion status is returned manually, and true means that the completion is automatically confirmed immediately after receiving the message
        channel.basicConsume("hello", false, consumer);

        // get news
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            // Return the confirmation status, otherwise it means to use the automatic confirmation mode
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
}

3. Publish/subscribe mode (publish/subscribe): Send messages to multiple queues through the switch.

//Send a message
public static void main(String[] argv) throws Exception {
        // Get the connection and mq channel
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // declare exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // Message content
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

        channel.close();
        connection.close();
}

//receive message
public static void main(String[] argv) throws Exception {
        // Get the connection and mq channel
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // At the same time, the server will only send one message to the consumer
        channel.basicQos(1);

        // Define the consumer of the queue
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Listen to the queue and return to complete manually
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // get news
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            // return complete status
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
}

4. Routing mode (routing): Send messages to different queues through routing matching through switches.

//Send a message
public static void main(String[] argv) throws Exception {
        // Get the connection and mq channel
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare exchange and type
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // Message content
        String message = "Hello World!";
        //Specify message routing
        channel.basicPublish(EXCHANGE_NAME, "routing", null, message.getBytes());

        channel.close();
        connection.close();
}

//receive message
public static void main(String[] argv) throws Exception {
        // Get the connection and mq channel
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Bind the queue to the exchange and specify multiple routes
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing1");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing2");

        // At the same time, the server will only send one message to the consumer
        channel.basicQos(1);

        // Define the consumer of the queue
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Listen to the queue and return to complete manually
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // get news
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            // return complete status
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
}

5. Topic mode (topic): Send messages to different queues through wildcard matching through switches.

//Send a message
public static void main(String[] argv) throws Exception {
        // Get the connection and mq channel
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare exchange and type
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // Message content
        String message = "Hello World!";
        //Specify message matching keywords
        channel.basicPublish(EXCHANGE_NAME, "topic", null, message.getBytes());

        channel.close();
        connection.close();
}

//receive message
public static void main(String[] argv) throws Exception {
        // Get the connection and mq channel
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Bind the queue to the exchange and specify multiple wildcards
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic1.*");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic2.*");

        // At the same time, the server will only send one message to the consumer
        channel.basicQos(1);

        // Define the consumer of the queue
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Listen to the queue and return to complete manually
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // get news
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            // return complete status
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
}

4. Spring integration

Spring provides the RabbitTemplate class to perform messaging.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
  rabbitmq:
    host: 192.168.88.88
    port: 5672
    username: guest
    password: guest
    virtual-host: /
@Configuration
public class MQConfig {

    @Bean
    public Exchange exchange1(){
        return ExchangeBuilder.fanoutExchange("fanout").build();
    }
    @Bean
    public Exchange exchange2(){
        return ExchangeBuilder.directExchange("direct").build();
    }
    @Bean
    public Queue queue1(){
        return QueueBuilder.durable("hello1").build();
    }
    @Bean
    public Queue queue2(){
        return QueueBuilder.durable("hello2").build();
    }
    @Bean
    public Binding binding1(Exchange exchange1,Queue queue1){
        return BindingBuilder.bind(queue1).to(exchange1).with("key1").noargs();
    }
    @Bean
    public Binding binding2(Exchange exchange2,Queue queue2){
        return BindingBuilder.bind(queue2).to(exchange2).with("key2").noargs();
    }
}
@Component
//Define queue and bind
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "hello", durable = "true", autoDelete = "true"),
        exchange = @Exchange(value = "fanout", type = ExchangeTypes.FANOUT), key = "key"), ackMode = "MANUAL")
public class MyListener {

    @RabbitHandler
    public void consume(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
            throws IOException {
        
        //return status manually
        if () {
            // In RabbitMQ's ack mechanism, the second parameter returns true, indicating that the message needs to be delivered to other consumers for re-consumption
            channel.basicAck(deliveryTag, false);
        } else {
            // The third parameter is true, indicating that the message will re-enter the queue
            channel.basicNack(deliveryTag, false, true);
        }

    }
}

Tags: Java RabbitMQ message queue java-rabbitmq

Posted by punky79 on Mon, 27 Feb 2023 18:41:45 +0530