1. 功能說明
2. 多線程任務示例
2.1 線程池
2.2 單個任務
2.3 任務入口
2.4 結果分析
2.5 源碼地址
3. 寫在最后
大家好,今天教大家擼一個 Java 的多線程永動任務,這個示例的原型是公司自研的多線程異步任務項目 ,我把里面涉及到多線程的代碼抽離出來,然后進行一定的改造。
里面涉及的知識點非常多,特別適合有一定工作經驗 的同學學習,或者可以直接拿到項目中使用。
文章結構非常簡單:
1. 功能說明
做這個多線程異步任務,主要是因為我們有很多永動的異步任務,什么是永動呢?就是任務跑起來后,需要一直跑下去。
比如消息 Push 任務,因為一直有消息過來,所以需要一直去消費 DB 中的未推送消息,就需要整一個 Push 的永動異步任務。
我們的需求其實不難,簡單總結一下:
能同時執行多個永動的異步任務 ;
每個異步任務,支持開多個線程 去消費這個任務的數據;
支持永動異步任務的優雅關閉 ,即關閉后,需要把所有的數據消費完畢后,再關閉。
完成上面的需求,需要注意幾個點:
每個永動任務 ,可以開一個線程去執行;
每個子任務 ,因為需要支持并發,需要用線程池控制;
永動任務的關閉,需要通知子任務的并發線程,并支持永動任務和并發子任務的優雅關閉 。
2. 多線程任務示例
2.1 線程池
對于子任務,需要支持并發,如果每個并發都開一個線程,用完就關閉,對資源消耗太大,所以引入線程池:
publicclassTaskProcessUtil{ //每個任務,都有自己單獨的線程池 privatestaticMapexecutors=newConcurrentHashMap<>(); //初始化一個線程池 privatestaticExecutorServiceinit(StringpoolName,intpoolSize){ returnnewThreadPoolExecutor(poolSize,poolSize, 0L,TimeUnit.MILLISECONDS, newLinkedBlockingQueue (), newThreadFactoryBuilder().setNameFormat("Pool-"+poolName).setDaemon(false).build(), newThreadPoolExecutor.CallerRunsPolicy()); } //獲取線程池 publicstaticExecutorServicegetOrInitExecutors(StringpoolName,intpoolSize){ ExecutorServiceexecutorService=executors.get(poolName); if(null==executorService){ synchronized(TaskProcessUtil.class){ executorService=executors.get(poolName); if(null==executorService){ executorService=init(poolName,poolSize); executors.put(poolName,executorService); } } } returnexecutorService; } //回收線程資源 publicstaticvoidreleaseExecutors(StringpoolName){ ExecutorServiceexecutorService=executors.remove(poolName); if(executorService!=null){ executorService.shutdown(); } } }
這是一個線程池的工具類,這里初始化線程池和回收線程資源很簡單,我們主要討論獲取線程池。
獲取線程池可能會存在并發情況,所以需要加一個 synchronized 鎖,然后鎖住后,需要對 executorService 進行二次判空校驗。
2.2 單個任務
為了更好講解單個任務的實現方式,我們的任務主要就是把 Cat 的數據打印出來,Cat 定義如下:
@Data @Service publicclassCat{ privateStringcatName; publicCatsetCatName(Stringname){ this.catName=name; returnthis; } }
單個任務主要包括以下功能:
獲取永動任務數據 :這里一般都是掃描 DB,我直接就簡單用 queryData() 代替。
多線程執行任務 :需要把數據拆分成 4 份,然后分別由多線程并發執行,這里可以通過線程池支持;
永動任務優雅停機 :當外面通知任務需要停機,需要執行完剩余任務數據,并回收線程資源,退出任務;
永動執行 :如果未收到停機命令,任務需要一直執行下去。
直接看代碼:
publicclassChildTask{ privatefinalintPOOL_SIZE=3;//線程池大小 privatefinalintSPLIT_SIZE=4;//數據拆分大小 privateStringtaskName; //接收jvm關閉信號,實現優雅停機 protectedvolatilebooleanterminal=false; publicChildTask(StringtaskName){ this.taskName=taskName; } //程序執行入口 publicvoiddoExecute(){ inti=0; while(true){ System.out.println(taskName+":Cycle-"+i+"-Begin"); //獲取數據 Listdatas=queryData(); //處理數據 taskExecute(datas); System.out.println(taskName+":Cycle-"+i+"-End"); if(terminal){ //只有應用關閉,才會走到這里,用于實現優雅的下線 break; } i++; } //回收線程池資源 TaskProcessUtil.releaseExecutors(taskName); } //優雅停機 publicvoidterminal(){ //關機 terminal=true; System.out.println(taskName+"shutdown"); } //處理數據 privatevoiddoProcessData(List datas,CountDownLatchlatch){ try{ for(Catcat:datas){ System.out.println(taskName+":"+cat.toString()+",ThreadName:"+Thread.currentThread().getName()); Thread.sleep(1000L); } }catch(Exceptione){ System.out.println(e.getStackTrace()); }finally{ if(latch!=null){ latch.countDown(); } } } //處理單個任務數據 privatevoidtaskExecute(List sourceDatas){ if(CollectionUtils.isEmpty(sourceDatas)){ return; } //將數據拆成4份 List >splitDatas=Lists.partition(sourceDatas,SPLIT_SIZE); finalCountDownLatchlatch=newCountDownLatch(splitDatas.size()); //并發處理拆分的數據,共用一個線程池 for(finalList
datas:splitDatas){ ExecutorServiceexecutorService=TaskProcessUtil.getOrInitExecutors(taskName,POOL_SIZE); executorService.submit(newRunnable(){ @Override publicvoidrun(){ doProcessData(datas,latch); } }); } try{ latch.await(); }catch(Exceptione){ System.out.println(e.getStackTrace()); } } //獲取永動任務數據 privateList queryData(){ List datas=newArrayList<>(); for(inti=0;i
簡單解釋一下:
queryData :用于獲取數據,實際應用中其實是需要把 queryData 定為抽象方法,然后由各個任務實現自己的方法。
doProcessData :數據處理邏輯,實際應用中其實是需要把 doProcessData 定為抽象方法,然后由各個任務實現自己的方法。
taskExecute :將數據拆分成 4 份,獲取該任務的線程池,并交給線程池并發執行,然后通過 latch.await() 阻塞。當這 4 份數據都執行成功后,阻塞結束,該方法才返回。
terminal :僅用于接受停機命令,這里該變量定義為 volatile,所以多線程內存可見;
doExecute :程序執行入口,封裝了每個任務執行的流程,當 terminal=true 時,先執行完任務數據,然后回收線程池,最后退出。
2.3 任務入口
直接上代碼:
publicclassLoopTask{ privateListchildTasks; publicvoidinitLoopTask(){ childTasks=newArrayList(); childTasks.add(newChildTask("childTask1")); childTasks.add(newChildTask("childTask2")); for(finalChildTaskchildTask:childTasks){ newThread(newRunnable(){ @Override publicvoidrun(){ childTask.doExecute(); } }).start(); } } publicvoidshutdownLoopTask(){ if(!CollectionUtils.isEmpty(childTasks)){ for(ChildTaskchildTask:childTasks){ childTask.terminal(); } } } publicstaticvoidmain(Stringargs[])throwsException{ LoopTaskloopTask=newLoopTask(); loopTask.initLoopTask(); Thread.sleep(5000L); loopTask.shutdownLoopTask(); } }
每個任務都開一個單獨的 Thread,這里我初始化了 2 個永動任務,分別為 childTask1 和 childTask2,然后分別執行,后面 Sleep 了 5 秒后,再關閉任務,我們可以看看是否可以按照我們的預期優雅退出。
2.4 結果分析
執行結果如下:
childTask1:Cycle-0-Begin childTask2:Cycle-0-Begin childTask1:Cat(catName=羅小黑0),ThreadName:Pool-childTask1 childTask1:Cat(catName=羅小黑4),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑4),ThreadName:Pool-childTask2 childTask2:Cat(catName=羅小黑0),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑1),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑1),ThreadName:Pool-childTask2 childTask2:Cat(catName=羅小黑2),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑2),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑3),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑3),ThreadName:Pool-childTask1 childTask2:Cycle-0-End childTask2:Cycle-1-Begin childTask1:Cycle-0-End childTask1:Cycle-1-Begin childTask2:Cat(catName=羅小黑0),ThreadName:Pool-childTask2 childTask2:Cat(catName=羅小黑4),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑4),ThreadName:Pool-childTask1 childTask1:Cat(catName=羅小黑0),ThreadName:Pool-childTask1 childTask1shutdown childTask2shutdown childTask2:Cat(catName=羅小黑1),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑1),ThreadName:Pool-childTask1 childTask1:Cat(catName=羅小黑2),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑2),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑3),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑3),ThreadName:Pool-childTask2 childTask1:Cycle-1-End childTask2:Cycle-1-End
輸出數據:
“Pool-childTask” 是線程池名稱;
“childTask” 是任務名稱;
“Cat(catName=羅小黑)” 是執行的結果;
“childTask shut down” 是關閉標記;
“childTask:Cycle-X-Begin” 和“childTask:Cycle-X-End” 是每一輪循環的開始和結束標記。
我們分析一下執行結果:
childTask1 和 childTask2 分別執行,在第一輪循環中都正常輸出了 5 條羅小黑數據;
第二輪執行過程中,我啟動了關閉指令,這次第二輪執行沒有直接停止,而是先執行完任務中的數據,再執行退出,所以完全符合我們的優雅退出結論。
2.5 源碼地址
GitHub 地址:
https://github.com/lml200701158/java-study/tree/master/src/main/java/com/java/parallel/pool/ofc
3. 寫在最后
對于這個經典的線程池使用示例,原項目是我好友一灰 寫的,技術水平阿里 P7級別,實現得也非常優雅,涉及的知識點非常多 ,非常值得大家學習。
-
JAVA
+關注
關注
20文章
2988瀏覽量
108846 -
編程
+關注
關注
88文章
3687瀏覽量
95112 -
多線程
+關注
關注
0文章
279瀏覽量
20399 -
代碼
+關注
關注
30文章
4895瀏覽量
70548 -
Thread
+關注
關注
2文章
88瀏覽量
26525
原文標題:新來個阿里 P7,僅花 2 小時,擼出一個多線程永動任務,看完直接跪了,真牛逼!
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
什么時候要使用多線程
java多線程編程實例 (源程序)
java多線程設計模式_結城浩

Java多線程總結之Queue

評論