1、FutureTask 對象介紹
Future 對象大家都不陌生,是 JDK1.5 提供的接口,是用來以阻塞的方式獲取線程異步執(zhí)行完的結(jié)果。 在 Java 中想要通過線程執(zhí)行一個任務(wù),離不開 Runnable 與 Callable 這兩個接口。 Runnable 與 Callable 的區(qū)別在于,Runnable 接口只有一個 run 方法,該方法用來執(zhí)行邏輯,但是并沒有返回值;而 Callable 的 call 方法,同樣用來執(zhí)行業(yè)務(wù)邏輯,但是是有一個返回值的。
Callable 執(zhí)行任務(wù)過程中可以通過 FutureTask 獲得任務(wù)的執(zhí)行狀態(tài),并且可以在執(zhí)行完成后通過 Future.get () 方式獲取執(zhí)行結(jié)果。 Future 是一個接口,而 FutureTask 就是 Future 的實(shí)現(xiàn)類。并且 FutureTask 實(shí)現(xiàn)了 RunnableFuture(Runnable + Future),說明我們可以創(chuàng)建一個 FutureTask 并直接把它放到線程池執(zhí)行,然后獲取 FutureTask 的執(zhí)行結(jié)果。
2、FutureTask 源碼解析
2.1 主要方法和屬性
那么 FutureTask 是如何通過阻塞的方式來獲取到異步線程執(zhí)行的結(jié)果的呢?我們看下 FutureTask 中的屬性。
// FutureTask的狀態(tài)及其常量 privatevolatileint state; privatestaticfinalint NEW =0; privatestaticfinalint COMPLETING =1; privatestaticfinalint NORMAL =2; privatestaticfinalint EXCEPTIONAL =3; privatestaticfinalint CANCELLED =4; privatestaticfinalint INTERRUPTING =5; privatestaticfinalint INTERRUPTED =6; // callable對象,執(zhí)行完后置空 privateCallablecallable; // 要返回的結(jié)果或要引發(fā)的異常來自 get() 方法 privateObject outcome;// non-volatile, protected by state reads/writes // 執(zhí)行Callable的線程 privatevolatileThread runner; // 等待線程的一個鏈表結(jié)構(gòu) privatevolatileWaitNode waiters;
?FutureTask 中幾個比較重要的方法。
// 取消任務(wù)的執(zhí)行 booleancancel(boolean mayInterruptIfRunning); // 返回任務(wù)是否已經(jīng)被取消 booleanisCancelled(); // 返回任務(wù)是否已經(jīng)完成,任務(wù)狀態(tài)不為NEW即為完成 booleanisDone(); // 通過get方法獲取任務(wù)的執(zhí)行結(jié)果 Vget()throwsInterruptedException,ExecutionException; // 通過get方法獲取任務(wù)的執(zhí)行結(jié)果,帶有超時,如果超過給定時間則拋出異常 Vget(long timeout,TimeUnit unit) throwsInterruptedException,ExecutionException,TimeoutException;
?2.2 FutureTask 執(zhí)行
當(dāng)我們在線程池中執(zhí)行一個 Callable 方法時,其實(shí)是將 Callable 任務(wù)封裝成一個 RunnableFuture 對象去執(zhí)行,同時將這個 RunnableFuture 對象返回,這樣我們就拿到了 FutureTask 的引用,可以隨時獲取到任務(wù)執(zhí)行的狀態(tài),并且可以在任務(wù)執(zhí)行完成后通過該對象獲取執(zhí)行結(jié)果。 以下為 ThreadPoolExecutor 線程池提交一個 callable 方法的源碼。
publicFuture submit(Callable task){ if(task ==null)thrownewNullPointerException(); RunnableFuture ftask =newTaskFor(task); execute(ftask); return ftask; } protected RunnableFuture newTaskFor(Callable callable){ returnnewFutureTask (callable); }
?2.3 run 方法介紹
RunnableFuture 其實(shí)也是一個可以執(zhí)行的 runnable,我們看下他的 run 方法。其主要流程就是執(zhí)行 call 方法,正常執(zhí)行完畢后將 result 結(jié)果賦值到 outcome 屬性上。
publicvoidrun(){ if(state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null,Thread.currentThread())) return; try{ // 將callable賦值到本地變量 Callablec = callable; // 判斷callable不為空并且FutureTask的狀態(tài)必須為新創(chuàng)建 if(c !=null&& state == NEW){ V result; boolean ran; try{ // 執(zhí)行call方法(用戶自己實(shí)現(xiàn)的call邏輯),并獲取到result結(jié)果 result = c.call(); ran =true; }catch(Throwable ex){ result =null; ran =false; // 如果執(zhí)行過程出現(xiàn)異常,則將異常對象賦值到outcome上 setException(ex); } // 如果正常執(zhí)行完畢,則將result賦值到outcome屬性上 if(ran) set(result); } }finally{ // runner must be non-null until state is settled to // prevent concurrent calls to run() runner =null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if(s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
?以下邏輯為正常執(zhí)行完成后賦值的邏輯。
// 如果任務(wù)沒有被取消,將future執(zhí)行完的返回值賦值給result結(jié)果 // FutureTask任務(wù)的執(zhí)行狀態(tài)是通過CAS的方式進(jìn)行賦值的,并且由此可知,COMPLETING其實(shí)是一個瞬時狀態(tài) // 當(dāng)將線程執(zhí)行結(jié)果賦值給outcome后,狀態(tài)會修改為對應(yīng)的NORMAL,即正常結(jié)束 protectedvoidset(V v){ if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){ outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL);// final state finishCompletion(); } }
?以下為執(zhí)行異常時賦值邏輯,直接將 Throwable 對象賦值到 outcome 屬性上。
protectedvoidsetException(Throwable t){ if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){ outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);// final state finishCompletion(); } }?無論是正常執(zhí)行還是異常執(zhí)行,最終都會調(diào)用一個 finishCompletion 方法,用來做工作的收尾工作。
2.4 get 方法介紹
Future 的 get 方法有兩個重載的方法,一個是 get () 獲取結(jié)果,一個是 get (long, TimeUnit) 帶有超時時間的獲取結(jié)果,我們看下 FutureTask 中的這兩個方法是如何實(shí)現(xiàn)的。
// 不帶有超時時間,一直阻塞直到獲取結(jié)果 publicVget()throwsInterruptedException,ExecutionException{ int s = state; if(s <= COMPLETING) // 等待結(jié)果完成,帶有超時的get方法也是調(diào)用的awaitDone方法 s =awaitDone(false,0L); // 返回結(jié)果 returnreport(s); } // 帶有超時時間的獲取結(jié)果,如果超過時間還沒有獲取到結(jié)果則拋出異常 publicVget(long timeout,TimeUnit unit) throwsInterruptedException,ExecutionException,TimeoutException{ if(unit ==null) thrownewNullPointerException(); int s = state; // 如果任務(wù)未中斷,調(diào)用awaitDone方法等待任務(wù)結(jié)果 if(s <= COMPLETING && (s =awaitDone(true, unit.toNanos(timeout)))<= COMPLETING) thrownewTimeoutException(); // 返回結(jié)果 returnreport(s); }
?我們主要看下 awaitDone 方法的執(zhí)行邏輯。此方法會通過 for 循環(huán)的方式一直阻塞等待任務(wù)執(zhí)行完成。如果帶有超時時間,則超過截止時間后會直接返回。
// timed:是否需要超時獲取 // nanos:超時時間單位納秒 privateintawaitDone(boolean timed,long nanos) throwsInterruptedException{ finallong deadline = timed ?System.nanoTime()+ nanos :0L; WaitNode q =null; boolean queued =false; // 此方法會一直for循環(huán)判斷任務(wù)狀態(tài)是否已經(jīng)完成,是Future.get阻塞的原因 for(;;){ if(Thread.interrupted()){ removeWaiter(q); thrownewInterruptedException(); } int s = state; // 任務(wù)狀態(tài)大于COMPLETING,則表明任務(wù)結(jié)束,直接返回 if(s > COMPLETING){ if(q !=null) q.thread =null; return s; } elseif(s == COMPLETING)// cannot time out yet // Thread.yield() 方法,使當(dāng)前線程由執(zhí)行狀態(tài),變成為就緒狀態(tài),讓出cpu時間,在下一個線程執(zhí)行時候,此線程有可能被執(zhí)行,也有可能沒有被執(zhí)行。 // COMPLETING狀態(tài)為瞬時狀態(tài),任務(wù)執(zhí)行完成,要么是正常結(jié)束,要么異常結(jié)束,后續(xù)會被置為NORMAL或者EXCEPTIONAL Thread.yield(); elseif(q ==null) // 每調(diào)用一次get方法,都會創(chuàng)建一個WaitNode等待節(jié)點(diǎn) q =newWaitNode(); elseif(!queued) // 將該等待節(jié)點(diǎn)添加到鏈表結(jié)構(gòu)waiters中,q.next = waiters 即在waiters的頭部插入 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果方法帶有超時判斷,則判斷當(dāng)前時間是否已經(jīng)超過了截止時間,如果超過了及截止日期,則退出循環(huán)直接返回當(dāng)前狀態(tài),此時任務(wù)狀態(tài)一定是NEW elseif(timed){ nanos = deadline -System.nanoTime(); if(nanos <=0L){ removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }?我們在看下 report 方法,在調(diào)用 get 方法時是如何返回結(jié)果的。
這里首先獲取 outcome 的值,并判斷任務(wù)是否已經(jīng)執(zhí)行完成,如果執(zhí)行完成,則將 outcome 對象強(qiáng)轉(zhuǎn)成泛型指定的類型;如果任務(wù)被取消了,則拋出一個 CancellationException 異常;如果都不是,則說明任務(wù)在執(zhí)行過程中發(fā)生了異常,此時任務(wù)狀態(tài)位 EXCEPTIONAL,此時的 outcome 即為 Throwable 對象,所以將 outcome 強(qiáng)轉(zhuǎn)為 Throwable 并拋出異常。
由此可以知道,我們將一個 FutureTask 任務(wù) submit 到線程池中執(zhí)行的時候,如果發(fā)生了異常,是會在調(diào)用 get 方法的時候拋出的。
privateVreport(int s)throwsExecutionException{ Object x = outcome; if(s == NORMAL) return(V)x; if(s >= CANCELLED) thrownewCancellationException(); thrownewExecutionException((Throwable)x); }
?2.5 cancel 方法介紹
cancel 方法用于取消正在運(yùn)行的任務(wù),如果任務(wù)取消成功,則返回 TRUE,如果取消失敗則返回 FALSE。
// mayInterruptIfRunning:允許中斷正在運(yùn)行的任務(wù) publicbooleancancel(boolean mayInterruptIfRunning){ // mayInterruptIfRunning如果為true則將狀態(tài)置為INTERRUPTING,如果未false則將狀態(tài)置為CANCELLED if(!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) returnfalse; // 如果狀態(tài)修改成功后,判斷是否允許中斷線程,如果允許,則調(diào)用Thread的interrupt方法中斷 try{// in case call to interrupt throws exception if(mayInterruptIfRunning){ try{ Thread t = runner; if(t !=null) t.interrupt(); }finally{// final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } }finally{ // 取消后的收尾工作 finishCompletion(); } returntrue; }
?2.6 isDone/isCancelled 方法介紹
isDone 方法用于判斷 FutureTask 是否已經(jīng)完成;isCancelled 方法用來判斷 FutureTask 是否已經(jīng)取消,這兩個方法都是通過狀態(tài)位來判斷的。
publicbooleanisCancelled(){ return state >= CANCELLED; } publicbooleanisDone(){ return state != NEW; }
?2.7 finishCompletion 方法介紹
我們看下 finishCompletion 方法都做了哪些工作。
// 刪除所有等待線程并發(fā)出信號,最后執(zhí)行done方法 privatevoidfinishCompletion(){ // assert state > COMPLETING; for(WaitNode q;(q = waiters)!=null;){ if(UNSAFE.compareAndSwapObject(this, waitersOffset, q,null)){ for(;;){ Thread t = q.thread; if(t !=null){ q.thread =null; LockSupport.unpark(t); } WaitNode next = q.next; if(next ==null) break; q.next =null;// unlink to help gc q = next; } break; } } done(); callable =null;// to reduce footprint }?我們看到 done 方法是一個受保護(hù)的空方法,此處沒有任何邏輯,由其子類去根據(jù)自己的業(yè)務(wù)去實(shí)現(xiàn)相應(yīng)的邏輯。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。
protectedvoiddone(){}
3、總結(jié)
通過源碼解讀可以了解到 Future 的原理:
第一步:主線程將任務(wù)封裝成一個 Callable 對象,通過 submit 方法提交到線程池去執(zhí)行。
第二步:線程池執(zhí)行任務(wù)的 run 方法,主線程則可以繼續(xù)執(zhí)行其他邏輯。
第三步:線程池中方法執(zhí)行完成后將結(jié)果賦值到 outcome 屬性上,并修改任務(wù)狀態(tài)。
第四步:主線程在需要拿到異步任務(wù)結(jié)果的時候,主動調(diào)用 fugure.get () 方法來獲取結(jié)果。
第五步:如果異步線程在執(zhí)行過程中發(fā)生異常,則會在調(diào)用 future.get () 方法的時候拋出來。 以上就是對于 FutureTask 的分析,我們可以了解 FutureTask 任務(wù)執(zhí)行的方式以及 Future.get 已阻塞的方式獲取線程執(zhí)行的結(jié)果原理,并且從代碼中可以了解 FutureTask 的任務(wù)執(zhí)行狀態(tài)以及狀態(tài)的變化過程。
審核編輯:劉清
-
狀態(tài)機(jī)
+關(guān)注
關(guān)注
2文章
492瀏覽量
27529 -
線程池
+關(guān)注
關(guān)注
0文章
57瀏覽量
6844 -
for循環(huán)
+關(guān)注
關(guān)注
0文章
61瀏覽量
2502
原文標(biāo)題:并發(fā)編程 - FutureTask 解析
文章出處:【微信號:OSC開源社區(qū),微信公眾號:OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論