RocketMQ: message sending and consumption

Previous articles covered RocketMQ: getting started and RocketMQ: building clusters. With the RocketMQ environment ready, this article demonstrates how to produce and consume the various RocketMQ message types through hands-on examples.

1. development environment

(1) Introducing RocketMQ client dependencies

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

(2) Analysis of production and consumption steps

  • Message producer
1. Create a message producer and specify the producer group name
2. Specify the NameServer address
3. Start the producer
4. Create a message object and specify its topic, tag and message body
5. Send the message
6. Shut down the producer
  • Message consumer
1. Create a consumer and specify the consumer group name
2. Specify the NameServer address
3. Subscribe to a topic and tag
4. Register a callback function to process messages
5. Start the consumer

2. basic example

2.1 message sending

2.1.1 sending synchronous messages

Reliable synchronous sending is widely used, for example for important notifications and SMS notifications. The send call blocks until the Broker returns a result.

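import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SyncProducer {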

    public static void main(String[] args) throws Exception {
        // 1. create a message producer and specify the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("base-sync-producer");
        // 2. specify NameServer address
        producer.setNamesrvAddr("192.168.1.17:9876");
        // 3. start producer
        producer.start();
        // 4. create a message object and specify topic, tag and message body
        Message message = new Message("TestTopic", "TagA", "Hello World!".getBytes());
        // 5. send message
        SendResult sendResult = producer.send(message);
        System.out.printf("%s%n", sendResult);
        // 6. shut down the producer
        producer.shutdown();
    }
}

2.1.2 sending asynchronous messages

Asynchronous sending is typically used in response-time-sensitive scenarios, where the sender cannot afford to block while waiting for the Broker's response.

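import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AsyncProducer {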

    public static void main(String[] args) throws Exception {
        // 1. instantiate the message Producer
        DefaultMQProducer producer = new DefaultMQProducer("base-async-producer");
        // 2. set the address of NameServer
        producer.setNamesrvAddr("192.168.1.17:9876");
        // 3. start the Producer instance
        producer.start();
        // Number of retries when asynchronous sending fails (default 2); 0 disables retries
        producer.setRetryTimesWhenSendAsyncFailed(0);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            // 4. create a message and specify Topic, Tag, key and message body
            Message msg = new Message("TestTopic",
                    "TagA",
                    "OrderID188",
                    "Hello World!".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 5. send asynchronously; SendCallback receives the result
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        // Sleep for one minute; if the producer were shut down right away, the asynchronous callbacks could not be received
        TimeUnit.MINUTES.sleep(1);
        // 6. once no more messages will be sent, shut down the Producer instance
        producer.shutdown();
    }
}
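
The example above waits a fixed minute before shutting the producer down. One possible alternative is to wait exactly until every callback has fired by using a CountDownLatch. The following is a minimal stdlib-only sketch of that pattern: fakeAsyncSend is a hypothetical stand-in for producer.send(msg, callback), and no RocketMQ classes are involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

class AsyncLatchSketch {

    // Hypothetical stand-in for producer.send(msg, callback): the callback runs on a pool thread.
    static void fakeAsyncSend(ExecutorService pool, int index, IntConsumer onSuccess) {
        pool.execute(() -> onSuccess.accept(index));
    }

    // Sends `count` fake messages and returns how many callbacks fired before the timeout.
    static int sendAndAwait(int count, long timeoutSeconds) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(count);
        AtomicInteger acknowledged = new AtomicInteger();
        try {
            for (int i = 0; i < count; i++) {
                fakeAsyncSend(pool, i, index -> {
                    acknowledged.incrementAndGet();
                    latch.countDown(); // in real code, count down in onException as well
                });
            }
            // Returns as soon as every callback has fired, or when the timeout expires.
            latch.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return acknowledged.get();
    }

    public static void main(String[] args) {
        System.out.println("callbacks received: " + sendAndAwait(10, 5));
    }
}
```

With a real producer, the same latch would be counted down in both onSuccess and onException, and producer.shutdown() would be called after await returns.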

2.1.3 sending one-way messages

One-way sending is mainly used when the result of the send does not matter much, such as log collection.

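import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {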

    public static void main(String[] args) throws Exception {
        // 1. instantiate the message Producer
        DefaultMQProducer producer = new DefaultMQProducer("base-oneway-producer");
        // 2. set the address of NameServer
        producer.setNamesrvAddr("192.168.1.17:9876");
        // 3. start the Producer instance
        producer.start();
        for (int i = 0; i < 5; i++) {
            // 4. create a message and specify Topic, Tag and message body
            Message msg = new Message("TestTopic",
                    "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) 
            );
            // 5. send a one-way message without any return results
            producer.sendOneway(msg);
        }
        // 6. once no more messages will be sent, shut down the Producer instance
        producer.shutdown();
    }
}

2.2 message consumption

2.2.1 load balancing mode

In load balancing (clustering) mode, the consumers of one consumer group share the messages of a topic: each message is processed by only one consumer in the group, so different consumers process different messages.

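import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1. instantiate the message consumer and specify the group name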
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2. specify NameServer address information
        consumer.setNamesrvAddr("192.168.1.17:9876");
        // 3. subscribe to Topic
        consumer.subscribe("TestTopic", "*");
        // 4. consumption in load balancing mode (the default is load balancing mode)
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 5. register the callback function to process messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 6. start the consumer
        consumer.start();
    }
}

2.2.2 broadcast mode

In broadcast mode, every consumer receives every message, so all consumers consume the same messages.

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1. instantiate the message consumer and specify the group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2. specify NameServer address information
        consumer.setNamesrvAddr("192.168.1.17:9876");
        // 3. subscribe to Topic
        consumer.subscribe("TestTopic", "*");
        // 4. broadcast mode consumption (default load balancing mode)
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 5. register the callback function to process messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 6. start the consumer
        consumer.start();
    }
}
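
The difference between the two message models can be illustrated without a broker. The following stdlib-only sketch assigns the queues of a topic to two consumers under each model. The modulo assignment used for clustering is a deliberate simplification and an assumption of this sketch, not RocketMQ's actual allocation strategy; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MessageModelSketch {

    // Clustering: every queue is read by exactly one consumer of the group.
    // Simplified modulo assignment, not RocketMQ's real allocation strategy.
    static Map<String, List<Integer>> clustering(List<String> consumers, int queueCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int queue = 0; queue < queueCount; queue++) {
            String owner = consumers.get(queue % consumers.size());
            assignment.get(owner).add(queue);
        }
        return assignment;
    }

    // Broadcasting: every consumer reads every queue.
    static Map<String, List<Integer>> broadcasting(List<String> consumers, int queueCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            List<Integer> all = new ArrayList<>();
            for (int queue = 0; queue < queueCount; queue++) {
                all.add(queue);
            }
            assignment.put(consumer, all);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("consumer-1", "consumer-2");
        System.out.println("clustering:   " + clustering(consumers, 4));
        System.out.println("broadcasting: " + broadcasting(consumers, 4));
    }
}
```

With four queues and two consumers, clustering gives each consumer two distinct queues, while broadcasting gives both consumers all four.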

Note: from here on, only the key code is shown. The snippets use the following variable names: producer refers to a producer, consumer to a consumer, and message to a message object.

3. ordered messages

Message ordering means that messages are consumed in the order in which they were sent (FIFO). RocketMQ can strictly guarantee message order, either per partition or globally.

How ordered consumption works: by default, a producer distributes messages across the topic's queues (partitions) in round-robin fashion, and a consumer pulls messages from multiple queues, so the order of sending and consumption cannot be guaranteed. If, however, the messages that must stay in order are all sent to the same queue and are pulled from that queue one after another, the order is guaranteed. When only a single queue is used for sending and consuming, the order is global; when multiple queues are involved, the order is per partition, that is, messages are ordered within each queue.
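
The effect of the two routing strategies can be sketched with plain Java. The example below is a simulation only: the class, the method names, the order IDs and the queue count are all made up for illustration, and no RocketMQ classes are used.

```java
class OrderRoutingSketch {

    // Round-robin: the n-th message sent lands in queue n % queueCount.
    static int roundRobinQueue(int sendSequence, int queueCount) {
        return sendSequence % queueCount;
    }

    // Keyed routing: all messages of one order land in the same queue.
    // Assumes non-negative order IDs.
    static int keyedQueue(long orderId, int queueCount) {
        return (int) (orderId % queueCount);
    }

    public static void main(String[] args) {
        // Two orders whose steps are sent interleaved.
        long[] orderIds = {1001L, 1002L, 1001L, 1002L, 1001L, 1002L};
        String[] steps = {"create", "create", "pay", "pay", "ship", "ship"};
        int queueCount = 4;
        for (int i = 0; i < orderIds.length; i++) {
            System.out.printf("order=%d step=%-6s roundRobin=queue-%d keyed=queue-%d%n",
                    orderIds[i], steps[i],
                    roundRobinQueue(i, queueCount),
                    keyedQueue(orderIds[i], queueCount));
        }
    }
}
```

Under round-robin, the three steps of order 1001 end up in queues 0, 2 and 0, so two consumers could process them out of order. With keyed routing, they all go to queue 1 and keep their relative order.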

3.1 sending ordered messages

The following code fragment shows how the target queue is selected when sending:

/**
 * Parameter 1: message object
 * Parameter 2: message queue selector
 * Parameter 3: business ID used to select the queue
 */
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
    /**
     * Callback that selects the target message queue
     * @param mqs list of candidate message queues
     * @param msg the message being sent
     * @param arg business ID, here order.getId()
     * @return the queue the message is sent to
     */
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Long id = (Long) arg;  // select the send queue according to the order ID
        long index = id % mqs.size();
        return mqs.get((int) index);
    }
}, order.getId());
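
The selector above assumes a non-negative Long order ID. If the business key could be negative or is not numeric (for example a String order number), the % operator may produce a negative index. A possible way to guard against that, shown here as a stdlib-only sketch with a hypothetical helper name, is to hash the key and use Math.floorMod:

```java
class QueueIndexSketch {

    // Maps any non-null business key to a valid queue index in [0, queueCount).
    static int indexFor(Object key, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod never returns a negative result for a positive divisor,
        // unlike the % operator, which keeps the sign of the dividend.
        return Math.floorMod(key.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        System.out.println(-7 % 4);               // -3, not a valid list index
        System.out.println(Math.floorMod(-7, 4)); // 1
        System.out.println(indexFor("ORDER-42", 4));
    }
}
```

Inside a MessageQueueSelector, such a helper would be called with mqs.size() as queueCount. Equal keys always map to the same index, which is what per-order ordering needs.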

3.2 consuming ordered messages

When consuming ordered messages, RocketMQ ensures that each queue is consumed by only one consumer thread at a time, which guarantees that messages are processed in order within each partition.

consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        context.setAutoCommit(true);
        for (MessageExt msg : msgs) {
            // You can see that each queue has a unique consumer thread to consume, and the orders are ordered for each queue (partition)
            System.out.println("consumeThread=" + Thread.currentThread().getName() + ",queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

4. delay message

Delayed messages are delivered to consumers only after a certain period of time has passed. For example, after submitting an order for a train ticket, you usually have half an hour to pay. A delayed message can be sent when the order is submitted; when it is consumed half an hour later, the order status is checked, and if the order is still unpaid, it is cancelled.

4.1 sending delay messages

Sending a delayed message is simple. You only need to set the delay level on the message object, for example:

// Set the message delay level to 3, i.e. 10s delay
message.setDelayTimeLevel(3);

Default delay levels: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h, a total of 18 levels.
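
Because only fixed levels are available, a desired delay has to be mapped to a level number. The following stdlib-only sketch parses the default level string (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h) and looks up the smallest level that covers a wanted delay. The class and method names are hypothetical, and a Broker with a customized level list would need a different string.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class DelayLevelSketch {

    // Default delay levels of a 4.x Broker; level numbers are 1-based.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Parses tokens such as "10s", "5m" or "2h" into durations.
    static List<Duration> parse(String levels) {
        List<Duration> result = new ArrayList<>();
        for (String token : levels.trim().split("\\s+")) {
            long amount = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            switch (unit) {
                case 's': result.add(Duration.ofSeconds(amount)); break;
                case 'm': result.add(Duration.ofMinutes(amount)); break;
                case 'h': result.add(Duration.ofHours(amount)); break;
                default: throw new IllegalArgumentException("Unknown unit in " + token);
            }
        }
        return result;
    }

    // Returns the 1-based level with the smallest delay >= wanted, or -1 if none exists.
    static int levelFor(Duration wanted) {
        List<Duration> levels = parse(DEFAULT_LEVELS);
        for (int i = 0; i < levels.size(); i++) {
            if (levels.get(i).compareTo(wanted) >= 0) {
                return i + 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("30 minutes -> level " + levelFor(Duration.ofMinutes(30)));
    }
}
```

For the half-hour payment window from the train ticket example, the lookup yields level 16, which is the value that would be passed to message.setDelayTimeLevel.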

4.2 consuming delayed messages

The consumer needs no special handling, for example:

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n",
                          Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

5. batch messages

Sending messages in batches can significantly improve throughput for small messages. The restrictions are that all messages in a batch must have the same topic and the same waitStoreMsgOK setting, and must not be delayed messages. In addition, by default the total size of one batch should not exceed 4MB.

5.1 sending batch messages

If each batch is known to stay within 4MB, batching is straightforward. An example is as follows:

// Create a message object, and specify topic, tag, and message body
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "Hello World 0".getBytes()));
messages.add(new Message(topic, "TagB", "Hello World 1".getBytes()));
messages.add(new Message(topic, "TagC", "Hello World 2".getBytes()));
// send message
SendResult sendResult = producer.send(messages);
System.out.printf("%s%n", sendResult);

If the total size of the messages might exceed 4MB, it is best to split the list into smaller batches:

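import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.rocketmq.common.message.Message;

public class ListSplitter implements Iterator<List<Message>> {
    private static final int SIZE_LIMIT = 1024 * 1024 * 4;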
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; // add 20 bytes of log overhead
            if (tmpSize > SIZE_LIMIT) {
                // A single message exceeds the limit. It cannot be split further,
                // so handle it here; otherwise the splitting loop would never advance.
                if (nextIndex - currIndex == 0) {
                    // The current sub-list is still empty: emit the oversized message as a
                    // sub-list of its own. Otherwise just end the current sub-list here.
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }

        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

Usage:

// Split a large batch into several smaller batches
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
    try {
        List<Message> listItem = splitter.next();
        producer.send(listItem);
    } catch (Exception e) {
        e.printStackTrace();
        // handle the error
    }
}
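
The splitting behaviour is easier to follow on plain numbers. The sketch below applies the same greedy idea to a list of message sizes with a small, made-up limit of 100 bytes. It is a stdlib-only illustration with hypothetical names, not the ListSplitter class itself, and it leaves out the per-message overhead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchSplitSketch {

    // Greedily packs sizes into batches whose sum stays within the limit.
    // A single size above the limit is emitted as a batch of its own.
    static List<List<Integer>> split(List<Integer> sizes, int limit) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int total = 0;
        for (int size : sizes) {
            if (!current.isEmpty() && total + size > limit) {
                batches.add(current);       // close the current batch
                current = new ArrayList<>();
                total = 0;
            }
            current.add(size);
            total += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(split(Arrays.asList(40, 50, 30, 120, 10), 100));
    }
}
```

For the sizes 40, 50, 30, 120 and 10, the result is four batches: [40, 50], [30], [120] and [10]. The oversized entry travels alone, so the caller can decide whether to send or reject it.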

6. filter messages

In most cases, a tag is a simple and useful way to select the messages you want. For example:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumerGroup");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

The consumer will receive messages tagged TAGA, TAGB or TAGC. The limitation is that a message can carry only one tag, which may not be enough in complex scenarios. In that case, you can filter messages with SQL expressions, which are evaluated against the properties set when the message was sent. The syntax defined by RocketMQ lets you implement some simple logic. Here is an example:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------
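
The two cases in the diagram can be mimicked in plain Java. The sketch below hard-codes the expression a > 5 AND b = 'abc' as a Predicate over a property map; it only illustrates the filtering decision, since the real evaluation happens on the Broker. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

class PropertyFilterSketch {

    // Java equivalent of the expression: a > 5 AND b = 'abc'
    static final Predicate<Map<String, Object>> FILTER = props -> {
        Object a = props.get("a");
        Object b = props.get("b");
        return a instanceof Number
                && ((Number) a).doubleValue() > 5
                && "abc".equals(b);
    };

    // Builds the property map of one message from the diagram.
    static Map<String, Object> message(int a, String b, boolean c) {
        Map<String, Object> props = new HashMap<>();
        props.put("a", a);
        props.put("b", b);
        props.put("c", c);
        return props;
    }

    // Returns "Gotten" when the filter matches and "Missed" otherwise.
    static String deliver(Map<String, Object> props) {
        return FILTER.test(props) ? "Gotten" : "Missed";
    }

    public static void main(String[] args) {
        System.out.println(deliver(message(10, "abc", true)));
        System.out.println(deliver(message(1, "abc", true)));
    }
}
```

The first message satisfies both conditions and is delivered; the second fails a > 5 and is filtered out, regardless of its other properties.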

6.1 basic SQL syntax

RocketMQ only defines some basic syntax to support this feature. You can also easily extend it.

  • Numeric comparison, such as >, >=, <, <=, BETWEEN, =;
  • Character comparison, such as =, <>, IN;
  • IS NULL or IS NOT NULL;
  • Logical operators AND, OR, NOT.

Supported constant types are:

  • Numeric, for example 123 or 3.1415;
  • Character, for example 'abc', which must be enclosed in single quotation marks;
  • NULL, a special constant;
  • Boolean, TRUE or FALSE.

Only push-mode consumers can filter with SQL92 expressions. The interface is as follows:

public void subscribe(final String topic, final MessageSelector messageSelector)

6.2 message producers

When sending a message, we can set the properties of the message through putUserProperty.

for (int i = 0; i < 10; i++) {
    // Create a message object, and specify the topic and message body
    Message messages = new Message("TestTopic", ("Hello, this is filter msg body! My index is :" + i).getBytes());
    messages.putUserProperty("index", String.valueOf(i));
    // send message
    SendResult sendResult = producer.send(messages);
    System.out.printf("%s%n", sendResult);
}

6.3 message consumers

Use MessageSelector.bySql to filter messages with SQL.

// Subscribe to the topic; only messages that have the index property with 0 <= index <= 3 are delivered
consumer.subscribe("TestTopic", MessageSelector.bySql("index between 0 and 3"));

Note that MessageSelector.bySql requires enablePropertyFilter=true in the Broker configuration file; otherwise the following error is reported:

# org.apache.rocketmq.broker.processor.ClientManageProcessor.checkClientConfig()
The broker does not support consumer to filter message by SQL92

7. transaction messages

7.1 process analysis

Transaction message handling is divided into two flows: the normal sending and committing of a transaction message, and the compensation flow.

7.1.1 transaction message sending and submission

(1) Send the message (a half message).

(2) The server responds with the result of writing the message.

(3) Execute the local transaction according to the send result (if the write failed, the half message is not visible to the business and the local logic is not executed).

(4) Commit or roll back according to the local transaction state (a commit generates the message index, which makes the message visible to consumers).

7.1.2 transaction compensation

(1) For transaction messages that have been neither committed nor rolled back (messages in pending state), the server initiates a check-back.

(2) The producer receives the check-back request and checks the state of the local transaction that corresponds to the message.

(3) The producer commits or rolls back again according to the local transaction state.

The compensation phase handles the case where the Commit or Rollback of a message timed out or failed.

7.1.3 transaction message status

Transaction messages have three states: commit, rollback, and intermediate:

  • TransactionStatus.CommitTransaction: commit the transaction, which allows consumers to consume this message.
  • TransactionStatus.RollbackTransaction: roll back the transaction, which means the message will be deleted and cannot be consumed.
  • TransactionStatus.Unknown: intermediate state, which means the message queue needs to check back to determine the state.
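
The interplay of the three states and the check-back can be sketched as a small state machine. The following stdlib-only simulation is an illustration under assumptions: the class, the enums and the maxChecks parameter are hypothetical, and the real decision logic lives in the Broker.

```java
import java.util.function.Supplier;

class TransactionFlowSketch {

    enum State { COMMIT, ROLLBACK, UNKNOWN }

    enum Outcome { VISIBLE_TO_CONSUMERS, DELETED, DISCARDED_AFTER_MAX_CHECKS }

    // Resolves a half message: while the state is UNKNOWN, the simulated server
    // asks the producer again (check-back), at most maxChecks times.
    static Outcome resolve(State afterLocalTx, Supplier<State> checkBack, int maxChecks) {
        State state = afterLocalTx;
        int checks = 0;
        while (state == State.UNKNOWN && checks < maxChecks) {
            state = checkBack.get();
            checks++;
        }
        switch (state) {
            case COMMIT:
                return Outcome.VISIBLE_TO_CONSUMERS;
            case ROLLBACK:
                return Outcome.DELETED;
            default:
                return Outcome.DISCARDED_AFTER_MAX_CHECKS;
        }
    }

    public static void main(String[] args) {
        // Local transaction result unknown at first, committed on the first check-back.
        System.out.println(resolve(State.UNKNOWN, () -> State.COMMIT, 15));
        // Producer never gives a definite answer.
        System.out.println(resolve(State.UNKNOWN, () -> State.UNKNOWN, 3));
    }
}
```

The half message only becomes consumable once a commit is seen, either directly after the local transaction or through a later check-back.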

7.2 sending transaction messages

7.2.1 creating transactional producers

Create a producer with the TransactionMQProducer class and specify a unique producer group. You can also set a custom thread pool to handle the check requests. After executing the local transaction, the producer must reply to the message queue according to the execution result.

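import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TransactionMsgProducer {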

    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 1. create a transaction listener
        TransactionListener transactionListener = new TransactionListenerImpl();
        // 2. create a message producer
        TransactionMQProducer producer = new TransactionMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("192.168.1.17:9876");
        // 3. set the transaction listener
        producer.setTransactionListener(transactionListener);
        // 4. start message producer
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC"};
        for (int i = 0; i < 3; i++) {
            try {
                Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                TimeUnit.SECONDS.sleep(1);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        // 5. shut down the producer (a real application keeps it running so that check-back requests can be answered)
        producer.shutdown();
    }
}

7.2.2 implement the transaction listening interface

After the half message is sent successfully, the executeLocalTransaction method executes the local transaction and returns one of the three transaction states described in the previous section. The checkLocalTransaction method checks the local transaction state in response to a check request from the message queue; it also returns one of those three states.

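import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("Executing local transaction");
        // Demo logic: the tag decides the outcome of the "local transaction"
        if ("TagA".equals(msg.getTags())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if ("TagB".equals(msg.getTags())) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            // UNKNOW (spelled this way in the client API) triggers a later check-back
            return LocalTransactionState.UNKNOW;
        }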

    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("MQ is checking the local transaction result for the message with tag [" + msg.getTags() + "]");
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

7.2.3 restrictions on use

  1. Transaction messages do not support delayed or batch sending.
  2. To prevent a single message from being checked too many times and causing half messages to accumulate in the queue, the number of checks for a single message is limited to 15 by default. Users can change this limit with the transactionCheckMax parameter in the Broker configuration file. If a message has been checked more than N times (N = transactionCheckMax), the Broker by default discards the message and prints an error log. Users can change this behavior by overriding the AbstractTransactionCheckListener class.
  3. A transaction message is checked after a certain period, determined by the transactionMsgTimeout parameter in the Broker configuration file. When sending a transaction message, users can also change this limit by setting the user property CHECK_IMMUNITY_TIME_IN_SECONDS, which takes precedence over transactionMsgTimeout.
  4. Transaction messages may be checked or consumed more than once.
  5. Delivering the committed message to the user's target topic may fail. Currently this relies on log records; high availability is ensured by RocketMQ's own high-availability mechanism. If you want to ensure that transaction messages are not lost and that transaction integrity is guaranteed, the synchronous double-write mechanism is recommended.
  6. The producer ID of transaction messages cannot be shared with the producer ID of other message types. Unlike other message types, transaction messages allow check-back queries: the MQ server queries clients by their producer ID.
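
Because transaction messages may be consumed more than once (restriction 4), consumer logic is usually made idempotent. The sketch below shows one simple way to do that with a set of already processed business keys. It is a stdlib-only illustration with hypothetical names; the in-memory set is an assumption made for brevity, and a real system would typically keep the keys in a durable store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentHandlerSketch {

    // Business keys that were already processed (in memory, for illustration only).
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final AtomicInteger applied = new AtomicInteger();

    // Returns true if the business logic ran, false if the key was seen before.
    boolean handle(String businessKey) {
        if (!processedKeys.add(businessKey)) {
            return false; // duplicate delivery: skip the side effect
        }
        applied.incrementAndGet(); // stands in for the real business side effect
        return true;
    }

    int appliedCount() {
        return applied.get();
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        System.out.println(handler.handle("KEY0")); // first delivery
        System.out.println(handler.handle("KEY0")); // redelivery of the same message
        System.out.println(handler.appliedCount());
    }
}
```

In a message listener, the key could be taken from the message keys set by the producer (such as "KEY" + i in the producer example above), so that a redelivered message does not apply its effect twice.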

Tags: RocketMQ message queue

Posted by Muddy_Funster on Tue, 31 May 2022 12:05:27 +0530