首先,到底啥是分布式事務呢,比如我們在執行一個業務邏輯的時候有兩步分別操作A數據源和B數據源,當我們在A數據源執行數據更改后,在B數據源執行時出現運行時異常,那么我們必須要讓B數據源的操作回滾,并回滾對A數據源的操作;這種情況在支付業務時常常出現;比如買票業務在最后支付失敗,那之前的操作必須全部回滾,如果之前的操作分布在多個數據源中,那么這就是典型的分布式事務回滾;
了解了什么是分布式事務,那分布式事務在java的解決方案就是JTA(即Java Transaction API);springboot官方提供了 Atomikos or Bitronix的解決思路;
其實,大多數情況下很多公司是使用消息隊列的方式實現分布式事務。
本篇文章重點講解springboot環境下,整合 Atomikos +mysql+mybatis+tomcat/jetty;
一、項目依賴
pom.xml中添加atomikos的springboot相關依賴:
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-jta-atomikosartifactId>
dependency>
點進去會發現里面整合好了:transactions-jms
、transactions-jta
、transactions-jdbc
、javax.transaction-api
二、把數據源的相關配置項單獨提煉到一個application.yml中:
注意:
-
這回我們的
spring.datasource.type
是com.alibaba.druid.pool.xa.DruidXADataSource;
-
spring.jta.transaction-manager-id
的值在你的電腦中是唯一的,這個詳細請閱讀官方文檔;
完整的yml文件如下:
spring:
datasource:
type:com.alibaba.druid.pool.xa.DruidXADataSource
druid:
systemDB:
name:systemDB
url:jdbc//localhost:3306/springboot-mybatis?useUnicode=true&characterEncoding=utf-8
username:root
password:root
#下面為連接池的補充設置,應用到上面所有數據源中
#初始化大小,最小,最大
initialSize:5
minIdle:5
maxActive:20
#配置獲取連接等待超時的時間
maxWait:60000
#配置間隔多久才進行一次檢測,檢測需要關閉的空閑連接,單位是毫秒
timeBetweenEvictionRunsMillis:60000
#配置一個連接在池中最小生存的時間,單位是毫秒
minEvictableIdleTimeMillis:30
validationQuery:SELECT1
validationQueryTimeout:10000
testWhileIdle:true
testOnBorrow:false
testOnReturn:false
#打開PSCache,并且指定每個連接上PSCache的大小
poolPreparedStatements:true
maxPoolPreparedStatementPerConnectionSize:20
filters:stat,wall
#通過connectProperties屬性來打開mergeSql功能;慢SQL記錄
connectionProperties:druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
#合并多個DruidDataSource的監控數據
useGlobalDataSourceStat:true
businessDB:
name:businessDB
url:jdbc//localhost:3306/springboot-mybatis2?useUnicode=true&characterEncoding=utf-8
username:root
password:root
#下面為連接池的補充設置,應用到上面所有數據源中
#初始化大小,最小,最大
initialSize:5
minIdle:5
maxActive:20
#配置獲取連接等待超時的時間
maxWait:60000
#配置間隔多久才進行一次檢測,檢測需要關閉的空閑連接,單位是毫秒
timeBetweenEvictionRunsMillis:60000
#配置一個連接在池中最小生存的時間,單位是毫秒
minEvictableIdleTimeMillis:30
validationQuery:SELECT1
validationQueryTimeout:10000
testWhileIdle:true
testOnBorrow:false
testOnReturn:false
#打開PSCache,并且指定每個連接上PSCache的大小
poolPreparedStatements:true
maxPoolPreparedStatementPerConnectionSize:20
filters:stat,wall
#通過connectProperties屬性來打開mergeSql功能;慢SQL記錄
connectionProperties:druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
#合并多個DruidDataSource的監控數據
useGlobalDataSourceStat:true
#jta相關參數配置
jta:
log-dir:classpath:tx-logs
transaction-manager-id:txManager
三、在DruidConfig.java中實現多個數據源的注冊;分布式事務管理器的注冊;druid的注冊;
packagecom.zjt.config;
importcom.alibaba.druid.filter.stat.StatFilter;
importcom.alibaba.druid.support.http.StatViewServlet;
importcom.alibaba.druid.support.http.WebStatFilter;
importcom.alibaba.druid.wall.WallConfig;
importcom.alibaba.druid.wall.WallFilter;
importcom.atomikos.icatch.jta.UserTransactionImp;
importcom.atomikos.icatch.jta.UserTransactionManager;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
importorg.springframework.boot.web.servlet.FilterRegistrationBean;
importorg.springframework.boot.web.servlet.ServletRegistrationBean;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.context.annotation.Primary;
importorg.springframework.core.env.Environment;
importorg.springframework.transaction.jta.JtaTransactionManager;
importjavax.sql.DataSource;
importjavax.transaction.UserTransaction;
importjava.util.Properties;
/**
*Druid配置
*
*
*/
@Configuration
publicclassDruidConfig{
@Bean(name="systemDataSource")
@Primary
@Autowired
publicDataSourcesystemDataSource(Environmentenv){
AtomikosDataSourceBeands=newAtomikosDataSourceBean();
Propertiesprop=build(env,"spring.datasource.druid.systemDB.");
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
ds.setUniqueResourceName("systemDB");
ds.setPoolSize(5);
ds.setXaProperties(prop);
returnds;
}
@Autowired
@Bean(name="businessDataSource")
publicAtomikosDataSourceBeanbusinessDataSource(Environmentenv){
AtomikosDataSourceBeands=newAtomikosDataSourceBean();
Propertiesprop=build(env,"spring.datasource.druid.businessDB.");
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
ds.setUniqueResourceName("businessDB");
ds.setPoolSize(5);
ds.setXaProperties(prop);
returnds;
}
/**
*注入事物管理器
*@return
*/
@Bean(name="xatx")
publicJtaTransactionManagerregTransactionManager(){
UserTransactionManageruserTransactionManager=newUserTransactionManager();
UserTransactionuserTransaction=newUserTransactionImp();
returnnewJtaTransactionManager(userTransaction,userTransactionManager);
}
privatePropertiesbuild(Environmentenv,Stringprefix){
Propertiesprop=newProperties();
prop.put("url",env.getProperty(prefix+"url"));
prop.put("username",env.getProperty(prefix+"username"));
prop.put("password",env.getProperty(prefix+"password"));
prop.put("driverClassName",env.getProperty(prefix+"driverClassName",""));
prop.put("initialSize",env.getProperty(prefix+"initialSize",Integer.class));
prop.put("maxActive",env.getProperty(prefix+"maxActive",Integer.class));
prop.put("minIdle",env.getProperty(prefix+"minIdle",Integer.class));
prop.put("maxWait",env.getProperty(prefix+"maxWait",Integer.class));
prop.put("poolPreparedStatements",env.getProperty(prefix+"poolPreparedStatements",Boolean.class));
prop.put("maxPoolPreparedStatementPerConnectionSize",
env.getProperty(prefix+"maxPoolPreparedStatementPerConnectionSize",Integer.class));
prop.put("maxPoolPreparedStatementPerConnectionSize",
env.getProperty(prefix+"maxPoolPreparedStatementPerConnectionSize",Integer.class));
prop.put("validationQuery",env.getProperty(prefix+"validationQuery"));
prop.put("validationQueryTimeout",env.getProperty(prefix+"validationQueryTimeout",Integer.class));
prop.put("testOnBorrow",env.getProperty(prefix+"testOnBorrow",Boolean.class));
prop.put("testOnReturn",env.getProperty(prefix+"testOnReturn",Boolean.class));
prop.put("testWhileIdle",env.getProperty(prefix+"testWhileIdle",Boolean.class));
prop.put("timeBetweenEvictionRunsMillis",
env.getProperty(prefix+"timeBetweenEvictionRunsMillis",Integer.class));
prop.put("minEvictableIdleTimeMillis",env.getProperty(prefix+"minEvictableIdleTimeMillis",Integer.class));
prop.put("filters",env.getProperty(prefix+"filters"));
returnprop;
}
@Bean
publicServletRegistrationBeandruidServlet(){
ServletRegistrationBeanservletRegistrationBean=newServletRegistrationBean(newStatViewServlet(),"/druid/*");
//控制臺管理用戶,加入下面2行進入druid后臺就需要登錄
//servletRegistrationBean.addInitParameter("loginUsername","admin");
//servletRegistrationBean.addInitParameter("loginPassword","admin");
returnservletRegistrationBean;
}
@Bean
publicFilterRegistrationBeanfilterRegistrationBean(){
FilterRegistrationBeanfilterRegistrationBean=newFilterRegistrationBean();
filterRegistrationBean.setFilter(newWebStatFilter());
filterRegistrationBean.addUrlPatterns("/*");
filterRegistrationBean.addInitParameter("exclusions","*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");
filterRegistrationBean.addInitParameter("profileEnable","true");
returnfilterRegistrationBean;
}
@Bean
publicStatFilterstatFilter(){
StatFilterstatFilter=newStatFilter();
statFilter.setLogSlowSql(true);//slowSqlMillis用來配置SQL慢的標準,執行時間超過slowSqlMillis的就是慢。
statFilter.setMergeSql(true);//SQL合并配置
statFilter.setSlowSqlMillis(1000);//slowSqlMillis的缺省值為3000,也就是3秒。
returnstatFilter;
}
@Bean
publicWallFilterwallFilter(){
WallFilterwallFilter=newWallFilter();
//允許執行多條SQL
WallConfigconfig=newWallConfig();
config.setMultiStatementAllow(true);
wallFilter.setConfig(config);
returnwallFilter;
}
}
四、分別配置每個數據源對應的sqlSessionFactory,以及MapperScan掃描的包:
MybatisDatasourceConfig.java
packagecom.zjt.config;
importcom.zjt.util.MyMapper;
importorg.apache.ibatis.session.SqlSessionFactory;
importorg.mybatis.spring.SqlSessionFactoryBean;
importorg.mybatis.spring.SqlSessionTemplate;
importorg.mybatis.spring.annotation.MapperScan;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.beans.factory.annotation.Qualifier;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.core.io.support.PathMatchingResourcePatternResolver;
importorg.springframework.core.io.support.ResourcePatternResolver;
importjavax.sql.DataSource;
/**
*
*@description
*/
@Configuration
//精確到mapper目錄,以便跟其他數據源隔離
@MapperScan(basePackages="com.zjt.mapper",markerInterface=MyMapper.class,sqlSessionFactoryRef="sqlSessionFactory")
publicclassMybatisDatasourceConfig{
@Autowired
@Qualifier("systemDataSource")
privateDataSourceds;
@Bean
publicSqlSessionFactorysqlSessionFactory()throwsException{
SqlSessionFactoryBeanfactoryBean=newSqlSessionFactoryBean();
factoryBean.setDataSource(ds);
//指定mapperxml目錄
ResourcePatternResolverresolver=newPathMatchingResourcePatternResolver();
factoryBean.setMapperLocations(resolver.getResources("classpath:mapper/*.xml"));
returnfactoryBean.getObject();
}
@Bean
publicSqlSessionTemplatesqlSessionTemplate()throwsException{
SqlSessionTemplatetemplate=newSqlSessionTemplate(sqlSessionFactory());//使用上面配置的Factory
returntemplate;
}
//關于事務管理器,不管是JPA還是JDBC等都實現自接口PlatformTransactionManager
//如果你添加的是 spring-boot-starter-jdbc 依賴,框架會默認注入 DataSourceTransactionManager 實例。
//在Spring容器中,我們手工注解@Bean 將被優先加載,框架不會重新實例化其他的 PlatformTransactionManager 實現類。
/*@Bean(name="transactionManager")
@Primary
publicDataSourceTransactionManagermasterTransactionManager(){
//MyBatis自動參與到spring事務管理中,無需額外配置,只要org.mybatis.spring.SqlSessionFactoryBean引用的數據源
//與DataSourceTransactionManager引用的數據源一致即可,否則事務管理會不起作用。
returnnewDataSourceTransactionManager(ds);
}*/
}
MybatisDatasource2Config.java
packagecom.zjt.config;
importcom.zjt.util.MyMapper;
importorg.apache.ibatis.session.SqlSessionFactory;
importorg.mybatis.spring.SqlSessionFactoryBean;
importorg.mybatis.spring.SqlSessionTemplate;
importorg.mybatis.spring.annotation.MapperScan;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.beans.factory.annotation.Qualifier;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.core.io.support.PathMatchingResourcePatternResolver;
importorg.springframework.core.io.support.ResourcePatternResolver;
importjavax.sql.DataSource;
/**
*
*@description
*/
@Configuration
//精確到mapper目錄,以便跟其他數據源隔離
@MapperScan(basePackages="com.zjt.mapper2",markerInterface=MyMapper.class,sqlSessionFactoryRef="sqlSessionFactory2")
publicclassMybatisDatasource2Config{
@Autowired
@Qualifier("businessDataSource")
privateDataSourceds;
@Bean
publicSqlSessionFactorysqlSessionFactory2()throwsException{
SqlSessionFactoryBeanfactoryBean=newSqlSessionFactoryBean();
factoryBean.setDataSource(ds);
//指定mapperxml目錄
ResourcePatternResolverresolver=newPathMatchingResourcePatternResolver();
factoryBean.setMapperLocations(resolver.getResources("classpath:mapper2/*.xml"));
returnfactoryBean.getObject();
}
@Bean
publicSqlSessionTemplatesqlSessionTemplate2()throwsException{
SqlSessionTemplatetemplate=newSqlSessionTemplate(sqlSessionFactory2());//使用上面配置的Factory
returntemplate;
}
//關于事務管理器,不管是JPA還是JDBC等都實現自接口PlatformTransactionManager
//如果你添加的是 spring-boot-starter-jdbc 依賴,框架會默認注入 DataSourceTransactionManager 實例。
//在Spring容器中,我們手工注解@Bean 將被優先加載,框架不會重新實例化其他的 PlatformTransactionManager 實現類。
/*@Bean(name="transactionManager2")
@Primary
publicDataSourceTransactionManagermasterTransactionManager(){
//MyBatis自動參與到spring事務管理中,無需額外配置,只要org.mybatis.spring.SqlSessionFactoryBean引用的數據源
//與DataSourceTransactionManager引用的數據源一致即可,否則事務管理會不起作用。
returnnewDataSourceTransactionManager(ds);
}*/
}
由于我們本例中只使用一個事務管理器:xatx,故就不在使用TxAdviceInterceptor.java
和TxAdvice2Interceptor.java
中配置的事務管理器了;有需求的童鞋可以自己配置其他的事務管理器;(見DruidConfig.java中查看)
五、新建分布式業務測試接口JtaTestService.java和實現類JtaTestServiceImpl.java
其實就是一個很簡單的test01()方法,在該方法中我們分別先后調用classService.saveOrUpdateTClass(tClass);
和teacherService.saveOrUpdateTeacher(teacher);
實現先后操作兩個數據源:然后我們可以自己debug跟蹤事務的提交時機,此外,也可以在在兩個方法全執行結束之后,手動制造一個運行時異常,來檢查分布式事務是否全部回滾;
注意:
在實現類的方法中我使用的是:
@Transactional(transactionManager="xatx",propagation=Propagation.REQUIRED,rollbackFor={java.lang.RuntimeException.class})
從而指定了使用哪個事務管理器,事務隔離級別(一般都用我這個默認的),回滾的條件(一般可以使用Exception),這三個可以自己根據業務實際修改;
packagecom.zjt.service3;
importjava.util.Map;
publicinterfaceJtaTestService{
publicMaptest01() ;
}
packagecom.zjt.service3.impl;
importcom.zjt.entity.TClass;
importcom.zjt.entity.Teacher;
importcom.zjt.service.TClassService;
importcom.zjt.service2.TeacherService;
importcom.zjt.service3.JtaTestService;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.beans.factory.annotation.Qualifier;
importorg.springframework.stereotype.Service;
importorg.springframework.transaction.annotation.Propagation;
importorg.springframework.transaction.annotation.Transactional;
importjava.util.LinkedHashMap;
importjava.util.Map;
@Service("jtaTestServiceImpl")
publicclassJtaTestServiceImplimplementsJtaTestService{
@Autowired
@Qualifier("teacherServiceImpl")
privateTeacherServiceteacherService;
@Autowired
@Qualifier("tclassServiceImpl")
privateTClassServicetclassService;
@Override
@Transactional(transactionManager="xatx",propagation=Propagation.REQUIRED,rollbackFor={java.lang.RuntimeException.class})
publicMap<String,Object>test01(){
LinkedHashMapresultMap=newLinkedHashMap();
TClasstClass=newTClass();
tClass.setName("8888");
tclassService.saveOrUpdateTClass(tClass);
Teacherteacher=newTeacher();
teacher.setName("8888");
teacherService.saveOrUpdateTeacher(teacher);
System.out.println(1/0);
resultMap.put("state","success");
resultMap.put("message","分布式事務同步成功");
returnresultMap;
}
}
六、建立JtaTestContoller.java,接受一個來自前端的http請求,觸發JtaTestService 的test01方法:
packagecom.zjt.web;
importcom.zjt.service3.JtaTestService;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.beans.factory.annotation.Qualifier;
importorg.springframework.stereotype.Controller;
importorg.springframework.web.bind.annotation.RequestMapping;
importorg.springframework.web.bind.annotation.ResponseBody;
importjava.util.LinkedHashMap;
importjava.util.Map;
@Controller
@RequestMapping("/jtaTest")
publicclassJtaTestContoller{
@Autowired
@Qualifier("jtaTestServiceImpl")
privateJtaTestServicetaTestService;
@ResponseBody
@RequestMapping("/test01")
publicMaptest01() {
LinkedHashMapresultMap=newLinkedHashMap();
try{
returntaTestService.test01();
}catch(Exceptione){
resultMap.put("state","fail");
resultMap.put("message","分布式事務同步失敗");
returnresultMap;
}
}
}
七、在test.ftl中增加一個按鈕來測試;
//分布式事務測試
$("#JTATest").click(function(){
$.ajax({
type: "POST",
url: "${basePath!}/jtaTest/test01",
data: {} ,
async: false,
error: function (request) {
layer.alert("與服務器連接失敗/(ㄒoㄒ)/~~");
return false;
},
success: function (data) {
if (data.state == 'fail') {
layer.alert(data.message);
return false;
}else if(data.state == 'success'){
layer.alert(data.message);
}
}
});
});
八、啟動服務,驗證結果:
點擊這個按鈕,跳轉到controller:
當正常執行了sql語句之后,我們可以發現數據庫并沒有變化,因為整個方法的事務還沒有走完,當我們走到1/0這步時:
拋出運行時異常,并被spring事務攔截器攔截,并捕獲異常:
在this.completeTransactionAfterThrowing(txInfo, var16);
方法中會將事務全部回滾:
2204.243logback[http-nio-8080-exec-5]INFOc.a.i.imp.CompositeTransactionImp-rollback()doneoftransaction192.168.1.103.tm0000400006
此時,當我們再次打開數據庫驗證,依舊沒有變化,證明分布式事務配置成功;
大家可以基于我的代碼自己練習一下,自己嘗試著使用多事務管理器的情況下的靈活配置;
九、后記:
本文源代碼:
https://github.com/zhaojiatao/springboot-zjt-chapter10-springboot-atomikos-mysql-mybatis-druid.git
代碼在tomcat和jetty環境下均可完成事務回滾;
在事務回滾時可能報一個Transactional not active
的警告,我google后,老外也說不出這個具體作用,大部分人認為這只是一個警告,可以忽略;
-End-
-
API
+關注
關注
2文章
1502瀏覽量
62123 -
分布式
+關注
關注
1文章
903瀏覽量
74547 -
數據源
+關注
關注
1文章
63瀏覽量
9694 -
SpringBoot
+關注
關注
0文章
173瀏覽量
183
原文標題:SpringBoot 分布式事務的解決方案(JTA+Atomic+多數據源)
文章出處:【微信號:AndroidPush,微信公眾號:Android編程精選】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論