Custom message queue component MQ spring boot starter

Why customize the message queue component MQ spring boot starter

Considering that the same system of the company can support Alibaba cloud deployment and customer intranet deployment, only the configuration attributes of the yml file can be modified to migrate without modifying the business code.
This article mainly builds RocketMq and adds RabbitMq support in the later stage.

Source code of this component:
https://github.com/dwhgygzt/mq-spring-boot-starter

Preparation

First of all, please make sure that you have mastered how SpringBoot can customize a starter. If you don't understand, please check the blog:
https://blog.csdn.net/gzt19881123/article/details/106456362

Architecture conception

If you want to keep the business code unchanged, you must create your own set of message service interfaces and attribute objects to encapsulate the business logic of the message queue product.
Fully compare and analyze the same business logic of each message queue product, and extract the public interface.

RocketMq also has a semi message mechanism to consider.

The overall directory structure of the project is as follows:

Message object extraction

At present, only Topic messages are architected to create their own message objects.
Mainly refer to the source code of ApacheRocketMq

First, create the public interface of the message body class:

package com.middol.starter.mq.pojo;

import java.io.Serializable;

/**
 * Public abstract Message body Message
 *
 * @author <a href="mailto:guzhongtao@middol.com">guzhongtao</a>
 */
public interface Message extends Serializable {

    /**
     * Get message body ID
     * @return String
     */
    String getMessageId();


    /**
     * Set message body ID
     *
     * @param messageId Message unique id
     */
    void setMessageId(String messageId);

    /**
     * Get message body
     *
     * @return byte[]
     */
    byte[] getMessageBody();

    /**
     * Set message body
     *
     * @param messageBody Message body byte[]
     */
    void setMessageBody(byte[] messageBody);


}

Then create the subject class message body object

package com.middol.starter.mq.pojo;

import java.util.Properties;

/**
 * Public Topic Message body
 *
 * @author <a href="mailto:guzhongtao@middol.com">guzhongtao</a>
 */
public class TopicMessage implements Message {
    private static final long serialVersionUID = 1L;

    /**
     * User other attributes
     */
    private Properties userProperties;

    /**
     * <p>
     *   Message unique primary key
     * </p>
     *
     * <p>
     *   <strong>Generated by specific mq products </strong>
     * </p>
     */
    private String messageId;

    /**
     * <p>
     *   Message subject name, with a maximum length of 255 characters; By a-z, A-Z, 0-9, and Dash "-" and underline "\u" Composition
     * </p>
     *
     * <p>
     *   <strong>This member variable cannot be empty for a legal message </strong>
     * </p>
     */
    private String topicName;

    /**
     * <p>
     *   Message label, legal identifier, as short as possible and see the meaning of the name
     * </p>
     *
     * <p>
     *   It is recommended that this value be passed
     * </p>
     */
    private String tags;

    /**
     * <p>
     *   Business primary key, such as merchant order number
     * </p>
     *
     * <p>
     *   It is recommended that this value be passed
     * </p>
     */
    private String bussinessKey;

    /**
     * <p>
     *    The length of the message body is no more than 4M by default. Please refer to the cluster deployment document for details
     * </p>
     *
     * <p>
     *    <strong>This member variable cannot be empty for a legal message </strong>
     * </p>
     */
    private byte[] messageBody;

    /**
     * Default constructor; The necessary attributes are subsequently Set through the Set method
     */
    public TopicMessage() {
        this(null, null, "", null);
    }

    /**
     * Parameterized constructor
     * @param topicName Message subject
     * @param tags Message label
     * @param bussinessKey natural key 
     * @param messageBody Message body
     */
    public TopicMessage(String topicName, String tags, String bussinessKey, byte[] messageBody) {
        this.topicName = topicName;
        this.tags = tags;
        this.bussinessKey = bussinessKey;
        this.messageBody = messageBody;
    }

