浅谈Python多线程和多进程

concurrent.futures:线程池,让你更加高效、并发的处理任务

《asyncio 系列》12. 详解 asyncio 支持的多种队列 - 古明地盆 - 博客园

首先需要明确的是,多进程和其他语言的一样,能够利用多核cpu,但是python由于GIL的存在,多线程在执行的时候,实际上,每一时刻只有一个线程在执行。相当于是单线程。然而多线程在某些情况下,还是能够起到加速的效果。

需要了解的是,程序的耗时一般消耗在IO和CPU上,按照占比不同,一般分为IO密集型或者CPU密集型。比如文件读写、网络传输,磁盘IO等,属于IO密集型,而矩阵计算、数值计算这种就属于CPU密集型。在单线程中,遇见IO操作的时候,CPU会阻塞,直到IO操作完成,花费的时间成本为IO耗时加CPU耗时。但是在多线程中,遇见IO操作的时候,该线程会交出GIL,其他线程可以继续运行,这样可以让CPU和IO并行。因此,如果是IO密集型,即在代码中,主要是进行IO读取,那么多线程仍然能够起到加速左右,值得注意的是,这里的加速效果应该是来自于处理IO的设备,支持并行IO,即同一时刻,能够处理多个IO请求。反之,如果是CPU密集型,IO耗时忽略不计的话,此时多线程相当于是单线程,同时考虑到线程的上下文切换,那么多线程的运行时间反而会更多。

线程池的使用方法submit和map

python中concurrent.futures这个类提供了线程池和进程池的接口。as_completed按照任务的完成时间返回,map按照任务的添加时间返回

我们可以通过submit或map添加任务,但使用起来存在细微差别。

一般通过submit得到一个包含future对象的列表,然后通过concurrent.futures.as_completed去遍历这个列表,该方法会阻塞,可以设置超时时间。每当有任务完成的时候,就能通过future.result()得到任务执行的结果,该方法同样会阻塞,可以设置超时时间。因此通过这种方法,输出是按照任务执行完成的时间排序的。

当然,我们也可以不用as_completed去遍历,这样就按照任务的顺序返回。因为每个任务如果没完成就阻塞,完成了就添加。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import concurrent.futures
import time


def task(times):
# 模拟任务执行
time.sleep(times)
return times

def main():
num_threads = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
# 提交任务到线程池
# submit
futures = [executor.submit(task, t) for t in [7, 1, 3, 8]]
# # 收集每个任务的结果
results = []
# for future in concurrent.futures.as_completed(futures):
# result = future.result()
# results.append(result)
for future in futures:
result = future.result()
results.append(result)
# map
# results = []
# futures = executor.map(task, [7, 1, 3, 8])
# for future in futures:
# results.append(future)
print(f"results = {results}")


if __name__ == "__main__":
main()

上面两种submit,依次输出1 3 7 8和7 1 3 8

map函数则不太一样,第一个参数是需要线程执行的函数,第二个参数是一个迭代器,会依此将参数应用到线程函数中。返回结果和列表的顺序一样。返回7 1 3 8

多线程和多进程的对比

线程安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import concurrent.futures
import random
from threading import Lock
import time

# 共享变量
shared_variable = 0
# 锁对象,用于保护共享变量的访问
lock = Lock()

def task(task_id):
global shared_variable
# 模拟任务执行
# 获取锁,确保对共享变量的访问是线程安全的
for _ in range(1000000):
with lock:
shared_variable += 1
# shared_variable += 1


def main():
num_threads = 2

with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
# 提交任务到线程池
futures = [executor.submit(task, i) for i in range(2)]

# 收集每个任务的结果
results = []
for future in concurrent.futures.as_completed(futures):
result = future.result()
results.append(result)
print(f"results = {results}")
print(f"shared_variable = {shared_variable}")

if __name__ == "__main__":
main()

注释掉上面代码的lock,试试加锁和不加锁,可以很清晰的看到,加锁的时候不会有竞争冒险,而不加锁则可能有竞争冒险。因为几率是比较小的,观察不到的话,可以加大循环的次数。

thread local的用法

现在有这样一个场景,我们知道全局变量是所有线程共享的,局部变量只在每个任务执行的时候存在。thread local就是给每个线程一个独立的变量,伴随线程整个生命周期,且线程间隔离。看用法就清楚了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from collections import defaultdict
import concurrent.futures
import time
import threading

# 利用local类,创建一个全局对象 local_obj
local_obj = threading.local()

def task(times):
if not hasattr(local_obj, 'var'): # 判断是否已经存在该属性
local_obj.var = 0
if not hasattr(local_obj, 'count'): # 判断是否已经存在该属性
local_obj.count = 0
for i in range(10):
local_obj.var += 1
local_obj.count += 1
print(f'线程id:{threading.get_ident()},thread-local数据:{local_obj.var},执行了{local_obj.count}次')

