Use the transaction provided by Spring to listen to the ApplicationEvent completion event

When developing a project some time ago, I encountered a database transaction that has not been submitted, but the sending MQ has been consumed by consumers, resulting in the problem of data synchronization.

 The specifics are like this. Generally, we will deal with database operations and other service processing in the @Service class. Generally, we will add @Transactional annotations to methods involving database additions, deletions, and changes, indicating that this method is hosted by spring to process transactions. . When the entire process is executed without exception, the transaction will be submitted. At this time, I add MQ sending before the end of the method.

this.amqpTemplate.convertAndSend(AmqpExchange.XXX_CHANGE, AmqpExchange.XXX_CHANGE + "_ROUTING", message);

At this time, the problem arises. When you send the data id, the consumer needs to use this id to query the database. However, although our code has been executed, the transaction has not been submitted. The data in the database is not There is no change, so the data obtained by the consumer at this time is the data before we have operated, and there will be problems at this time.

Then the way to solve this problem is very simple, that is to use the transaction monitoring provided by Spring:

First, you need to define a transaction listener event, which inherits from ApplicationEvent:

import org.springframework.context.ApplicationEvent;

/**
 * transaction listener event
 * @author hexm
 * @date 2020/6/9 14:15
 */
public class AlterTransactionalEvent extends ApplicationEvent {

    public AlterTransactionalEvent(Apply source) {
        super(source);
    }


    @Override
    public Apply getSource() {
        return (Apply) super.getSource();
    }

    //Note that here we have added a method interface without formal parameters and return values, so that we can use lambda expressions directly in the future.
    @FunctionalInterface
    public interface Apply {

        /**
         * implement
         */
        void apply();
    }
}

Then we go on to create a listener class to listen for this event:

import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

/**
 * transaction monitoring
 *
 * @author hexm
 * @date 2020/6/9 14:18
 */
@Component
public class AlterTransactionListener {

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onHandler(AlterTransactionalEvent event) {
        AlterTransactionalEvent.Apply apply = event.getSource();
        if (apply != null) {
            // Because we pass in a lambda expression, we can call the execution process as if we were using an anonymous function
            apply.apply();
        }
    }
}
phase = TransactionPhase.AFTER_COMMIT indicates that after the event is triggered, the submission has been successfully completed. This is also the default item, and it is okay to not add it.
In addition, there are BEFORE_COMMIT before submission, AFTER_ROLLBACK rollback, and AFTER_COMPLETION whether it is successfully submitted or not.

After defining the listener, you can use it happily:

@Service
public class Test{

    @Autowired
    private AmqpTemplate amqpTemplate;
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @Transactional(rollbackFor = Exception.class)
    public void deleteById(Long id){
        // perform database operations
        ...
        //Send transaction monitoring events, take full advantage of jdk8's lambda expressions, and send processing tasks together
        applicationEventPublisher.publishEvent(new AlterTransactionalEvent(() -> {
            // Trigger transaction completion event, execute send MQ
            Message msg = new Message (id); //This Message is custom and can actually be of any type
            this.amqpTemplate.convertAndSend(AmqpExchange.XXX_CHANGE, AmqpExchange.XXX_CHANGE + "_ROUTING", msg);
        }));
    }
}

This completes the process of event monitoring. It should be noted that the ApplicationEventPublisher class is injected here to push events.

Tags: Java Spring

Posted by ShaileshD on Wed, 01 Jun 2022 04:06:52 +0530