    /**
     * Parameterized constructor
     * @param messageId Unique primary key
     * @param topicName Message subject
     * @param tags Message label
     * @param bussinessKey natural key 
     * @param messageBody Message body
     */
    public TopicMessage(String messageId,String topicName, String tags, String bussinessKey, byte[] messageBody) {
        this.messageId = messageId;
        this.topicName = topicName;
        this.tags = tags;
        this.bussinessKey = bussinessKey;
        this.messageBody = messageBody;
    }

    @Override
    public String toString() {
        return "TopicMessage [topicName=" + topicName + ", tags=" + tags + ", messageBody=" + (messageBody != null ? messageBody.length : 0) + "]";
    }

    public String getTopicName() {
        return topicName;
    }

    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }

    public String getTags() {
        return tags;
    }

    public void setTags(String tags) {
        this.tags = tags;
    }

    public String getBussinessKey() {
        return bussinessKey;
    }

    public void setBussinessKey(String bussinessKey) {
        this.bussinessKey = bussinessKey;
    }

    @Override
    public String getMessageId() {
        return messageId;
    }

    @Override
    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    @Override
    public byte[] getMessageBody() {
        return messageBody;
    }

    @Override
    public void setMessageBody(byte[] messageBody) {
        this.messageBody = messageBody;
    }

    public Properties getUserProperties() {
        return userProperties;
    }

    public void setUserProperties(Properties userProperties) {
        this.userProperties = userProperties;
    }

    /**
     * Add user-defined attribute key value pairs; This key value pair can be obtained during consumption
     * @param key Custom key
     * @param value Corresponding value
     */
    public void putUserProperties(final String key, final String value) {
        if (null == this.userProperties) {
            this.userProperties = new Properties();
        }

        if (key != null && value != null) {
            this.userProperties.put(key, value);
        }
    }

    /**
     * Get value of user defined key
     * @param key Custom key
     * @return User defined key value
     */
    public String getUserProperties(final String key) {
        if (null != this.userProperties) {
            return (String) this.userProperties.get(key);
        }

        return null;
    }
}

Of course, it also includes the status object of message processing:

package com.middol.starter.mq.pojo;

/**
 * Return result of consumption message
 *
 * @author <a href="mailto:guzhongtao@middol.com">guzhongtao</a>
 */
public enum MessageStatus {
    /**
     * Successful consumption, continue to consume the next message
     */
    CommitMessage,
    /**
     * Consumption failed. Tell the server to post this message later and continue to consume other messages
     */
    ReconsumeLater,
}

Message subscription and publisher extraction

subscriber

package com.middol.starter.mq.service;

import com.middol.starter.mq.pojo.MessageStatus;
import com.middol.starter.mq.pojo.TopicMessage;

/**
 * MQ Corresponding listener to implement specific consumption business
 * subscribe mode
 * Consistent subscription relationship https://help.aliyun.com/document_detail/43523.html?spm=a2c4g.11186623.6.734.60b94c07Uwhsky
 * 1.The subscribed topics must be consistent
 * 2.Tag s in subscribed topics must be consistent
 *
 * @author <a href="mailto:guzhongtao@middol.com">guzhongtao</a>
 */
public interface TopicListener {

    /**
     * Corresponding consumer, for example, groupId in aliyunlocketmq
     *
     * @return SubscriberBeanName
     */
    String getSubscriberBeanName();

    /**
     * Subscribed topic
     *
     * @return topic
     */
    String getTopicName();

    /**
     * Subscribed tag
     *
     * @return Subscription filter expression string. The ONS server filters based on this expression. Only or operations are supported <br>
     * eg: "tag1 || tag2 || tag3"<br>
     * If subExpression equals null or *, it means all subscriptions
     */
    String getTagExpression();

    /**
     * Message subscription
     *
     * @param topicMessage Subscription messages obtained from the message server
     * @return After executing the local business logic, feedback whether the message server has consumed MessageStatus
     */
    MessageStatus subscribe(TopicMessage topicMessage);

}

Publisher

package com.middol.starter.mq.service;

import com.middol.starter.mq.pojo.TopicMessage;
import com.middol.starter.mq.pojo.TopicMessageSendResult;

