早期的系統(tǒng)是同步的,容易理解,我們來看個(gè)例子
同步編程
當(dāng)用戶創(chuàng)建一筆電商交易訂單時(shí),要經(jīng)歷的業(yè)務(wù)邏輯流程還是很長(zhǎng)的,每一步都要耗費(fèi)一定的時(shí)間,那么整體的RT就會(huì)比較長(zhǎng)。
于是,聰明的人們開始思考能不能將一些非核心業(yè)務(wù)從主流程中剝離出來,于是有了異步編程
雛形。
異步編程是讓程序并發(fā)運(yùn)行的一種手段。它允許多個(gè)事件同時(shí)發(fā)生,當(dāng)程序調(diào)用需要長(zhǎng)時(shí)間運(yùn)行的方法時(shí),它不會(huì)阻塞當(dāng)前的執(zhí)行流程,程序可以繼續(xù)運(yùn)行。
核心思路:采用多線程優(yōu)化性能,將串行操作變成并行操作。異步模式設(shè)計(jì)的程序可以顯著減少線程等待,從而在高吞吐量場(chǎng)景中,極大提升系統(tǒng)的整體性能,顯著降低時(shí)延。
接下來,我們來講下異步有哪些編程實(shí)現(xiàn)方式
一、線程 Thread
直接繼承 Thread類
是創(chuàng)建異步線程最簡(jiǎn)單的方式。
首先,創(chuàng)建Thread子類,普通類或匿名內(nèi)部類方式;然后創(chuàng)建子類實(shí)例;最后通過start()方法啟動(dòng)線程。
public class AsyncThread extends Thread{
@Override
public void run() {
System.out.println("當(dāng)前線程名稱:" + this.getName() + ", 執(zhí)行線程名稱:" + Thread.currentThread().getName() + "-hello");
}
}
public static void main(String[] args) {
// 模擬業(yè)務(wù)流程
// .......
// 創(chuàng)建異步線程
AsyncThread asyncThread = new AsyncThread();
// 啟動(dòng)異步線程
asyncThread.start();
}
當(dāng)然如果每次都創(chuàng)建一個(gè) Thread線程
,頻繁的創(chuàng)建、銷毀,浪費(fèi)系統(tǒng)資源。我們可以采用線程池
@Bean(name = "executorService")
public ExecutorService downloadExecutorService() {
return new ThreadPoolExecutor(20, 40, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
new ThreadFactoryBuilder().setNameFormat("defaultExecutorService-%d").build(),
(r, executor) -> log.error("defaultExecutor pool is full! "));
}
將業(yè)務(wù)邏輯封裝到 Runnable
或 Callable
中,交由 線程池
來執(zhí)行
二、Future
上述方式雖然達(dá)到了多線程并行處理,但有些業(yè)務(wù)不僅僅要執(zhí)行過程,還要獲取執(zhí)行結(jié)果。
Java 從1.5版本開始,提供了 Callable
和 Future
,可以在任務(wù)執(zhí)行完畢之后得到任務(wù)執(zhí)行結(jié)果。
當(dāng)然也提供了其他功能,如:取消任務(wù)、查詢?nèi)蝿?wù)是否完成等
Future類位于java.util.concurrent包下,接口定義:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
方法描述:
- cancel():取消任務(wù),如果取消任務(wù)成功返回true,如果取消任務(wù)失敗則返回false
- isCancelled():表示任務(wù)是否被取消成功,如果在任務(wù)正常完成前被取消成功,則返回 true
- isDone():表示任務(wù)是否已經(jīng)完成,如果完成,返回true
- get():獲取執(zhí)行結(jié)果,這個(gè)方法會(huì)產(chǎn)生阻塞,會(huì)一直等到任務(wù)執(zhí)行完畢才返回
- get(long timeout, TimeUnit unit):用來獲取執(zhí)行結(jié)果,如果在指定時(shí)間內(nèi),還沒獲取到結(jié)果,就直接返回null
代碼示例:
public class CallableAndFuture {
public static ExecutorService executorService = new ThreadPoolExecutor(4, 40,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(1024), new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
static class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
return "異步處理,Callable 返回結(jié)果";
}
}
public static void main(String[] args) {
Future future = executorService.submit(new MyCallable());
try {
System.out.println(future.get());
} catch (Exception e) {
// nodo
} finally {
executorService.shutdown();
}
}
}
Future 表示一個(gè)可能還沒有完成的異步任務(wù)的結(jié)果,通過 get
方法獲取執(zhí)行結(jié)果,該方法會(huì)阻塞直到任務(wù)返回結(jié)果。
三、FutureTask
FutureTask
實(shí)現(xiàn)了 RunnableFuture
接口,則 RunnableFuture
接口繼承了 Runnable
接口和 Future
接口,所以可以將 FutureTask
對(duì)象作為任務(wù)提交給 ThreadPoolExecutor
去執(zhí)行,也可以直接被 Thread
執(zhí)行;又因?yàn)閷?shí)現(xiàn)了 Future
接口,所以也能用來獲得任務(wù)的執(zhí)行結(jié)果。
FutureTask 構(gòu)造函數(shù):
public FutureTask(Callable callable)
public FutureTask(Runnable runnable, V result)
FutureTask 常用來封裝 Callable
和 Runnable
,可以作為一個(gè)任務(wù)提交到線程池中執(zhí)行。除了作為一個(gè)獨(dú)立的類之外,也提供了一些功能性函數(shù)供我們創(chuàng)建自定義 task 類使用。
FutureTask 線程安全由CAS來保證。
ExecutorService executor = Executors.newCachedThreadPool();
// FutureTask包裝callbale任務(wù),再交給線程池執(zhí)行
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
System.out.println("子線程開始計(jì)算:");
Integer sum = 0;
for (int i = 1; i <= 100; i++)
sum += i;
return sum;
});
// 線程池執(zhí)行任務(wù), 運(yùn)行結(jié)果在 futureTask 對(duì)象里面
executor.submit(futureTask);
try {
System.out.println("task運(yùn)行結(jié)果計(jì)算的總和為:" + futureTask.get());
} catch (Exception e) {
e.printStackTrace();
}
executor.shutdown();
Callable 和 Future 的區(qū)別:Callable 用于產(chǎn)生結(jié)果,F(xiàn)uture 用于獲取結(jié)果
如果是對(duì)多個(gè)任務(wù)多次自由串行、或并行組合,涉及多個(gè)線程之間同步阻塞獲取結(jié)果,F(xiàn)uture 代碼實(shí)現(xiàn)會(huì)比較繁瑣,需要我們手動(dòng)處理各個(gè)交叉點(diǎn),很容易出錯(cuò)。
四、異步框架 CompletableFuture
Future 類通過 get()
方法阻塞等待獲取異步執(zhí)行的運(yùn)行結(jié)果,性能比較差。
JDK1.8 中,Java 提供了 CompletableFuture
類,它是基于異步函數(shù)式編程。相對(duì)阻塞式等待返回結(jié)果,CompletableFuture
可以通過回調(diào)的方式來處理計(jì)算結(jié)果,實(shí)現(xiàn)了異步非阻塞,性能更優(yōu)。
優(yōu)點(diǎn) :
- 異步任務(wù)結(jié)束時(shí),會(huì)自動(dòng)回調(diào)某個(gè)對(duì)象的方法
- 異步任務(wù)出錯(cuò)時(shí),會(huì)自動(dòng)回調(diào)某個(gè)對(duì)象的方法
- 主線程設(shè)置好回調(diào)后,不再關(guān)心異步任務(wù)的執(zhí)行
泡茶示例:
(內(nèi)容摘自:極客時(shí)間的《Java 并發(fā)編程實(shí)戰(zhàn)》)
//任務(wù)1:洗水壺->燒開水
CompletableFuture<Void> f1 =
CompletableFuture.runAsync(() -> {
System.out.println("T1:洗水壺...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T1:燒開水...");
sleep(15, TimeUnit.SECONDS);
});
//任務(wù)2:洗茶壺->洗茶杯->拿茶葉
CompletableFuture f2 =
CompletableFuture.supplyAsync(() -> {
System.out.println("T2:洗茶壺...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T2:洗茶杯...");
sleep(2, TimeUnit.SECONDS);
System.out.println("T2:拿茶葉...");
sleep(1, TimeUnit.SECONDS);
return "龍井";
});
//任務(wù)3:任務(wù)1和任務(wù)2完成后執(zhí)行:泡茶
CompletableFuture f3 =
f1.thenCombine(f2, (__, tf) -> {
System.out.println("T1:拿到茶葉:" + tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
});
//等待任務(wù)3執(zhí)行結(jié)果
System.out.println(f3.join());
}
CompletableFuture 提供了非常豐富的API,大約有50種處理串行,并行,組合以及處理錯(cuò)誤的方法。
-
編程
+關(guān)注
關(guān)注
88文章
3616瀏覽量
93759 -
程序
+關(guān)注
關(guān)注
117文章
3787瀏覽量
81069 -
異步
+關(guān)注
關(guān)注
0文章
62瀏覽量
18060
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論