Some time ago, while working on a project, I ran into a data synchronization problem: an MQ message sent from inside a database transaction was consumed before that transaction had been committed.
Here are the details. We usually handle database operations and other business logic in an @Service class, and we add the @Transactional annotation to methods that insert, delete, or update data, which tells Spring to manage the transaction for that method. If the whole method runs without an exception, the transaction is committed. In my case, I sent the MQ message just before the end of the method:
this.amqpTemplate.convertAndSend(AmqpExchange.XXX_CHANGE, AmqpExchange.XXX_CHANGE + "_ROUTING", message);
This is where the problem arises. The message carries the id of the data, and the consumer uses that id to query the database. But although our code has finished running, the transaction has not yet been committed, so the data in the database has not changed. The consumer therefore reads the data as it was before our operation, and things go wrong.
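To make the timing concrete, here is a small plain-Java simulation with no Spring or MQ involved. Everything in it (the StaleReadDemo class, the two maps, the method names) is invented for illustration: one map stands in for committed rows, the other for the uncommitted writes of the open transaction, and the "consumer" can only see committed rows.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a database with a single open transaction.
class StaleReadDemo {
    private final Map<Long, String> committed = new HashMap<>();
    private final Map<Long, String> pending = new HashMap<>();

    StaleReadDemo() {
        committed.put(1L, "old");
    }

    // A write inside the transaction stays invisible to other sessions until commit.
    void updateInTransaction(Long id, String value) {
        pending.put(id, value);
    }

    void commit() {
        committed.putAll(pending);
        pending.clear();
    }

    // What a consumer in another session sees: committed rows only.
    String consumerRead(Long id) {
        return committed.get(id);
    }

    // Message "sent" before the commit: the consumer reacts first and reads stale data.
    static String sendBeforeCommit() {
        StaleReadDemo db = new StaleReadDemo();
        db.updateInTransaction(1L, "new");
        String seen = db.consumerRead(1L); // consumer handles the message immediately
        db.commit();
        return seen;
    }

    // Message "sent" after the commit: the consumer sees the updated row.
    static String sendAfterCommit() {
        StaleReadDemo db = new StaleReadDemo();
        db.updateInTransaction(1L, "new");
        db.commit();
        return db.consumerRead(1L);
    }

    public static void main(String[] args) {
        System.out.println(sendBeforeCommit());
        System.out.println(sendAfterCommit());
    }
}
```

In a real system the consumer does not always win the race, which is what makes this kind of bug intermittent and hard to reproduce.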
The solution is simple: use the transactional event listener support provided by Spring.
First, define an event class for the listener, which extends ApplicationEvent:
import org.springframework.context.ApplicationEvent;

/**
 * Transaction listener event.
 *
 * @author hexm
 * @date 2020/6/9 14:15
 */
public class AlterTransactionalEvent extends ApplicationEvent {

    public AlterTransactionalEvent(Apply source) {
        super(source);
    }

    @Override
    public Apply getSource() {
        return (Apply) super.getSource();
    }

    // Note: Apply is a functional interface with no parameters and no return value,
    // so a lambda expression can be passed in directly.
    @FunctionalInterface
    public interface Apply {
        /**
         * The task to run once the transaction phase is reached.
         */
        void apply();
    }
}
Next, create a listener class that listens for this event:
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

/**
 * Transaction listener.
 *
 * @author hexm
 * @date 2020/6/9 14:18
 */
@Component
public class AlterTransactionListener {

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onHandler(AlterTransactionalEvent event) {
        AlterTransactionalEvent.Apply apply = event.getSource();
        if (apply != null) {
            // The source is the lambda that was passed in, so calling apply()
            // runs it just like an anonymous function.
            apply.apply();
        }
    }
}
phase = TransactionPhase.AFTER_COMMIT means the listener is invoked after the transaction has been committed successfully. This is the default, so it can be omitted. The other phases are BEFORE_COMMIT (before the commit), AFTER_ROLLBACK (after a rollback), and AFTER_COMPLETION (after the transaction completes, whether it was committed or rolled back).
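The following plain-Java sketch shows which of these phases fire on a commit and which fire on a rollback. It is only a miniature written for this article, not Spring's implementation: the PhaseDemo class and its methods are hypothetical, and the relative order in which AFTER_COMMIT and AFTER_COMPLETION fire is a simplification chosen for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of phase-based callbacks; this is not Spring's implementation.
class PhaseDemo {
    enum Phase { BEFORE_COMMIT, AFTER_COMMIT, AFTER_ROLLBACK, AFTER_COMPLETION }

    // One registered listener: the phase it waits for and the work to run.
    private static final class Listener {
        final Phase phase;
        final Runnable task;

        Listener(Phase phase, Runnable task) {
            this.phase = phase;
            this.task = task;
        }
    }

    private final List<Listener> listeners = new ArrayList<>();

    void register(Phase phase, Runnable task) {
        listeners.add(new Listener(phase, task));
    }

    private void fire(Phase phase) {
        for (Listener listener : listeners) {
            if (listener.phase == phase) {
                listener.task.run();
            }
        }
    }

    void commit() {
        fire(Phase.BEFORE_COMMIT);
        // A real transaction manager would commit to the database here.
        fire(Phase.AFTER_COMMIT);
        fire(Phase.AFTER_COMPLETION);
    }

    void rollback() {
        fire(Phase.AFTER_ROLLBACK);
        fire(Phase.AFTER_COMPLETION);
    }

    // Registers one listener per phase and returns the names of the phases that fired.
    static List<String> firedPhases(boolean success) {
        List<String> log = new ArrayList<>();
        PhaseDemo tx = new PhaseDemo();
        for (Phase p : Phase.values()) {
            tx.register(p, () -> log.add(p.name()));
        }
        if (success) {
            tx.commit();
        } else {
            tx.rollback();
        }
        return log;
    }
}
```

For the MQ case, AFTER_COMMIT is the phase that matters: a listener bound to it never runs when the transaction is rolled back, so no message is sent for data that was never written.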
With the listener defined, you can put it to use:
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class Test {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @Transactional(rollbackFor = Exception.class)
    public void deleteById(Long id) {
        // perform database operations ...

        // Publish the transaction event. Thanks to JDK 8 lambda expressions,
        // the task to run is sent along with the event.
        applicationEventPublisher.publishEvent(new AlterTransactionalEvent(() -> {
            // Runs once the transaction has been committed: send the MQ message.
            // This Message is a custom class; it can be of any type.
            Message msg = new Message(id);
            this.amqpTemplate.convertAndSend(AmqpExchange.XXX_CHANGE, AmqpExchange.XXX_CHANGE + "_ROUTING", msg);
        }));
    }
}
That completes the event listening flow. Note that ApplicationEventPublisher is injected here and used to publish the event.
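One behaviour worth keeping in mind: per the Spring documentation, a @TransactionalEventListener is only invoked for events published inside an active transaction. An event published with no transaction running is discarded unless fallbackExecution = true is set on the annotation. The plain-Java sketch below imitates that rule together with the deferral itself. DeferredPublisherDemo and its methods are hypothetical names made up for this illustration, not Spring classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of an after-commit publisher; this is not Spring's code.
class DeferredPublisherDemo {
    private final boolean fallbackExecution;
    private final List<Runnable> queued = new ArrayList<>();
    private boolean transactionActive = false;

    DeferredPublisherDemo(boolean fallbackExecution) {
        this.fallbackExecution = fallbackExecution;
    }

    void begin() {
        transactionActive = true;
    }

    void publish(Runnable task) {
        if (transactionActive) {
            queued.add(task);   // deferred until the commit
        } else if (fallbackExecution) {
            task.run();         // no transaction: run right away
        }
        // Otherwise the event is silently dropped.
    }

    void commit() {
        transactionActive = false;
        List<Runnable> toRun = new ArrayList<>(queued);
        queued.clear();
        toRun.forEach(Runnable::run);
    }

    void rollback() {
        transactionActive = false;
        queued.clear();         // tasks for a rolled-back transaction never run
    }

    // Counts how many "messages" get sent in a given scenario.
    static int sentCount(boolean inTransaction, boolean commit, boolean fallbackExecution) {
        List<Long> sent = new ArrayList<>();
        DeferredPublisherDemo publisher = new DeferredPublisherDemo(fallbackExecution);
        if (inTransaction) {
            publisher.begin();
        }
        publisher.publish(() -> sent.add(42L)); // stands in for convertAndSend
        if (inTransaction) {
            if (commit) {
                publisher.commit();
            } else {
                publisher.rollback();
            }
        }
        return sent.size();
    }
}
```

So if a method like deleteById is ever called without a surrounding transaction, the MQ message would not be sent at all under the default settings, which is worth checking when a message seems to go missing.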