/**
 * Push message to MQ server
 * Publish mode
 *
 * @author <a href="mailto:guzhongtao@middol.com">guzhongtao</a>
 */
public interface TopicPublisher extends Admin {

    /**
     * Synchronous push message
     *
     * @param topicMessage Message object
     * @return TopicMessageSendResult
     */
    TopicMessageSendResult publish(TopicMessage topicMessage);

    /**
     * Asynchronous push message
     *
     * @param topicMessage      Message object
     * @param topicSendCallback Asynchronous result processing
     */
    void publishAsync(TopicMessage topicMessage, TopicSendCallback topicSendCallback);

}


The specific implementation of the interface needs to be divided into Alibaba cloud and apache

Take an example to see the specific implementation. Please refer to the last GitHub address for the specific source code.

package com.middol.starter.mq.service.impl.aliyun;

import com.aliyun.openservices.ons.api.*;
import com.middol.starter.mq.exception.TopicMqException;
import com.middol.starter.mq.pojo.TopicMessage;
import com.middol.starter.mq.pojo.TopicMessageSendResult;
import com.middol.starter.mq.service.TopicPublisher;
import com.middol.starter.mq.service.TopicSendCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Alibaba cloud pushes messages to the MQ server
 * Publish mode
 *
 * @author <a href="mailto:guzhongtao@middol.com">guzhongtao</a>
 */
public class AliyunSimpleRocketMqPublisher implements TopicPublisher {

    private Logger logger = LoggerFactory.getLogger(this.getClass());
    /**
     * Alibaba cloud rocketmq producer
     */
    Producer producer;

    String beanName;

    public AliyunSimpleRocketMqPublisher(Producer producer,String beanName) {
        this.producer = producer;
        this.beanName = beanName;
    }

    public AliyunSimpleRocketMqPublisher() {
    }

    @Override
    public TopicMessageSendResult publish(TopicMessage topicMessage) {
        Message message = new Message();
        message.setUserProperties(topicMessage.getUserProperties());
        message.setKey(topicMessage.getBussinessKey());
        message.setBody(topicMessage.getMessageBody());
        message.setTag(topicMessage.getTags());
        message.setTopic(topicMessage.getTopicName());
        SendResult sendResult = producer.send(message);
        TopicMessageSendResult topicMessageSendResult = new TopicMessageSendResult();
        topicMessageSendResult.setMessageId(sendResult.getMessageId());
        topicMessageSendResult.setTopicName(sendResult.getTopic());
        return topicMessageSendResult;
    }

    @Override
    public void publishAsync(TopicMessage topicMessage, TopicSendCallback topicSendCallback) {
        Message message = new Message();
        message.setUserProperties(topicMessage.getUserProperties());
        message.setKey(topicMessage.getBussinessKey());
        message.setBody(topicMessage.getMessageBody());
        message.setTag(topicMessage.getTags());
        message.setTopic(topicMessage.getTopicName());
        producer.sendAsync(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                TopicMessageSendResult topicMessageSendResult = new TopicMessageSendResult();
                topicMessageSendResult.setTopicName(sendResult.getTopic());
                topicMessageSendResult.setMessageId(sendResult.getMessageId());
                topicSendCallback.onSuccess(topicMessageSendResult);
            }

            @Override
            public void onException(OnExceptionContext context) {
                TopicMqException topicMqException = new TopicMqException(context.getException());
                topicMqException.setTopicName(context.getTopic());
                topicMqException.setMessageId(context.getMessageId());
                topicSendCallback.onFail(topicMqException);
            }
        });
    }

    @Override
    public boolean isStarted() {
        return producer.isStarted();
    }

    @Override
    public boolean isClosed() {
        return producer.isClosed();
    }

    @Override
    public void start() {
        logger.info("[MQ]AliyunSimpleRocketMqPublisher["+beanName+"] start...");
        producer.start();
    }

    @Override
    public void close() {
        logger.info("[MQ]AliyunSimpleRocketMqPublisher[" + beanName + "] close...");
        producer.shutdown();
    }

    public String getBeanName() {
        return beanName;
    }

    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }

    public Producer getProducer() {
        return producer;
    }

    public void setProducer(Producer producer) {
        this.producer = producer;
    }

}


