1. Basic concepts
RabbitMQ is an open source implementation of AMQP (Advanced Message Queuing Protocol). The server is written in Erlang, and clients are available for many languages and protocols, including Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, and STOMP; AJAX is also supported. It stores and forwards messages in distributed systems and performs well in terms of ease of use, scalability, and high availability.
To install RabbitMQ, first install the Erlang runtime and configure its environment variables. After installation, go to RabbitMQ's sbin directory and run the following command to enable the management console:
rabbitmq-plugins enable rabbitmq_management
Then open the console address in a browser. The default account name and password are both guest.
2. Users
- Super administrator (administrator): can log in to the management console, view all information, and manage users and policies.
- Monitor (monitoring): can log in to the management console and view information about the RabbitMQ node (number of processes, memory usage, disk usage, and so on).
- Policy maker (policymaker): can log in to the management console and manage policies, but cannot view node information (the part marked by the red box in the figure above).
- Ordinary manager (management): can log in to the management console only; cannot view node information or manage policies.
- Others: cannot log in to the management console; usually ordinary producers and consumers.
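The role list above can be summarized as a small lookup table. The sketch below only mirrors that list; the enum name UserTag and its fields are made up for illustration and are not part of any RabbitMQ API. It also assumes that the monitoring role cannot manage policies, which the list does not state explicitly.

```java
/** Capability summary of the console roles listed above (illustrative only). */
enum UserTag {
    ADMINISTRATOR(true, true, true, true),
    MONITORING(true, true, false, false),   // assumption: no policy management
    POLICYMAKER(true, false, true, false),
    MANAGEMENT(true, false, false, false),
    OTHER(false, false, false, false);      // plain producers and consumers

    final boolean canLogIn;
    final boolean canViewNodeInfo;
    final boolean canManagePolicies;
    final boolean canManageUsers;

    UserTag(boolean canLogIn, boolean canViewNodeInfo,
            boolean canManagePolicies, boolean canManageUsers) {
        this.canLogIn = canLogIn;
        this.canViewNodeInfo = canViewNodeInfo;
        this.canManagePolicies = canManagePolicies;
        this.canManageUsers = canManageUsers;
    }

    public static void main(String[] args) {
        // Print one line per role with its capabilities
        for (UserTag tag : values()) {
            System.out.println(tag + ": login=" + tag.canLogIn
                    + ", nodeInfo=" + tag.canViewNodeInfo
                    + ", policies=" + tag.canManagePolicies
                    + ", users=" + tag.canManageUsers);
        }
    }
}
```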
3. Working mode
There are five main working modes of RabbitMQ, namely:
- Simple mode (hello world)
- Work queue mode (work queue)
- Publish/subscribe mode (publish/subscribe)
- Routing mode (routing)
- Topic mode (topic)
Import dependencies:
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.4.1</version>
</dependency>
Utility class:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {

    public static Connection getConnection() throws Exception {
        // Define a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the server address and port
        factory.setHost("localhost");
        factory.setPort(5672);
        // Set the virtual host, user name, and password
        factory.setVirtualHost("vhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // Get a connection from the factory
        return factory.newConnection();
    }
}
1. Simple mode (hello world):
// Send a message
public static void main(String[] argv) throws Exception {
    // Get a connection and create a channel on it
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // Declare the queue (not durable, not exclusive, not auto-delete)
    channel.queueDeclare("hello", false, false, false, null);
    // Publish to the default exchange; the routing key is the queue name
    String message = "Hello World!";
    channel.basicPublish("", "hello", null, message.getBytes());
    // Close the channel and the connection
    channel.close();
    connection.close();
}

// Receive messages (in a separate class)
public static void main(String[] argv) throws Exception {
    // Get a connection and create a channel on it
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // Declare the queue
    channel.queueDeclare("hello", false, false, false, null);
    // Define the consumer of the queue
    // (QueueingConsumer is available in amqp-client 3.x; it was removed in 5.x)
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Listen to the queue; true enables automatic acknowledgement
    channel.basicConsume("hello", true, consumer);
    // Block until the next message arrives, then handle it
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println("Received: " + message);
    }
}
2. Work queue mode (work queue): Multiple consumers consume messages from the same queue.
// Receive messages
public static void main(String[] argv) throws Exception {
    // Get a connection and create a channel on it
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // Declare the queue
    channel.queueDeclare("hello", false, false, false, null);
    // Deliver at most one unacknowledged message to this consumer at a time.
    // Without this limit, messages are spread evenly across all consumers,
    // regardless of how busy each one is.
    channel.basicQos(1);
    // Define the consumer of the queue
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Listen to the queue. false means the consumer acknowledges manually;
    // true means a message is acknowledged automatically as soon as it is received.
    channel.basicConsume("hello", false, consumer);
    // Block until the next message arrives, then handle it
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        // Acknowledge manually; this is required because autoAck is false
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}
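To see why the prefetch limit matters, the following sketch compares the two dispatch strategies in a simplified model that does not connect to a broker. The class DispatchSketch, its method names, and the message costs are made up for illustration; real RabbitMQ scheduling has more detail than this.

```java
import java.util.Arrays;

/** Simplified model: how much work each consumer ends up with. */
final class DispatchSketch {

    /** No prefetch limit: message i goes to consumer i % n, busy or not. */
    static long[] roundRobin(long[] costs, int consumers) {
        long[] busy = new long[consumers];
        for (int i = 0; i < costs.length; i++) {
            busy[i % consumers] += costs[i];
        }
        return busy;
    }

    /** Prefetch of 1: the next message goes to whichever consumer is free first. */
    static long[] prefetchOne(long[] costs, int consumers) {
        long[] busy = new long[consumers];
        for (long cost : costs) {
            int idle = 0;
            for (int c = 1; c < consumers; c++) {
                if (busy[c] < busy[idle]) {
                    idle = c;
                }
            }
            busy[idle] += cost;
        }
        return busy;
    }

    public static void main(String[] args) {
        long[] costs = {5, 1, 5, 1, 5, 1}; // hypothetical processing times
        System.out.println(Arrays.toString(roundRobin(costs, 2)));
        System.out.println(Arrays.toString(prefetchOne(costs, 2)));
    }
}
```

With these sample costs, round-robin leaves one consumer with 15 units of work and the other with 3, while the prefetch model ends at 11 and 7.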
3. Publish/subscribe mode (publish/subscribe): The exchange delivers a copy of each message to every queue bound to it.
// EXCHANGE_NAME and QUEUE_NAME are String constants defined in the enclosing classes

// Send a message
public static void main(String[] argv) throws Exception {
    // Get a connection and create a channel on it
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // Declare a fanout exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // Publish to the exchange; a fanout exchange ignores the routing key
    String message = "Hello World!";
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    channel.close();
    connection.close();
}

// Receive messages (in a separate class)
public static void main(String[] argv) throws Exception {
    // Get a connection and create a channel on it
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // Declare the queue
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Bind the queue to the exchange
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    // Deliver at most one unacknowledged message to this consumer at a time
    channel.basicQos(1);
    // Define the consumer of the queue
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Listen to the queue with manual acknowledgement
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // Block until the next message arrives, then handle it
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        // Acknowledge the message
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}
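The fanout behaviour can be pictured with a small in-memory model that needs no broker. FanoutSketch and its methods are hypothetical names used only for this illustration; they are not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of a fanout exchange: every bound queue gets a copy. */
final class FanoutSketch {
    // queue name -> messages currently held in that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Bind a queue to the exchange (idempotent). */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Publish: the message is copied into every bound queue. */
    void publish(String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    /** Messages held in the given queue, or an empty list if it is not bound. */
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("queue1");
        exchange.bind("queue2");
        exchange.publish("Hello World!");
        System.out.println(exchange.messagesIn("queue1"));
        System.out.println(exchange.messagesIn("queue2"));
    }
}
```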
4. Routing mode (routing): The exchange delivers each message only to the queues whose binding key equals the message's routing key.
// EXCHANGE_NAME and QUEUE_NAME are String constants defined in the enclosing classes

// Send a message
public static void main(String[] argv) throws Exception {
    // Get a connection and create a channel on it
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // Declare a direct exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String message = "Hello World!";
    // The routing key must equal one of the consumer's binding keys,
    // otherwise the message does not reach its queue
    channel.basicPublish(EXCHANGE_NAME, "routing1", null, message.getBytes());
    channel.close();
    connection.close();
}

// Receive messages (in a separate class)
public static void main(String[] argv) throws Exception {
    // Get a connection and create a channel on it
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // Declare the queue
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Bind the queue to the exchange with several binding keys
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing1");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing2");
    // Deliver at most one unacknowledged message to this consumer at a time
    channel.basicQos(1);
    // Define the consumer of the queue
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Listen to the queue with manual acknowledgement
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // Block until the next message arrives, then handle it
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        // Acknowledge the message
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}
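Exact-match routing can be modelled as a map from binding key to queue names. The sketch below is a broker-free illustration; DirectSketch and the queue name are hypothetical. It shows that a routing key which equals no binding key reaches no queue.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** In-memory model of a direct exchange: routing key must equal binding key. */
final class DirectSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    /** Bind a queue under one binding key; a queue may be bound several times. */
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queueName);
    }

    /** Queues that would receive a message published with this routing key. */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.<String>emptySet());
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("myQueue", "routing1");
        exchange.bind("myQueue", "routing2");
        System.out.println(exchange.route("routing1")); // bound key
        System.out.println(exchange.route("routing"));  // no binding with this key
    }
}
```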
5. Topic mode (topic): The exchange delivers each message to the queues whose binding pattern matches the message's routing key by wildcard.
// EXCHANGE_NAME and QUEUE_NAME are String constants defined in the enclosing classes

// Send a message
public static void main(String[] argv) throws Exception {
    // Get a connection and create a channel on it
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // Declare a topic exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    String message = "Hello World!";
    // Example routing key chosen to match the consumer's "topic1.*" binding;
    // a key such as "topic" would match neither pattern
    channel.basicPublish(EXCHANGE_NAME, "topic1.info", null, message.getBytes());
    channel.close();
    connection.close();
}

// Receive messages (in a separate class)
public static void main(String[] argv) throws Exception {
    // Get a connection and create a channel on it
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // Declare the queue
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Bind the queue to the exchange with several wildcard patterns
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic1.*");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic2.*");
    // Deliver at most one unacknowledged message to this consumer at a time
    channel.basicQos(1);
    // Define the consumer of the queue
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Listen to the queue with manual acknowledgement
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // Block until the next message arrives, then handle it
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        // Acknowledge the message
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}
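The wildcard rules are easier to follow with a small matcher. In topic bindings, keys are split into words on dots; * stands for exactly one word and # stands for zero or more words. The sketch below is an illustrative reimplementation of those rules, not the broker's own code, and TopicMatcher is a made-up name.

```java
/** Illustrative matcher for topic binding patterns ('*' and '#'). */
final class TopicMatcher {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possibility
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, key has none left
        }
        // '*' matches any single word; otherwise the words must be equal
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("topic1.*", "topic1.info")); // one word after topic1
        System.out.println(matches("topic1.*", "topic"));       // different first word
        System.out.println(matches("topic1.#", "topic1.a.b"));  // '#' spans two words
    }
}
```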
4. Spring integration
Spring AMQP provides the RabbitTemplate class for sending and receiving messages.
Import dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Configuration (application.yml):
spring:
  rabbitmq:
    host: 192.168.88.88
    port: 5672
    username: guest
    password: guest
    virtual-host: /
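import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    // Fanout exchange
    @Bean
    public Exchange exchange1() {
        return ExchangeBuilder.fanoutExchange("fanout").build();
    }

    // Direct exchange
    @Bean
    public Exchange exchange2() {
        return ExchangeBuilder.directExchange("direct").build();
    }

    // Durable queues
    @Bean
    public Queue queue1() {
        return QueueBuilder.durable("hello1").build();
    }

    @Bean
    public Queue queue2() {
        return QueueBuilder.durable("hello2").build();
    }

    // Bind hello1 to the fanout exchange (a fanout exchange ignores the key)
    @Bean
    public Binding binding1(Exchange exchange1, Queue queue1) {
        return BindingBuilder.bind(queue1).to(exchange1).with("key1").noargs();
    }

    // Bind hello2 to the direct exchange with binding key "key2"
    @Bean
    public Binding binding2(Exchange exchange2, Queue queue2) {
        return BindingBuilder.bind(queue2).to(exchange2).with("key2").noargs();
    }
}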
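import java.io.IOException;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
// Declare the queue and the exchange and bind them; acknowledge manually
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "hello", durable = "true", autoDelete = "true"),
                exchange = @Exchange(value = "fanout", type = ExchangeTypes.FANOUT),
                key = "key"),
        ackMode = "MANUAL")
public class MyListener {

    @RabbitHandler
    public void consume(Message message,
                        @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
                        Channel channel) throws IOException {
        // The original condition was left empty; process(...) is a hypothetical
        // placeholder for the business logic that decides success or failure
        boolean processed = process(message);
        if (processed) {
            // Second parameter (multiple): false acknowledges only this message;
            // true would acknowledge every message up to and including this tag
            channel.basicAck(deliveryTag, false);
        } else {
            // Third parameter (requeue): true puts the message back in the queue
            // so that it can be delivered again
            channel.basicNack(deliveryTag, false, true);
        }
    }

    // Hypothetical placeholder, not part of the original code
    private boolean process(Message message) {
        return true;
    }
}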
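The meaning of the second argument of basicAck, the multiple flag, is easy to misread. The following broker-free sketch models the set of unacknowledged delivery tags on a channel: with multiple set to false only the given tag is acknowledged, and with true every outstanding tag up to and including it is acknowledged. UnackedTracker is a hypothetical class written for this illustration only.

```java
import java.util.TreeSet;

/** In-memory model of a channel's unacknowledged delivery tags. */
final class UnackedTracker {
    private final TreeSet<Long> unacked = new TreeSet<>();

    /** Record that a message with this delivery tag was handed to the consumer. */
    void delivered(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    /** Mirrors the semantics of basicAck(deliveryTag, multiple). */
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Acknowledge every outstanding tag <= deliveryTag
            unacked.headSet(deliveryTag, true).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    /** Snapshot of the tags that are still waiting for an acknowledgement. */
    TreeSet<Long> pending() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        UnackedTracker tracker = new UnackedTracker();
        for (long tag = 1; tag <= 4; tag++) {
            tracker.delivered(tag);
        }
        tracker.ack(2, false);                 // acknowledges tag 2 only
        System.out.println(tracker.pending());
        tracker.ack(3, true);                  // acknowledges tags 1 and 3
        System.out.println(tracker.pending());
    }
}
```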