背景介紹
1,最近有一個大數據量插入的操作入庫的業務場景,需要先做一些其他修改操作,然后在執行插入操作,由于插入數據可能會很多,用到多線程去拆分數據并行處理來提高響應時間,如果有一個線程執行失敗,則全部回滾。
2,在spring中可以使用@Transactional注解去控制事務,使出現異常時會進行回滾,在多線程中,這個注解則不會生效,如果主線程需要先執行一些修改數據庫的操作,當子線程在進行處理出現異常時,主線程修改的數據則不會回滾,導致數據錯誤。
3,下面用一個簡單示例演示多線程事務。
公用的類和方法
/** ?*?平均拆分list方法. ?*?@param?source ?*?@param?n ?*?@param??*?@return ?*/ public?static? ?List >?averageAssign(List
?source,int?n){ ????List >?result=new?ArrayList
>(); ????int?remaider=source.size()%n;? ????int?number=source.size()/n;? ????int?offset=0;//偏移量 ????for(int?i=0;i
?value=null; ????????if(remaider>0){ ????????????value=source.subList(i*number+offset,?(i+1)*number+offset+1); ????????????remaider--; ????????????offset++; ????????}else{ ????????????value=source.subList(i*number+offset,?(i+1)*number+offset); ????????} ????????result.add(value); ????} ????return?result; } /**??線程池配置 ?*?@version?V1.0 ?*/ public?class?ExecutorConfig?{ ????private?static?int?maxPoolSize?=?Runtime.getRuntime().availableProcessors(); ????private?volatile?static?ExecutorService?executorService; ????public?static?ExecutorService?getThreadPool()?{ ????????if?(executorService?==?null){ ????????????synchronized?(ExecutorConfig.class){ ????????????????if?(executorService?==?null){ ????????????????????executorService?=??newThreadPool(); ????????????????} ????????????} ????????} ????????return?executorService; ????} ????private?static??ExecutorService?newThreadPool(){ ????????int?queueSize?=?500; ????????int?corePool?=?Math.min(5,?maxPoolSize); ????????return?new?ThreadPoolExecutor(corePool,?maxPoolSize,?10000L,?TimeUnit.MILLISECONDS, ????????????new?LinkedBlockingQueue<>(queueSize),new?ThreadPoolExecutor.AbortPolicy()); ????} ????private?ExecutorConfig(){} } /**?獲取sqlSession ?*?@author?86182 ?*?@version?V1.0 ?*/ @Component public?class?SqlContext?{ ????@Resource ????private?SqlSessionTemplate?sqlSessionTemplate; ????public?SqlSession?getSqlSession(){ ????????SqlSessionFactory?sqlSessionFactory?=?sqlSessionTemplate.getSqlSessionFactory(); ????????return?sqlSessionFactory.openSession(); ????} }
示例事務不成功操作
??/** ?*?測試多線程事務. ?*?@param?employeeDOList ?*/ @Override @Transactional public?void?saveThread(List?employeeDOList)?{ ????try?{ ????????//先做刪除操作,如果子線程出現異常,此操作不會回滾 ????????this.getBaseMapper().delete(null); ????????//獲取線程池 ????????ExecutorService?service?=?ExecutorConfig.getThreadPool(); ????????//拆分數據,拆分5份 ????????List >?lists=averageAssign(employeeDOList,?5); ????????//執行的線程 ????????Thread?[]threadArray?=?new?Thread[lists.size()]; ????????//監控子線程執行完畢,再執行主線程,要不然會導致主線程關閉,子線程也會隨著關閉 ????????CountDownLatch?countDownLatch?=?new?CountDownLatch(lists.size()); ????????AtomicBoolean?atomicBoolean?=?new?AtomicBoolean(true); ????????for?(int?i?=0;i
?list??=?lists.get(i); ????????????threadArray[i]?=??new?Thread(()?->?{ ????????????????try?{ ?????????????????//最后一個線程拋出異常 ????????????????????if?(!atomicBoolean.get()){ ????????????????????????throw?new?ServiceException("001","出現異常"); ????????????????????} ????????????????????//批量添加,mybatisPlus中自帶的batch方法 ????????????????????this.saveBatch(list); ????????????????}finally?{ ????????????????????countDownLatch.countDown(); ????????????????} ????????????}); ????????} ????????for?(int?i?=?0;?i? 數據庫中存在一條數據:
//測試用例 @RunWith(SpringRunner.class) @SpringBootTest(classes?=?{?ThreadTest01.class,?MainApplication.class}) public?class?ThreadTest01?{ ????@Resource ????private?EmployeeBO?employeeBO; ????/** ?????*???測試多線程事務. ?????*?@throws?InterruptedException ?????*/ ????@Test ????public??void?MoreThreadTest2()?throws?InterruptedException?{ ????????int?size?=?10; ????????List?employeeDOList?=?new?ArrayList<>(size); ????????for?(int?i?=?0;?i 測試結果:
可以發現子線程組執行時,有一個線程執行失敗,其他線程也會拋出異常,但是主線程中執行的刪除操作,沒有回滾,@Transactional注解沒有生效。
使用sqlSession控制手動提交事務
?@Resource ??SqlContext?sqlContext; ?/** ?*?測試多線程事務. ?*?@param?employeeDOList ?*/ @Override public?void?saveThread(List?employeeDOList)?throws?SQLException?{ ????//?獲取數據庫連接,獲取會話(內部自有事務) ????SqlSession?sqlSession?=?sqlContext.getSqlSession(); ????Connection?connection?=?sqlSession.getConnection(); ????try?{ ????????//?設置手動提交 ????????connection.setAutoCommit(false); ????????//獲取mapper ????????EmployeeMapper?employeeMapper?=?sqlSession.getMapper(EmployeeMapper.class); ????????//先做刪除操作 ????????employeeMapper.delete(null); ????????//獲取執行器 ????????ExecutorService?service?=?ExecutorConfig.getThreadPool(); ????????List >?callableList??=?new?ArrayList<>(); ????????//拆分list ????????List >?lists=averageAssign(employeeDOList,?5); ????????AtomicBoolean?atomicBoolean?=?new?AtomicBoolean(true); ????????for?(int?i?=0;i
?list??=?lists.get(i); ????????????//使用返回結果的callable去執行, ????????????Callable ?callable?=?()?->?{ ????????????????//讓最后一個線程拋出異常 ????????????????if?(!atomicBoolean.get()){ ????????????????????throw?new?ServiceException("001","出現異常"); ????????????????} ??????????????return?employeeMapper.saveBatch(list); ????????????}; ????????????callableList.add(callable); ????????} ????????//執行子線程 ???????List >?futures?=?service.invokeAll(callableList); ????????for?(Future ?future:futures)?{ ????????//如果有一個執行不成功,則全部回滾 ????????????if?(future.get()<=0){ ????????????????connection.rollback(); ?????????????????return; ????????????} ????????} ????????connection.commit(); ????????System.out.println("添加完畢"); ????}catch?(Exception?e){ ????????connection.rollback(); ????????log.info("error",e); ????????throw?new?ServiceException("002","出現異常"); ????}finally?{ ?????????connection.close(); ?????} } //?sql ?INSERT?INTO ?employee?(employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status) ?values ????? ?????( ?????#{item.employeeId}, ?????#{item.age}, ?????#{item.employeeName}, ?????#{item.birthDate}, ?????#{item.gender}, ?????#{item.idNumber}, ?????#{item.creatTime}, ?????#{item.updateTime}, ?????#{item.status} ?????????) ????? ?數據庫中一條數據:
測試結果:拋出異常,
刪除操作的數據回滾了,數據庫中的數據依舊存在,說明事務成功了。
成功操作示例:
?@Resource SqlContext?sqlContext; /** ?*?測試多線程事務. ?*?@param?employeeDOList ?*/ @Override public?void?saveThread(List?employeeDOList)?throws?SQLException?{ ????//?獲取數據庫連接,獲取會話(內部自有事務) ????SqlSession?sqlSession?=?sqlContext.getSqlSession(); ????Connection?connection?=?sqlSession.getConnection(); ????try?{ ????????//?設置手動提交 ????????connection.setAutoCommit(false); ????????EmployeeMapper?employeeMapper?=?sqlSession.getMapper(EmployeeMapper.class); ????????//先做刪除操作 ????????employeeMapper.delete(null); ????????ExecutorService?service?=?ExecutorConfig.getThreadPool(); ????????List >?callableList??=?new?ArrayList<>(); ????????List >?lists=averageAssign(employeeDOList,?5); ????????for?(int?i?=0;i
?list??=?lists.get(i); ????????????Callable ?callable?=?()?->?employeeMapper.saveBatch(list); ????????????callableList.add(callable); ????????} ????????//執行子線程 ???????List >?futures?=?service.invokeAll(callableList); ????????for?(Future ?future:futures)?{ ????????????if?(future.get()<=0){ ????????????????connection.rollback(); ?????????????????return; ????????????} ????????} ????????connection.commit(); ????????System.out.println("添加完畢"); ????}catch?(Exception?e){ ????????connection.rollback(); ????????log.info("error",e); ????????throw?new?ServiceException("002","出現異常"); ???????//?throw?new?ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR); ????} } 測試結果:
數據庫中數據:
刪除的刪除了,添加的添加成功了,測試成功。
編輯:黃飛?
評論
查看更多