package com.middol.starter.mq.service.impl.aliyun;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Consumer;
import com.middol.starter.mq.pojo.MessageStatus;
import com.middol.starter.mq.pojo.TopicMessage;
import com.middol.starter.mq.service.TopicListener;
import com.middol.starter.mq.service.TopicSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Alibaba cloud subscription message
 *
 * @author <a href="mailto:guzhongtao@middol.com">guzhongtao</a>
 */
public class AliyunSimpleRocketMqSubscriber implements TopicSubscriber {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Alibaba cloud rocketMq consumer service
     */
    Consumer consumer;

    String beanName;

    public AliyunSimpleRocketMqSubscriber(Consumer consumer, String beanName) {
        this.consumer = consumer;
        this.beanName = beanName;
    }

    public AliyunSimpleRocketMqSubscriber() {
    }

    @Override
    public void subscribe(String topic, String tagExpression, TopicListener listener) {
        consumer.subscribe(topic, tagExpression, (message, context) -> {
            TopicMessage topicMessage = new TopicMessage();
            topicMessage.setUserProperties(message.getUserProperties());
            topicMessage.setBussinessKey(message.getKey());
            topicMessage.setMessageBody(message.getBody());
            topicMessage.setTags(message.getTag());
            topicMessage.setTopicName(message.getTopic());

            MessageStatus messageStatus = listener.subscribe(topicMessage);
            if (messageStatus.equals(MessageStatus.CommitMessage)) {
                return Action.CommitMessage;
            } else {
                return Action.ReconsumeLater;
            }
        });
    }

    @Override
    public void unsubscribe(String topicName) {
        consumer.unsubscribe(topicName);
    }

    @Override
    public boolean isStarted() {
        return consumer.isStarted();
    }

    @Override
    public boolean isClosed() {
        return consumer.isClosed();
    }

    @Override
    public void start() {
        logger.info("[MQ]AliyunSimpleRocketMqSubscriber[" + beanName + "] start...");
        consumer.start();
    }

    @Override
    public void close() {
        logger.info("[MQ]AliyunSimpleRocketMqSubscriber[" + beanName + "] close...");
        consumer.shutdown();
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }

    public String getBeanName() {
        return beanName;
    }

    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }
}

The architecture idea here is very simple. In fact, it is enough to set a shell of its own interface on the basis of alicloud or Apahce rocketmq SDK.

How to use

When users use this message component in their own projects, they can configure it according to their dependent message queue products:
If you use Alibaba cloud's rocketmq, you can configure the following in the yml file:

mq:
    aliyun:
      rocketmq:
        enable: true
        access-key-id: xxxxxx
        access-key-secret: bbbbb
        name-server-addr: http://MQ_INST_1666017288766448_BcidZfUM.mq-internet-access.mq-internet.aliyuncs.com:80
        publishers:
          - {beanName: publishService1, groupId: GID_SAAS_DEV, sendMsgTimeoutMillis: 5000}
          - {beanName: xaPublishService1, groupId: GID_XA_SAAS_DEV, sendMsgTimeoutMillis: 5000, messageType: TRANSACTION, checkImmunityTimeInSeconds: 5}
        subscribers:
          - {beanName: subscriberService1, groupId: GID_SAAS_DEV}
          - {beanName: xaSubscriberService1, groupId: GID_XA_SAAS_DEV}

If you want to use Apache's RocketMq, modify the configuration as follows:

    apache:
      rocketmq:
        enable: true
        access-key-id: xxxxx
        access-key-secret: bbbb
        name-server-addr: localhost:9876
        publishers:
          - {beanName: publishService1, groupId: GID_SAAS_DEV, sendMsgTimeout: 5000}
          - {beanName: xaPublishService1, groupId: GID_XA_SAAS_DEV, sendMsgTimeout: 5000, messageType: TRANSACTION}
        subscribers:
          - {beanName: subscriberService1, groupId: GID_SAAS_DEV}
          - {beanName: xaSubscriberService1, groupId: GID_XA_SAAS_DEV}

