Background
The framework already supports dynamic switching between multiple data sources, together with transaction handling for them. The next goal was a simple way to run transactions that span several databases. The common options are XA transactions, Saga transactions, and TCC transactions. Because the business mainly serves government and enterprise customers and concurrency is low, XA transactions were chosen: they cost some performance, but they guarantee strong data consistency.
Conceptual design
Each registered data source gets a second copy that is used only for XA transactions, so local transactions and global XA transactions can be used independently of each other.
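The pairing can be pictured with a minimal sketch in plain Java. It assumes each connection has a base ID and that the XA copy is stored under the same ID with a "-xa" suffix. The class PairedDataSourceRegistry and its methods are hypothetical names for illustration, and the type parameter stands in for a real DataSource.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the framework: keeps two targets per connection ID.
final class PairedDataSourceRegistry<T> {
    static final String XA_SUFFIX = "-xa";

    private final Map<String, T> targets = new HashMap<>();

    // Registers the normal copy under "id" and the XA copy under "id-xa".
    void register(String id, T localTarget, T xaTarget) {
        targets.put(id, localTarget);
        targets.put(id + XA_SUFFIX, xaTarget);
    }

    // Returns the XA copy inside a global transaction, the normal copy otherwise.
    T resolve(String id, boolean inGlobalTransaction) {
        String key = inGlobalTransaction ? id + XA_SUFFIX : id;
        T target = targets.get(key);
        if (target == null) {
            throw new IllegalStateException("No data source registered under key: " + key);
        }
        return target;
    }
}
```

Because both copies live in the same map, the caller only has to know the base ID and whether a global transaction is active. The rest of the lookup stays in one place.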
Maven configuration
Add the Atomikos JTA starter as a dependency:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
Register XA data source
To keep using the Druid connection pool, create the data source as a DruidXADataSource and wrap it in an AtomikosDataSourceBean.
Register each connection twice: once as a normal data source and once as a data source for XA transactions. The two copies share a base ID, and the XA copy is distinguished by a "-xa" suffix so the two stay associated.
Once Spring registers the XA transaction manager as the default, transactional operations no longer go through local transactions. Whether a call uses a local transaction or an XA transaction is therefore decided by which data source is selected.
// Master data source, XA mode
@Bean
@Qualifier("masterXADataSource")
public DataSource masterXADataSource() {
    DruidXADataSource datasource = new DruidXADataSource();
    // Note: appending with '&' assumes dbUrl already contains a query string
    if (driverClassName.equals("com.mysql.cj.jdbc.Driver")) {
        if (!dbUrl.contains("useOldAliasMetadataBehavior")) {
            dbUrl += "&useOldAliasMetadataBehavior=true";
        }
        if (!dbUrl.contains("useAffectedRows")) {
            dbUrl += "&useAffectedRows=true";
        }
    }
    datasource.setUrl(this.dbUrl);
    datasource.setUsername(username);
    datasource.setPassword(password);
    datasource.setDriverClassName(driverClassName);
    // Druid pool configuration
    datasource.setInitialSize(1);
    datasource.setMinIdle(3);
    datasource.setMaxActive(20);
    datasource.setMaxWait(60000);
    datasource.setTimeBetweenEvictionRunsMillis(60000);
    datasource.setMinEvictableIdleTimeMillis(60000);
    datasource.setValidationQuery("select 'x'");
    datasource.setTestWhileIdle(true);
    datasource.setTestOnBorrow(false);
    datasource.setTestOnReturn(false);
    datasource.setPoolPreparedStatements(true);
    datasource.setMaxPoolPreparedStatementPerConnectionSize(20);
    datasource.setLogAbandoned(false); // Whether to log when a leaked connection is removed
    try {
        datasource.setFilters("stat,slf4j");
    } catch (SQLException e) {
        logger.error("druid configuration initialization filter", e);
    }
    datasource.setConnectionProperties("druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000");

    // Wrap the Druid XA data source so Atomikos can enlist it in global transactions
    AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
    atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
    atomikosDataSourceBean.setUniqueResourceName("master-xa");
    atomikosDataSourceBean.setXaDataSource(datasource);
    atomikosDataSourceBean.setPoolSize(5);
    atomikosDataSourceBean.setMaxPoolSize(20);
    atomikosDataSourceBean.setTestQuery("select 1");
    return atomikosDataSourceBean;
}
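The URL handling above appends parameters with "&", which only produces a valid URL when dbUrl already contains a query string. A small helper can pick the separator instead. This is a sketch under that assumption; JdbcUrlParams and ensureParam are hypothetical names, not part of Druid or the MySQL driver.

```java
// Hypothetical helper for appending a query parameter to a JDBC URL.
final class JdbcUrlParams {
    private JdbcUrlParams() {
    }

    // Appends name=value unless the URL already sets that parameter.
    static String ensureParam(String url, String name, String value) {
        if (url.contains(name + "=")) {
            return url; // already present, leave the existing value alone
        }
        String separator;
        if (!url.contains("?")) {
            separator = "?"; // no query string yet
        } else if (url.endsWith("?") || url.endsWith("&")) {
            separator = ""; // query string is open, nothing to add
        } else {
            separator = "&"; // extend an existing query string
        }
        return url + separator + name + "=" + value;
    }
}
```

With such a helper, the two MySQL-specific checks could become two calls, for example ensureParam(dbUrl, "useAffectedRows", "true"), and the result would be well-formed whether or not the configured URL carries parameters.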
Register XA transaction manager
Use Spring's built-in JtaTransactionManager and set allowCustomIsolationLevels to true. Otherwise, specifying a custom transaction isolation level results in an error.
// XA mode global transaction manager
@Bean(name = "jtaTransactionManager")
public PlatformTransactionManager transactionManager() throws Throwable {
    UserTransactionManager userTransactionManager = new UserTransactionManager();
    UserTransaction userTransaction = new UserTransactionImp();
    JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransaction, userTransactionManager);
    jtaTransactionManager.setAllowCustomIsolationLevels(true);
    return jtaTransactionManager;
}
Define the XA transaction aspect
Define a custom annotation, @GlobalTransactional, and a matching aspect. When a method carries the annotation, the aspect sets a flag in a ThreadLocal variable. The annotation is also meta-annotated with @Transactional, which names the XA transaction manager. When the data source is switched, the flag shows whether the current call is inside an XA transaction, so the matching data source can be selected.
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Transactional(rollbackFor = Exception.class, isolation = Isolation.READ_UNCOMMITTED, transactionManager = "jtaTransactionManager")
public @interface GlobalTransactional {
}
@Aspect
@Component
@Order(value = 99)
public class GlobalTransitionAspect {
    private static Logger logger = LoggerFactory.getLogger(GlobalTransitionAspect.class);

    @Autowired
    private DynamicDataSource dynamicDataSource;

    /**
     * Pointcut: methods or classes annotated with @GlobalTransactional
     */
    @Pointcut("@annotation(com.code2roc.fastkernel.datasource.GlobalTransactional) "
            + "|| @within(com.code2roc.fastkernel.datasource.GlobalTransactional)")
    public void dataSourcePointCut() {
    }

    /**
     * Around advice for the dataSourcePointCut pointcut
     */
    @Around("dataSourcePointCut()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        GlobalTransactional methodAnnotation = method.getAnnotation(GlobalTransactional.class);
        if (methodAnnotation != null) {
            // Mark the current thread as running inside a global transaction
            DataSourceContextHolder.tagGlobal();
            logger.info("Mark global transactions");
        }
        try {
            return point.proceed();
        } finally {
            // Always clear the flag so it does not leak to later work on this thread
            logger.info("Clear global transactions");
            DataSourceContextHolder.clearGlobal();
        }
    }
}
public class DataSourceContextHolder {
    private static Logger log = LoggerFactory.getLogger(DataSourceContextHolder.class);

    // Per-thread state, so each thread sees only its own values
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>();
    private static final ThreadLocal<String> contextGlobalHolder = new ThreadLocal<String>();

    // Call this method to switch the data source
    public static void setDataSource(String dataSource) {
        contextHolder.set(dataSource);
        log.debug("Switched to data source:{}", dataSource);
    }

    // Get the current data source key; falls back to "master" and
    // appends "-xa" when the global transaction flag is set
    public static String getDataSource() {
        String value = contextHolder.get();
        if (StringUtil.isEmpty(value)) {
            value = "master";
        }
        if (!StringUtil.isEmpty(getGlobal())) {
            value = value + "-xa";
        }
        return value;
    }

    // Clear the selected data source (back to the default)
    public static void clearDataSource() {
        contextHolder.remove();
        log.debug("Switched to primary data source");
    }

    //==================== Global transaction flag ================
    public static void tagGlobal() {
        contextGlobalHolder.set("1");
    }

    public static String getGlobal() {
        String value = contextGlobalHolder.get();
        return value;
    }

    public static void clearGlobal() {
        contextGlobalHolder.remove();
    }
    //===================================================
}
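One thing to watch with a ThreadLocal flag is nesting. If one annotated method calls another through the Spring proxy, the inner call's cleanup removes the flag while the outer call is still running, and later lookups in the outer call resolve to the non-XA key. The plain-Java sketch below restores the previous value instead of always removing it. GlobalFlagScope and its methods are hypothetical names for illustration, not the framework's code.

```java
import java.util.function.Supplier;

// Hypothetical illustration of a nesting-safe global transaction flag.
final class GlobalFlagScope {
    private static final ThreadLocal<Boolean> GLOBAL = new ThreadLocal<>();

    private GlobalFlagScope() {
    }

    static boolean isGlobal() {
        return Boolean.TRUE.equals(GLOBAL.get());
    }

    // Same rule as the holder above: append "-xa" while the flag is set.
    static String lookupKey(String base) {
        return isGlobal() ? base + "-xa" : base;
    }

    // Runs body with the flag set, then restores whatever was there before.
    static <T> T runGlobal(Supplier<T> body) {
        Boolean previous = GLOBAL.get();
        GLOBAL.set(Boolean.TRUE);
        try {
            return body.get();
        } finally {
            if (previous == null) {
                GLOBAL.remove(); // outermost scope: leave the thread clean
            } else {
                GLOBAL.set(previous); // nested scope: keep the outer flag
            }
        }
    }
}
```

The same save-and-restore pattern could be applied inside the aspect's around method. Whether it is needed depends on whether annotated methods ever call each other across beans.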
Configuring XA transaction logs
Create a transactions.properties file to specify where the XA transaction log is stored:
com.atomikos.icatch.log_base_dir= tempfiles/transition/