concurrent.futures:线程池,让你更加高效、并发的处理任务
《asyncio 系列》12. 详解 asyncio 支持的多种队列 - 古明地盆 - 博客园
首先需要明确的是,多进程和其他语言的一样,能够利用多核cpu,但是python由于GIL的存在,多线程在执行的时候,实际上,每一时刻只有一个线程在执行。相当于是单线程。然而多线程在某些情况下,还是能够起到加速的效果。
需要了解的是,程序的耗时一般消耗在IO和CPU上,按照占比不同,一般分为IO密集型或者CPU密集型。比如文件读写、网络传输,磁盘IO等,属于IO密集型,而矩阵计算、数值计算这种就属于CPU密集型。在单线程中,遇见IO操作的时候,CPU会阻塞,直到IO操作完成,花费的时间成本为IO耗时加CPU耗时。但是在多线程中,遇见IO操作的时候,该线程会交出GIL,其他线程可以继续运行,这样可以让CPU和IO并行。因此,如果是IO密集型,即在代码中,主要是进行IO读取,那么多线程仍然能够起到加速左右,值得注意的是,这里的加速效果应该是来自于处理IO的设备,支持并行IO,即同一时刻,能够处理多个IO请求。反之,如果是CPU密集型,IO耗时忽略不计的话,此时多线程相当于是单线程,同时考虑到线程的上下文切换,那么多线程的运行时间反而会更多。
线程池的使用方法submit和map
python中concurrent.futures这个类提供了线程池和进程池的接口。as_completed按照任务的完成时间返回,map按照任务的添加时间返回
我们可以通过submit或map添加任务,但使用起来存在细微差别。
一般通过submit得到一个包含future对象的列表,然后通过concurrent.futures.as_completed去遍历这个列表,该方法会阻塞,可以设置超时时间。每当有任务完成的时候,就能通过future.result()得到任务执行的结果,该方法同样会阻塞,可以设置超时时间。因此通过这种方法,输出是按照任务执行完成的时间排序的。
当然,我们也可以不用as_completed去遍历,这样就按照任务的顺序返回。因为每个任务如果没完成就阻塞,完成了就添加。
1 | import concurrent.futures |
上面两种submit,依次输出1 3 7 8和7 1 3 8
map函数则不太一样,第一个参数是需要线程执行的函数,第二个参数是一个迭代器,会依此将参数应用到线程函数中。返回结果和列表的顺序一样。返回7 1 3 8
多线程和多进程的对比
线程安全
1 | import concurrent.futures |
注释掉上面代码的lock,试试加锁和不加锁,可以很清晰的看到,加锁的时候不会有竞争冒险,而不加锁则可能有竞争冒险。因为几率是比较小的,观察不到的话,可以加大循环的次数。
thread local的用法
现在有这样一个场景,我们知道全局变量是所有线程共享的,局部变量只在每个任务执行的时候存在。thread local就是给每个线程一个独立的变量,伴随线程整个生命周期,且线程间隔离。看用法就清楚了1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45from collections import defaultdict
import concurrent.futures
import time
import threading
# 利用local类,创建一个全局对象 local_obj
local_obj = threading.local()
def task(times):
if not hasattr(local_obj, 'var'): # 判断是否已经存在该属性
local_obj.var = 0
if not hasattr(local_obj, 'count'): # 判断是否已经存在该属性
local_obj.count = 0
for i in range(10):
local_obj.var += 1
local_obj.count += 1
print(f'线程id:{threading.get_ident()},thread-local数据:{local_obj.var},执行了{local_obj.count}次')
# 返回线程的 thread-local 数据
return {
'thread_id': threading.get_ident(),
'var': local_obj.var,
'count': local_obj.count
}
def main():
num_threads = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [executor.submit(task, t) for t in range(10)]
# 收集每个任务的结果
results = defaultdict(int)
for future in concurrent.futures.as_completed(futures):
result = future.result()
thread_id = result['thread_id']
thread_var = result['var']
if(results[thread_id]<thread_var):
results[thread_id] = thread_var
else:
continue
print(f"len(results)={len(results)},results = {results}")
if __name__ == "__main__":
main()
打印结果为1
2
3
4
5
6
7
8
9
10
11线程id:26396,thread-local数据:10,执行了1次
线程id:26396,thread-local数据:20,执行了2次
线程id:28596,thread-local数据:10,执行了1次
线程id:26396,thread-local数据:30,执行了3次
线程id:28596,thread-local数据:20,执行了2次
线程id:26396,thread-local数据:40,执行了4次
线程id:28596,thread-local数据:30,执行了3次
线程id:26396,thread-local数据:50,执行了5次
线程id:28596,thread-local数据:40,执行了4次
线程id:14512,thread-local数据:10,执行了1次
len(results)=3,results = defaultdict(<class 'int'>, {26396: 50, 28596: 40, 14512: 10})
使用thread local的好处是,避免多个线程操作共享变量的时候,频繁的上锁。
当然,其实在上面这种场景下,每个线程执行完了立马返回,然后在主线程收集结果,其实也是可行的,也不需要用锁。仔细体会一下~
多进程多线程
1 | import multiprocessing |
一个例子
1 | import multiprocessing |
补充一个关于进程复制的例子
我们知道子进程会拷贝父进程内存空间,因此所有的父进程中的所有变量,子进程都会拷贝一份,彼此隔离。所以我们如果想用锁实现每个子进程内部线程的同步,那么我们只需要在主进程中创建一个线程锁就行了!然后每个子进程会复制一份,然后再在每个子进程的线程中,就像单进程多线程使用全局线程锁一样使用就行了!
当然,除了上面的方式,我们也可以不在主进程中创建线程锁,而是在每个进程函数中,创建每个进程唯一的线程锁,但是这样应该是需要把锁作为参数传递给子线程的?
之前疑惑的点,是怎么把锁传给每个进程,是因为我误解了fork函数,搞清楚fork会把父进程全部拷贝一份,就都懂了!
1 | import threading |