You don't understand the five core message modes of RabbitMQ, and you dare say you can use message queues?

Spring Boot e-commerce project mall (35k+ stars): https://github.com/macrozheng/mall

Abstract

My previous articles on RabbitMQ's core message modes were based on the Java API. Recently, while reading the official documentation, I found that these modes can also be implemented with Spring AMQP. So I have summarized some practical RabbitMQ skills here, covering installation on Windows and Linux and Spring AMQP implementations of the five core message modes. I hope it helps anyone who wants to learn or review RabbitMQ.

Introduction

RabbitMQ is one of the most popular open-source message-oriented middleware products and is widely used worldwide. It is lightweight, easy to deploy, and supports several messaging protocols. It can also be deployed in distributed configurations to meet large-scale and high-availability requirements.

Related concepts

Let's first go through the core concepts in RabbitMQ, taking the routing mode among the five message modes as an example.

Symbol   Name                     Description
P        Producer                 The sender of a message; it sends the message to an exchange
C        Consumer                 The receiver of a message; it gets the message from a queue and consumes it
X        Exchange                 Receives messages from producers and routes them to queues according to the routing key
Q        Queue                    Stores the messages routed from exchanges
type     Exchange type            Different exchange types forward messages in different ways
fanout   Publish/subscribe mode   Broadcasts each message to all queues bound to the exchange
direct   Routing mode             Routes each message according to its routing key
topic    Wildcard mode            Routes each message according to routing key matching rules

Installation and configuration

Next, we will introduce the installation and configuration of RabbitMQ, providing two installation methods: Windows and Linux.

Installation under Windows

  • After installation, enter the sbin directory under the RabbitMQ installation directory;

  • Type cmd in the address bar and press Enter to open a command prompt, then run the following command to enable the management plugin.
rabbitmq-plugins enable rabbitmq_management

Installation under Linux

  • Download the Docker image of rabbitmq 3.7.15;
docker pull rabbitmq:3.7.15
  • Use the Docker command to start the service;
docker run -p 5672:5672 -p 15672:15672 --name rabbitmq \
-d rabbitmq:3.7.15
  • Enter the container and enable the management plugin;
docker exec -it rabbitmq /bin/bash
rabbitmq-plugins enable rabbitmq_management

  • Open the firewall ports so the service can be reached from outside.
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload

Access and configuration

  • Open the RabbitMQ management page to check that the installation succeeded (on Linux, use the server IP instead of localhost): http://localhost:15672/

  • Log in with the default account and password: guest / guest
  • Create an account with the administrator role: mall / mall

  • Create a new virtual host: /mall

  • Click the user "mall" to open its configuration page;

  • Grant the mall user permissions on the /mall virtual host;

  • The configuration of RabbitMQ is now complete.

Five message modes

These five message modes are the foundation for building messaging applications on RabbitMQ, so be sure to master them. Readers who have studied RabbitMQ have probably seen the Java API implementations of these modes; here we implement them with Spring AMQP.

Simple mode

Simple mode is the simplest message mode, which contains a producer, a consumer, and a queue. The producer sends a message to the queue, and the consumer gets the message from the queue and consumes it.

Schematic diagram of mode

Spring AMQP implementation

  • First, add the Spring AMQP dependency to pom.xml;
<!-- Spring AMQP dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • Then modify application.yml and add the RabbitMQ configuration;
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /mall
    username: mall
    password: mall
    publisher-confirms: true # confirm that a message reached the exchange (deprecated since Spring Boot 2.2 in favor of publisher-confirm-type)
    publisher-returns: true # return messages that could not be routed to any queue
  • Add the Java configuration for simple mode: create a queue named simple.hello, a producer and a consumer;
/**
 * Created by macro on 2020/5/19.
 */
@Configuration
public class SimpleRabbitConfig {

    @Bean
    public Queue hello() {
        return new Queue("simple.hello");
    }

    @Bean
    public SimpleSender simpleSender(){
        return new SimpleSender();
    }

    @Bean
    public SimpleReceiver simpleReceiver(){
        return new SimpleReceiver();
    }

}
  • The producer sends messages to the queue simple.hello;
/**
 * Created by macro on 2020/5/19.
 */
public class SimpleSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSender.class);

    @Autowired
    private RabbitTemplate template;

    private static final String queueName="simple.hello";

    public void send() {
        String message = "Hello World!";
        this.template.convertAndSend(queueName, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }

}
  • The consumer gets messages from the queue simple.hello;
/**
 * Created by macro on 2020/5/19.
 */
@RabbitListener(queues = "simple.hello")
public class SimpleReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleReceiver.class);

    @RabbitHandler
    public void receive(String in) {
        LOGGER.info(" [x] Received '{}'", in);
    }

}
  • Add a test interface in the controller and call the interface to start sending messages;
/**
 * Created by macro on 2020/5/19.
 */
@Api(tags = "RabbitController", description = "RabbitMQ functional testing")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private SimpleSender simpleSender;

    @ApiOperation("Simple mode")
    @RequestMapping(value = "/simple", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult simpleTest() {
        for(int i=0;i<10;i++){
            simpleSender.send();
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • After running, the log shows that the producer sends messages to the queue, and the consumer gets the messages from the queue and consumes them.

Work mode

Work mode distributes messages among multiple competing consumers. It contains one producer, two consumers and one queue. Both consumers are bound to the same queue; while one consumer is busy with a time-consuming message, the idle consumer takes the next message from the queue and consumes it.
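The competing-consumer idea can be tried out without a broker. The following is only a minimal in-memory sketch under stated assumptions: a java.util.concurrent.BlockingQueue stands in for the RabbitMQ queue, two threads stand in for the two consumers, and the names WorkModeSketch, run and STOP are made up for illustration. It is not how Spring AMQP dispatches messages; it only shows that each message on a shared queue is consumed by exactly one consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical in-memory stand-in for one queue with two competing consumers. */
final class WorkModeSketch {

    private static final String STOP = "__stop__"; // marker that tells a consumer to exit

    /** Queues the messages, lets two consumers drain the queue, and returns what each one consumed. */
    static List<List<String>> run(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        List<List<String>> consumed = new ArrayList<>();
        List<Thread> consumers = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<String> mine = new ArrayList<>();
            consumed.add(mine);
            queue.add(STOP); // one stop marker per consumer, queued behind all real messages
            consumers.add(new Thread(() -> {
                try {
                    // take() removes the message from the queue, so the other consumer never sees it
                    for (String m = queue.take(); !m.equals(STOP); m = queue.take()) {
                        mine.add(m);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        try {
            for (Thread t : consumers) {
                t.start();
            }
            for (Thread t : consumers) {
                t.join(); // after join, each consumer's list is safely visible to the caller
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            messages.add("Hello " + i);
        }
        List<List<String>> result = run(messages);
        System.out.println("instance 1 consumed " + result.get(0));
        System.out.println("instance 2 consumed " + result.get(1));
    }
}
```

How the ten messages are split between the two instances depends on thread timing in this sketch, but the two lists together always contain each message exactly once.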

Schematic diagram of mode

Spring AMQP implementation

  • Add the Java configuration for work mode: create a queue named work.hello, one producer and two consumers;
/**
 * Created by macro on 2020/5/19.
 */
@Configuration
public class WorkRabbitConfig {

    @Bean
    public Queue workQueue() {
        return new Queue("work.hello");
    }

    @Bean
    public WorkReceiver workReceiver1() {
        return new WorkReceiver(1);
    }

    @Bean
    public WorkReceiver workReceiver2() {
        return new WorkReceiver(2);
    }

    @Bean
    public WorkSender workSender() {
        return new WorkSender();
    }

}
  • The producer sends messages to the queue work.hello; each message contains a certain number of '.' characters;
/**
 * Created by macro on 2020/5/19.
 */
public class WorkSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkSender.class);

    @Autowired
    private RabbitTemplate template;

    private static final String queueName = "work.hello";

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello");
        int limitIndex = index % 3+1;
        for (int i = 0; i < limitIndex; i++) {
            builder.append('.');
        }
        builder.append(index+1);
        String message = builder.toString();
        template.convertAndSend(queueName, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }

}
  • Two consumers, named instance 1 and instance 2, get messages from the queue work.hello; the more '.' characters a message contains, the longer it takes to process;
/**
 * Created by macro on 2020/5/19.
 */
@RabbitListener(queues = "work.hello")
public class WorkReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkReceiver.class);

    private final int instance;

    public WorkReceiver(int i) {
        this.instance = i;
    }

    @RabbitHandler
    public void receive(String in) {
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", this.instance, in);
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", this.instance, watch.getTotalTimeSeconds());
    }

    private void doWork(String in) {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }

}
  • Add a test interface in the controller and call the interface to start sending messages;
/**
 * Created by macro on 2020/5/19.
 */
@Api(tags = "RabbitController", description = "RabbitMQ functional testing")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {
    
    @Autowired
    private WorkSender workSender;

    @ApiOperation("Work mode")
    @RequestMapping(value = "/work", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult workTest() {
        for(int i=0;i<10;i++){
            workSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • After running, the log shows that the producer sends messages containing different numbers of '.' characters to the queue, and the consumers instance 1 and instance 2 compete with each other, each consuming part of the messages.
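To make the message format concrete, the string building from WorkSender.send and the dot counting from WorkReceiver.doWork can be worked through in isolation. This is a standalone sketch: the class WorkMessageSketch and its method names are hypothetical, and the real receiver sleeps one second per dot instead of returning a number.

```java
/** Hypothetical standalone copy of the message format used by the work mode example. */
final class WorkMessageSketch {

    /** Same construction as WorkSender.send(index): "Hello", then 1 to 3 dots, then index + 1. */
    static String buildMessage(int index) {
        StringBuilder builder = new StringBuilder("Hello");
        int dots = index % 3 + 1;
        for (int i = 0; i < dots; i++) {
            builder.append('.');
        }
        return builder.append(index + 1).toString();
    }

    /** Same rule as WorkReceiver.doWork: one second of simulated work per '.' in the message. */
    static long workSeconds(String message) {
        return message.chars().filter(ch -> ch == '.').count();
    }

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            String message = buildMessage(i);
            System.out.println(message + " -> " + workSeconds(message) + "s");
        }
        // prints:
        // Hello.1 -> 1s
        // Hello..2 -> 2s
        // Hello...3 -> 3s
        // Hello.4 -> 1s
    }
}
```

Because the processing time cycles through 1, 2 and 3 seconds while a new message arrives every second, one consumer alone would fall behind, which is why the two instances end up sharing the work.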

Publish / subscribe mode

Publish / subscribe mode sends each message to multiple consumers at the same time, as a broadcast. It includes one producer, two consumers, two queues and one exchange. Each consumer is bound to its own queue, and both queues are bound to the exchange. The producer sends messages to the exchange, and every consumer receives and consumes every message.
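The broadcast behaviour can be pictured with a few lines of plain Java. This is a minimal in-memory sketch, not RabbitMQ code: the class FanoutSketch is hypothetical, and plain lists stand in for the two queues.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a fanout exchange; queues are plain lists. */
final class FanoutSketch {

    private final List<List<String>> boundQueues = new ArrayList<>();

    /** Binds a queue to the exchange; a fanout binding needs no routing key. */
    void bind(List<String> queue) {
        boundQueues.add(queue);
    }

    /** Copies the message into every bound queue. */
    void publish(String message) {
        for (List<String> queue : boundQueues) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        List<String> queue1 = new ArrayList<>();
        List<String> queue2 = new ArrayList<>();
        exchange.bind(queue1);
        exchange.bind(queue2);
        exchange.publish("Hello.1");
        exchange.publish("Hello..2");
        System.out.println(queue1); // [Hello.1, Hello..2]
        System.out.println(queue2); // [Hello.1, Hello..2]
    }
}
```

Unlike work mode, where two consumers share one queue and split the messages, here each consumer has its own queue and therefore sees every message.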

Schematic diagram of mode

Spring AMQP implementation

  • Add the Java configuration for publish / subscribe mode: create an exchange named exchange.fanout, one producer, two consumers and two anonymous queues, and bind both anonymous queues to the exchange;
/**
 * Created by macro on 2020/5/19.
 */
@Configuration
public class FanoutRabbitConfig {

    @Bean
    public FanoutExchange fanout() {
        return new FanoutExchange("exchange.fanout");
    }

    @Bean
    public Queue fanoutQueue1() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue fanoutQueue2() {
        return new AnonymousQueue();
    }

    @Bean
    public Binding fanoutBinding1(FanoutExchange fanout, Queue fanoutQueue1) {
        return BindingBuilder.bind(fanoutQueue1).to(fanout);
    }

    @Bean
    public Binding fanoutBinding2(FanoutExchange fanout, Queue fanoutQueue2) {
        return BindingBuilder.bind(fanoutQueue2).to(fanout);
    }

    @Bean
    public FanoutReceiver fanoutReceiver() {
        return new FanoutReceiver();
    }

    @Bean
    public FanoutSender fanoutSender() {
        return new FanoutSender();
    }

}
  • The producer sends messages to the exchange exchange.fanout; each message contains a certain number of '.' characters;
/**
 * Created by macro on 2020/5/19.
 */
public class FanoutSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(FanoutSender.class);
    @Autowired
    private RabbitTemplate template;

    private static final String exchangeName = "exchange.fanout";

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello");
        int limitIndex = index % 3 + 1;
        for (int i = 0; i < limitIndex; i++) {
            builder.append('.');
        }
        builder.append(index + 1);
        String message = builder.toString();
        template.convertAndSend(exchangeName, "", message);
        LOGGER.info(" [x] Sent '{}'", message);
    }

}
  • The consumer gets messages from the bound anonymous queues; the more '.' characters a message contains, the longer it takes to process. Because this one consumer class listens on both queues, it can be regarded as two consumers, named instance 1 and instance 2;
/**
 * Created by macro on 2020/5/19.
 */
public class FanoutReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(FanoutReceiver.class);

    @RabbitListener(queues = "#{fanoutQueue1.name}")
    public void receive1(String in) {
        receive(in, 1);
    }

    @RabbitListener(queues = "#{fanoutQueue2.name}")
    public void receive2(String in) {
        receive(in, 2);
    }

    private void receive(String in, int receiver) {
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
    }

    private void doWork(String in) {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }

}
  • Add a test interface in the controller and call the interface to start sending messages;
/**
 * Created by macro on 2020/5/19.
 */
@Api(tags = "RabbitController", description = "RabbitMQ functional testing")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {
    
    @Autowired
    private FanoutSender fanoutSender;

    @ApiOperation("Publish/subscribe mode")
    @RequestMapping(value = "/fanout", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult fanoutTest() {
        for(int i=0;i<10;i++){
            fanoutSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • After running, the log shows that the producer sends messages containing different numbers of '.' characters to the exchange, and instance 1 and instance 2 each receive and consume every message.

Routing mode

Routing mode selectively sends messages to consumers according to the routing key. It includes one producer, two consumers, two queues and one exchange. Each consumer is bound to its own queue, and the two queues are bound to the exchange with routing keys. The producer sends messages to the exchange, the exchange forwards each message to the queues whose binding key equals the message's routing key, and the consumers bound to those queues receive and consume it.
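The routing rule can be sketched in memory as a map from routing key to bound queues. This is an illustration under stated assumptions, not RabbitMQ code: the class DirectRoutingSketch is hypothetical, plain lists stand in for queues, and the keys orange, black and green are the ones used in the example of this section.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a direct exchange; queues are plain lists. */
final class DirectRoutingSketch {

    // routing key -> queues bound with exactly that key
    private final Map<String, List<List<String>>> bindings = new HashMap<>();

    /** Binds a queue under one routing key; a queue may be bound under several keys. */
    void bind(List<String> queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queue);
    }

    /** Delivers the message to every queue bound with the same key; unmatched messages are dropped. */
    void publish(String routingKey, String message) {
        for (List<String> queue : bindings.getOrDefault(routingKey, Collections.emptyList())) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        List<String> queue1 = new ArrayList<>();
        List<String> queue2 = new ArrayList<>();
        exchange.bind(queue1, "orange");
        exchange.bind(queue1, "black");
        exchange.bind(queue2, "green");
        exchange.bind(queue2, "black");

        String[] keys = {"orange", "black", "green"};
        for (int i = 0; i < 6; i++) {
            String key = keys[i % 3];
            exchange.publish(key, "Hello to " + key + " " + (i + 1));
        }
        System.out.println("queue 1: " + queue1); // orange and black messages
        System.out.println("queue 2: " + queue2); // black and green messages
    }
}
```

A message published with the key black lands in both queues because both are bound with that key, while orange reaches only queue 1 and green only queue 2.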

Schematic diagram of mode

Spring AMQP implementation

  • Add the Java configuration for routing mode: create an exchange named exchange.direct, one producer, two consumers and two anonymous queues. The queues are bound to the exchange with routing keys: queue 1 with orange and black, queue 2 with green and black;
/**
 * Created by macro on 2020/5/19.
 */
@Configuration
public class DirectRabbitConfig {

    @Bean
    public DirectExchange direct() {
        return new DirectExchange("exchange.direct");
    }

    @Bean
    public Queue directQueue1() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue directQueue2() {
        return new AnonymousQueue();
    }

    @Bean
    public Binding directBinding1a(DirectExchange direct, Queue directQueue1) {
        return BindingBuilder.bind(directQueue1).to(direct).with("orange");
    }

    @Bean
    public Binding directBinding1b(DirectExchange direct, Queue directQueue1) {
        return BindingBuilder.bind(directQueue1).to(direct).with("black");
    }

    @Bean
    public Binding directBinding2a(DirectExchange direct, Queue directQueue2) {
        return BindingBuilder.bind(directQueue2).to(direct).with("green");
    }

    @Bean
    public Binding directBinding2b(DirectExchange direct, Queue directQueue2) {
        return BindingBuilder.bind(directQueue2).to(direct).with("black");
    }

    @Bean
    public DirectReceiver directReceiver() {
        return new DirectReceiver();
    }


    @Bean
    public DirectSender directSender() {
        return new DirectSender();
    }

}
  • The producer sends messages to the exchange exchange.direct using different routing keys; each message is forwarded to the matching queues according to its routing key;
/**
 * Created by macro on 2020/5/19.
 */
public class DirectSender {

    @Autowired
    private RabbitTemplate template;

    private static final String exchangeName = "exchange.direct";

    private final String[] keys = {"orange", "black", "green"};

    private static final Logger LOGGER = LoggerFactory.getLogger(DirectSender.class);

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello to ");
        int limitIndex = index % 3;
        String key = keys[limitIndex];
        builder.append(key).append(' ');
        builder.append(index+1);
        String message = builder.toString();
        template.convertAndSend(exchangeName, key, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }

}
  • The consumer gets messages from the anonymous queues it is bound to. Because this one consumer class listens on both queues, it can be regarded as two consumers, named instance 1 and instance 2;
/**
 * Created by macro on 2020/5/19.
 */
public class DirectReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(DirectReceiver.class);

    @RabbitListener(queues = "#{directQueue1.name}")
    public void receive1(String in){
        receive(in, 1);
    }

    @RabbitListener(queues = "#{directQueue2.name}")
    public void receive2(String in){
        receive(in, 2);
    }

    private void receive(String in, int receiver){
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
    }

    private void doWork(String in){
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }

}
  • Add a test interface in the controller and call the interface to start sending messages;
/**
 * Created by macro on 2020/5/19.
 */
@Api(tags = "RabbitController", description = "RabbitMQ functional testing")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private DirectSender directSender;

    @ApiOperation("Routing mode")
    @RequestMapping(value = "/direct", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult directTest() {
        for(int i=0;i<10;i++){
            directSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • After running, the log shows that the producer sends messages with different routing keys to the exchange; instance 1 gets the orange and black messages, and instance 2 gets the green and black messages.

Wildcard mode

Wildcard mode selectively sends messages to consumers according to routing key matching rules. It includes one producer, two consumers, two queues and one exchange. Each consumer is bound to its own queue, and the two queues are bound to the exchange with routing key patterns. The producer sends messages to the exchange, the exchange forwards each message to the queues whose pattern matches the message's routing key, and the consumers bound to those queues receive and consume it.

Special matching symbols

  • *: matches exactly one word;
  • #: matches zero or more words.
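The two symbols can be illustrated with a small standalone matcher. This is a sketch of the matching rule only, not RabbitMQ's actual implementation; the class TopicMatchSketch and its methods are hypothetical, and it assumes routing keys and patterns are made of words separated by '.'.

```java
/** Hypothetical helper that mimics how a topic exchange matches a binding pattern against a routing key. */
final class TopicMatchSketch {

    /** Returns true if the dot-separated routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' stands for zero or more words, so try every possible number of skipped words
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // a literal word or '*' needs one word, but none is left
        }
        // '*' stands for exactly one word; any other pattern word must match literally
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        String[] patterns = {"*.orange.*", "*.*.rabbit", "lazy.#"};
        String[] keys = {"quick.orange.rabbit", "lazy.brown.fox", "quick.brown.fox"};
        for (String key : keys) {
            for (String pattern : patterns) {
                System.out.println(key + " vs " + pattern + " -> " + matches(pattern, key));
            }
        }
    }
}
```

With these patterns, quick.orange.rabbit matches both \*.orange.\* and \*.\*.rabbit, lazy.brown.fox matches only lazy.#, and quick.brown.fox matches none of them, so a topic exchange with only these bindings would discard it.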

Schematic diagram of mode

Spring AMQP implementation

  • Add the Java configuration for wildcard mode: create an exchange named exchange.topic, one producer, two consumers and two anonymous queues. Messages whose routing key matches *.orange.* or *.*.rabbit are sent to queue 1, and messages matching lazy.# are sent to queue 2;
/**
 * Created by macro on 2020/5/19.
 */
@Configuration
public class TopicRabbitConfig {

    @Bean
    public TopicExchange topic() {
        return new TopicExchange("exchange.topic");
    }

    @Bean
    public Queue topicQueue1() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue topicQueue2() {
        return new AnonymousQueue();
    }

    @Bean
    public Binding topicBinding1a(TopicExchange topic, Queue topicQueue1) {
        return BindingBuilder.bind(topicQueue1).to(topic).with("*.orange.*");
    }

    @Bean
    public Binding topicBinding1b(TopicExchange topic, Queue topicQueue1) {
        return BindingBuilder.bind(topicQueue1).to(topic).with("*.*.rabbit");
    }

    @Bean
    public Binding topicBinding2a(TopicExchange topic, Queue topicQueue2) {
        return BindingBuilder.bind(topicQueue2).to(topic).with("lazy.#");
    }

    @Bean
    public TopicReceiver topicReceiver() {
        return new TopicReceiver();
    }

    @Bean
    public TopicSender topicSender() {
        return new TopicSender();
    }

}
  • The producer sends messages to the exchange exchange.topic using different routing keys;
/**
 * Created by macro on 2020/5/19.
 */
public class TopicSender {

    @Autowired
    private RabbitTemplate template;

    private static final String exchangeName = "exchange.topic";

    private static final Logger LOGGER = LoggerFactory.getLogger(TopicSender.class);


    private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello to ");
        int limitIndex = index%keys.length;
        String key = keys[limitIndex];
        builder.append(key).append(' ');
        builder.append(index+1);
        String message = builder.toString();
        template.convertAndSend(exchangeName, key, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }

}
  • The consumer gets messages from the anonymous queues it is bound to. Because this one consumer class listens on both queues, it can be regarded as two consumers, named instance 1 and instance 2;
/**
 * Created by macro on 2020/5/19.
 */
public class TopicReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(TopicReceiver.class);

    @RabbitListener(queues = "#{topicQueue1.name}")
    public void receive1(String in){
        receive(in, 1);
    }

    @RabbitListener(queues = "#{topicQueue2.name}")
    public void receive2(String in){
        receive(in, 2);
    }

    public void receive(String in, int receiver){
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
    }

    private void doWork(String in){
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }

}
  • Add a test interface in the controller and call the interface to start sending messages;
/**
 * Created by macro on 2020/5/19.
 */
@Api(tags = "RabbitController", description = "RabbitMQ functional testing")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private TopicSender topicSender;

    @ApiOperation("Wildcard mode")
    @RequestMapping(value = "/topic", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult topicTest() {
        for(int i=0;i<10;i++){
            topicSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • After running, the log shows that the producer sends messages with different routing keys to the exchange, and instance 1 and instance 2 each get the messages that match their bindings.

References

RabbitMQ Tutorials: https://www.rabbitmq.com/gets...

Project source code address

https://github.com/macrozheng...

Tags: Java Spring Boot RabbitMQ

Posted by dibyajyotig on Wed, 01 Jun 2022 02:49:47 +0530