In java code, messages are published as follows:

package com.middol.mytest.controller;

import cn.hutool.core.util.StrUtil;
import com.middol.mytest.config.mq.localtransactionexecuter.MyXaTopicLocalTransactionExecuter;
import com.middol.starter.mq.pojo.TopicMessage;
import com.middol.starter.mq.pojo.XaTopicMessage;
import com.middol.starter.mq.service.TopicPublisher;
import com.middol.starter.mq.service.XaTopicPublisher;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * Test mq
 *
 * @author <a href="mailto:guzhongtao@middol.com">guzhongtao</a>
 */
@RestController
@RequestMapping("/api/mq")
public class MqTestController {

    /**
     * Common type message sending server ResourceName is configured in the corresponding yml file
     */
    @Lazy
    @Resource(name = "publishService1")
    private TopicPublisher topicPublisher;

    /**
     * The transaction type message sending server ResourceName is configured in the corresponding yml file
     */
    @Lazy
    @Resource(name = "xaPublishService1")
    private XaTopicPublisher xaTopicPublisher;

    /**
     * Normal message sending test
     *
     * @param message Message body
     * @return Sent successfully
     */
    @PostMapping("singlePush")
    public String singlePush(String message) {
        if (StrUtil.isEmpty(message)) {
            return "Message body cannot be empty";
        }

        TopicMessage msg = new TopicMessage();
        msg.setTopicName("SAAS-PT");
        msg.setTags("TAG1");
        msg.setBussinessKey(System.currentTimeMillis() + "");
        msg.setMessageBody(message.getBytes(StandardCharsets.UTF_8));
        topicPublisher.publish(msg);

        return "Sent successfully";
    }

    /**
     * Semi message transaction type message sending
     *
     * @param message  Message body
     * @param isCommit Whether to submit the test true submit false do not submit, rollback
     * @return Sent successfully
     */
    @PostMapping("xaPush")
    public String xaPush(String message, Boolean isCommit) {
        if (StrUtil.isEmpty(message)) {
            return "Message body cannot be empty";
        }

        XaTopicMessage msg = new XaTopicMessage();
        msg.setLocalTransactionExecuterId(MyXaTopicLocalTransactionExecuter.EXECUTER_ID);
        msg.setTopicName("XA-SAAS");
        msg.setTags("TAG1");
        msg.setBussinessKey(System.currentTimeMillis() + "");
        msg.setMessageBody(message.getBytes(StandardCharsets.UTF_8));
        xaTopicPublisher.publishInTransaction(msg, isCommit);

        return "Sent successfully";
    }

}

The message subscription is as follows:

package com.middol.mytest.config.mq.listener;

import com.middol.starter.mq.pojo.MessageStatus;
import com.middol.starter.mq.pojo.TopicMessage;
import com.middol.starter.mq.service.TopicListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * message listener 
 *
 * @author guzt
 */
@Component
public class MyMessageListenerService implements TopicListener {
    @Override
    public String getSubscriberBeanName() {
        return "subscriberService1";
    }

    @Override
    public String getTopicName() {
        return "SAAS-PT";
    }

    @Override
    public String getTagExpression() {
        return "*";
    }

    @Override
    public MessageStatus subscribe(TopicMessage topicMessage) {

        System.out.println("Consumption message message body = " + new String(topicMessage.getMessageBody(), StandardCharsets.UTF_8));

        return MessageStatus.CommitMessage;
    }
}

The above is just a brief description of the general idea. Only a little of the code is posted. Please move to github for specific source code. Let's talk about it together:

https://github.com/dwhgygzt/mq-spring-boot-starter

Tags: Java Spring Boot RocketMQ

Posted by edevil on Wed, 01 Jun 2022 09:25:18 +0530