# 返回线程的 thread-local 数据
return {
'thread_id': threading.get_ident(),
'var': local_obj.var,
'count': local_obj.count
}

def main():
num_threads = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [executor.submit(task, t) for t in range(10)]

# 收集每个任务的结果
results = defaultdict(int)
for future in concurrent.futures.as_completed(futures):
result = future.result()
thread_id = result['thread_id']
thread_var = result['var']
if(results[thread_id]<thread_var):
results[thread_id] = thread_var
else:
continue

print(f"len(results)={len(results)},results = {results}")

if __name__ == "__main__":
main()

打印结果为

1
2
3
4
5
6
7
8
9
10
11
线程id:26396,thread-local数据:10,执行了1次
线程id:26396,thread-local数据:20,执行了2次
线程id:28596,thread-local数据:10,执行了1次
线程id:26396,thread-local数据:30,执行了3次
线程id:28596,thread-local数据:20,执行了2次
线程id:26396,thread-local数据:40,执行了4次
线程id:28596,thread-local数据:30,执行了3次
线程id:26396,thread-local数据:50,执行了5次
线程id:28596,thread-local数据:40,执行了4次
线程id:14512,thread-local数据:10,执行了1次
len(results)=3,results = defaultdict(<class 'int'>, {26396: 50, 28596: 40, 14512: 10})

使用thread local的好处是,避免多个线程操作共享变量的时候,频繁的上锁。

当然,其实在上面这种场景下,每个线程执行完了立马返回,然后在主线程收集结果,其实也是可行的,也不需要用锁。仔细体会一下~

多进程多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import multiprocessing
from concurrent.futures import ThreadPoolExecutor
import threading
import time

def print_numbers(lock):

global counter
results = []
for i in range(10000):
with lock:
results.append(i)
return results

def process_task(num):
results = []
counter = 0
time.sleep(5)
lock = threading.Lock()
print(f"lock = {lock}")
print(f"lock id = {id(lock)}") # 使用id()函数获取锁对象的内存地址
with ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(print_numbers,lock) for _ in range(8)]
for future in futures:
results.extend(future.result())
return results

def main():
t1 = time.time()
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(process_task, range(4))
final_results = [sublist for sublist in results ]
print(f"Final results (length: {len(final_results)})")
t2 = time.time()
# for line in final_results:
# print(line)
print(f"cost time = {t2-t1}")

if __name__ == "__main__":
main()

一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import multiprocessing
from concurrent.futures import ThreadPoolExecutor
import threading
import time

def print_numbers(lock):

global counter
results = []
for i in range(10000):
with lock:
results.append(i)
return results

def process_task(num):
results = []
counter = 0
time.sleep(5)
lock = threading.Lock()
print(f"lock = {lock}")
print(f"lock id = {id(lock)}") # 使用id()函数获取锁对象的内存地址
with ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(print_numbers,lock) for _ in range(8)]
for future in futures:
results.extend(future.result())
return results

def main():
t1 = time.time()
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(process_task, range(4))
final_results = [sublist for sublist in results ]
print(f"Final results (length: {len(final_results)})")
t2 = time.time()
# for line in final_results:
# print(line)
print(f"cost time = {t2-t1}")

if __name__ == "__main__":
main()

补充一个关于进程复制的例子

我们知道子进程会拷贝父进程内存空间,因此所有的父进程中的所有变量,子进程都会拷贝一份,彼此隔离。所以我们如果想用锁实现每个子进程内部线程的同步,那么我们只需要在主进程中创建一个线程锁就行了!然后每个子进程会复制一份,然后再在每个子进程的线程中,就像单进程多线程使用全局线程锁一样使用就行了!

当然,除了上面的方式,我们也可以不在主进程中创建线程锁,而是在每个进程函数中,创建每个进程唯一的线程锁,但是这样应该是需要把锁作为参数传递给子线程的?

之前疑惑的点,是怎么把锁传给每个进程,是因为我误解了fork函数,搞清楚fork会把父进程全部拷贝一份,就都懂了!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import threading
import multiprocessing

# 这是每个进程内部使用的线程锁
thread_lock = threading.Lock()
print(id(thread_lock))

def thread_task(thread_id):
with thread_lock:
print(id(thread_lock))
print(f"Thread {thread_id} in process {multiprocessing.current_process().name} is running")

def process_task(process_id):
# 每个进程创建多个线程
threads = []
for i in range(1): # 假设每个进程创建3个线程
thread = threading.Thread(target=thread_task, args=(i,))
threads.append(thread)
thread.start()

# 等待所有线程完成
for thread in threads:
thread.join()

if __name__ == "__main__":
processes = []

# 创建4个进程
for i in range(4):
process = multiprocessing.Process(target=process_task, args=(i,))
processes.append(process)
process.start()

# 等待所有进程完成
for process in processes:
process.join()