前言
最近常常需要處理大量的crash數據,對這些數據進行分析,在此之前需要將存量的數據導入自己的數據庫,開始一天一天的去導,發現太慢了,后來嘗試通過python多線程并行導入多天數據,以此記錄對于Python多線程的使用。
進程與線程
在介紹Python的多線程之前,我們需要先明確一下線程和進程的概念,其實線程和進程是操作系統的基本概念,都是實現并發的方式,其二者的區別可以用一句話概括:進程是資源分配的最小單位,而線程是調度的最小單位。 線程是進程的一部分,一個進程含有一個或多個線程。
threading的使用
Python提供了threading庫來實現多線程,其使用多線程的方式有兩種,一種是直接調用如下:
import threading
import time
def say(index):
print("thread%s is running" % index)
time.sleep(1)
print("thread%s is over" % index)
if __name__ == "__main__":
threading.Thread(target=say, args=(1,)).start()
?需要注意的是函數入參的傳入是通過元組實現的,如果只有一個參數,","是不能省略的。
?
除了以上方法,還可以通過繼承threading.Thread來實現,代碼如下。
import threading
import time
class MyThread(threading.Thread):
def __init__(self, index):
threading.Thread.__init__(self) # 必須的步驟
self.index = index
def run(self):
print("thread%s is running" % self.index)
time.sleep(1)
print("thread%s is over" % self.index)
if __name__ == "__main__":
myThread = MyThread(1)
myThread.start()
在threading中提供了很多方法,主要可以分為兩個部分,一部分是關于線程信息的函數,一部分是線程對象的函數。
線程信息的函數
函數 | 說明 |
---|---|
threading.active_count() | 活躍線程Thread的數量 |
threading.current_thread() | 返回當前線程的thread對象 |
threading.enumerate() | 返回當前存活線程的列表 |
threading.main_thread() | 返回當前主線程的Thread對象 |
線程對象Thread的函數和屬性
函數 | 說明 |
---|---|
Thread.name | 線程名,可相同 |
Thread.ident | 線程標識符,非零整數 |
Thread.Daemon | 是否為守護線程 |
Thread.is_alive() | 是否存活 |
Thread.start() | 開啟線程,多次調用會報錯 |
Thread.join(timeout=None) | 等待線程結束 |
Thread(group=None, target=None, name=None, args=(), kwargs={}, *, deamon=None) | 構造函數 |
Thread.run() | 用來重載 |
線程池
線程可以提高程序的并行性,提高程序執行的效率,雖然python的多線程只是一種假象的多線程,但是在一些io密集的程序中還是可以提高執行效率,其中的細節會在后面詳細解釋。在多線程中線程的調度也會造成一定的開銷,線程數量越多,調度開銷越大,所以我們需要控制線程的數量,使用join可以在主線程等待子線程執行結束,從而控制線程的數量。其代碼如下
import threading
import time
def say(index):
print("thread%s is running" % index)
time.sleep(1)
print("thread%s is over" % index)
if __name__ == "__main__":
for i in range(1, 4, 2):
thread1 = threading.Thread(target=say, args=(i,))
thread2 = threading.Thread(target=say, args=(i + 1,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
結果如下
thread1 is running
thread2 is running
thread1 is over
thread2 is over
thread3 is running
thread4 is running
thread3 is over
thread4 is over
如果不使用join其結果如下:
thread1 is running
thread2 is running
thread3 is running
thread4 is running
thread1 is over
thread2 is over
thread4 is over
thread3 is over
這時候是同時啟動了四個線程
使用join來控制線程數量雖然可以達到目的,但是這樣的實現確實很不優雅,而且線程的創建和銷毀也是很大的開銷,所以針對一些執行頻率高且執行時間短的情況,可以使用線程池,線程池顧名思義就是一個包含固定數量線程的池子,線程池里面的線程是可以重復利用的,執行完任務后不會立刻銷毀而且返回線程池中等待,如果有任務則立即執行下一個任務。
python中的concurrent.futures模塊提供了ThreadPoolExector類來創建線程池,其提供了以下方法:
函數 | 說明 |
---|---|
submit(fn, *args, **kwargs) | 將 fn 函數提交給線程池。args 代表傳給 fn 函數的參數,kwargs 代表以關鍵字參數的形式為 fn 函數傳入參數。 |
map(func, *iterables, timeout=None, chunksize=1) | 該函數將會啟動多個線程,以異步方式立即對 iterables 執行 map 處理。超時拋出TimeoutError錯誤。返回每個函數的結果,注意不是返回future。 |
shutdown(wait=True) | 關閉線程池。關閉之后線程池不再接受新任務,但會將之前提交的任務完成。 |
有一些函數的執行是有返回值的,將任務通過submit提交給線程池后,會返回一個Future對象,Future有以下幾個方法:
函數 | 說明 |
---|---|
cancel() | 取消該 Future 代表的線程任務。如果該任務正在執行,不可取消,則該方法返回 False;否則,程序會取消該任務,并返回 True。 |
cancelled() | 如果該 Future 代表的線程任務正在執行、不可被取消,該方法返回 True。 |
running() | 如果該 Future 代表的線程任務正在執行、不可被取消,該方法返回 True。 |
done() | 如果該 Funture 代表的線程任務被成功取消或執行完成,則該方法返回 True。 |
result(timeout=None) | 獲取該 Future 代表的線程任務最后返回的結果。如果 Future 代表的線程任務還未完成,該方法將會阻塞當前線程,其中 timeout 參數指定最多阻塞多少秒。超時拋出TimeoutError,取消拋出CancelledError。 |
exception(timeout=None) | 獲取該 Future 代表的線程任務所引發的異常。如果該任務成功完成,沒有異常,則該方法返回 None。 |
add_done_callback(fn) | 為該 Future 代表的線程任務注冊一個“回調函數”,當該任務成功完成時,程序會自動觸發該 fn 函數,參數是future。 |
之前的問題可以用線程池,代碼如下
import time
from concurrent.futures import ThreadPoolExecutor
def say(index):
print("thread%s is running" % index)
time.sleep(1)
print("thread%s is over" % index)
if __name__ == "__main__":
params = tuple()
for i in range(1, 11):
params = params + (i,)
pool = ThreadPoolExecutor(max_workers=2)
pool.map(say, params)
線程安全與鎖
正如之前所提到的,線程之間是共享資源的,所以當多個線程同時訪問或處理同一資源時會產生一定的問題,會造成資源損壞或者與預期不一致。例如以下程序最后執行結果是157296且每次結果都不一樣。
import threading
import time
lock = threading.Lock()
def task():
global a
for i in range(100000):
a = a + 1
if i == 50:
time.sleep(1)
if __name__ == "__main__":
global a
a = 0
thread1 = threading.Thread(target=task)
thread2 = threading.Thread(target=task)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(a)
這時候就需要用到鎖,是使用之前將資源鎖定,鎖定期間不允許其他線程訪問,使用完之后再釋放鎖。在python的threading模塊中有Lock和RLock兩個類,它們都有兩個方法,Lock.acquire(blocking=True, timeout=-1) 獲取鎖。Lock.release() 釋放鎖。其二者的區別在于RLock是可重入鎖,一個線程可以多次獲取,主要是為了避免死鎖。一個簡單的例子,以下代碼會死鎖
Lock.acquire()
Lock.acquire()
Lock.release()
Lock.release()
用RLock則不會死鎖
RLock.acquire()
RLock.acquire()
RLock.release()
RLock.release()
?死鎖(Deadlock)是指兩個或兩個以上的線程在執行過程中,由于競爭資源或者由于彼此通信而造成的一種阻塞的現象,若無外力作用,它們都將無法推進下去。此時稱系統處于死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程。
?
以上代碼加鎖后就可以得到想要的結果了,其代碼如下
import threading
import time
lock = threading.Lock()
def task():
global a
for i in range(100000):
lock.acquire()
a = a + 1
lock.release()
if i == 50:
time.sleep(1)
if __name__ == "__main__":
global a
a = 0
thread1 = threading.Thread(target=task)
thread2 = threading.Thread(target=task)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(a)
假的多線程
關于python多線程的簡單使用已經講完了,現在回到之前文中提到的,python的多線程是假的多線程,為什么這么說呢,因為Python中有一個GIL,GIL的全稱是Global Interpreter Lock(全局解釋器鎖),并且由于GIL鎖存在,python里一個進程永遠只能同時執行一個線程(拿到GIL的線程才能執行),這就是為什么在多核CPU上,python的多線程效率并不高。對于計算密集型的Python多線程并不會提高執行效率,甚至可能因為線程切換開銷過大導致性能還不如單線程。但是對于IO密集型的任務,Python多線程還是可以提高效率。
-
數據
+關注
關注
8文章
7002瀏覽量
88943 -
多線程
+關注
關注
0文章
278瀏覽量
19943 -
python
+關注
關注
56文章
4792瀏覽量
84628
發布評論請先 登錄
相關推薦
評論