作為一個分布式異步計算框架,Celery雖然常用于Web框架中,但也可以單獨使用。 雖然常規搭配的消息隊列是RabbitMQ,但是由于某些情況下系統已經包含了Redis,那就可以復用。
以下撇開Web框架,介紹基于Redis配置Celery任務的方法。
pip install celery[redis]
項目結構
其中,main.py是觸發Task的業務代碼。當然,文件名可以隨意改。celery.py是Celery的app定義的位置,tasks.py是Task定義的位置,文件名不建議修改。
配置Celery
在celery.py中寫入如下代碼:
其中,REDIS_URL從同一的配置settings.py中引入, 形式大概是redis://localhost:6379/0。這里既用Redis來當broker,又用來當backend。即,既當消息隊列,又當結果反饋的數據庫(默認僅保存1天)。
在include=,需要填一個下游worker的包名列表。這里選擇了同一個包的tasks.py文件。
額外設置的task_track_started,是命令Worker反饋STARTED狀態。默認情況下,是無法知道任務什么時候開始執行的。
編寫任務并調用
在tasks.py文件中,添加異步任務的實現。
在需要發起任務的地方,用.apply_async可以觸發異步調用。即,實際只是向消息隊列發送消息,真正的執行操作在遠程。
運行Worker:
celery -A your_project worker
運行原理
一次Task從觸發到完成,序列圖如下:
其中,main代表業務代碼主進程。它可能是Django、Flask這類Web服務,也可能是一個其它類型的進程。worker就是指Celery的Worker。
main發送消息后,會得到一個AsyncResult,其中包含task_id。僅通過task_id,也可以自己構造一個AsyncResult,查詢相關信息。其中,代表運行過程的,主要是state。
worker會持續保持對Redis(或其它消息隊列,如RabbitMQ)的關注,查詢新的消息。如果獲得新消息,將其消費后,開始運行do_sth。運行完成會把返回值對應的結果,以及一些運行信息,回寫到Redis(或其它backend,如Django數據庫等)上。在系統的任何地方,通過對應的AsyncResult(task_id)就可以查詢到結果。
Celery Task的狀態
以下是狀態圖:
其中,除SUCCESS外,還有失敗(FAILURE)、取消(REVOKED)兩個結束狀態。而RETRY則是在設置了重試機制后,進入的臨時等待狀態。
另外,如果保存在Redis的結果信息被清理(默認僅保存1天),那么任務狀態又會變成PENDING。這在設計上是個巨大的問題,使用時要做對應容錯。
常見控制操作
有時,在業務主進程中需要等待異步運行的結果,這時需要使用wait。如果要取消一個排隊中、或已執行的任務,則可以使用revoke。即使任務已經執行完成,也可以使用revoke,但不會有任何變化。如果需要提前刪除任務記錄,可以使用forget。
責編AJX
-
Web
+關注
關注
2文章
1263瀏覽量
69511 -
分布式
+關注
關注
1文章
903瀏覽量
74537 -
Redis
+關注
關注
0文章
376瀏覽量
10882
發布評論請先 登錄
相關推薦
評論