Python的多任务编程
# Python的多任务编程
# 前言
计算机是由CPU、RAM及各种资源(键盘、硬盘、显卡、网卡等)组成,代码的执行过程,实际就是CPU和相关寄存器及RAM之间的相关处理过程。一段代码交由CPU执行前,都会处于就绪队列中,CPU执行时很快就会返回该段代码的结果,所以不同进程的代码是轮流由CPU执行的,由于CPU执行速度很快,在表现上仍会被感觉是同时执行的。在单核CPU场景下,不同就绪队列间的读入与结果保存被称之为上下文切换,由于进程间切换会产生一定的时间等待及资源的消耗,所以为了减少时间和资源的消耗,就引入了线程的设计。线程是当进程的队列被授权占用CPU时,该进程的所有线程队列在共享该进程资源的条件下按优先级由CPU执行。无论是进程还是线程,其队列及资源切换都是由操作系统进行控制的,同时线程的切换也是非常耗性能的,为了使各线程的调度更节约资源和可控,就出现了协程的设计。协程其由进程或线程执行,其拥有自己的寄存器上下文和栈,调度是完全由用户控制的。对于多任务编程,若要实现代码的多任务高效率执行,我们要明晰如下这几个概念的特点及其区别,才能根据实际需求,选用最佳的多任务编程方法。
- 并行 指在同一时刻有多个进程的指令在多个处理器上同时执行。
- 并发 是指在同一时刻只能有一个进程的指令执行,但多个进程指令被快速轮换执行,使得在宏观上具有多个进程同时执行的效果。
- 进程 进程是程序的运行态,进程间数据共享需要借助外部存储。
- 线程 线程是进程的组成部分,同一进程内线程间数据共享属于内部共享,一个进程可以包含一个或多个线程。
- 协程 协程是一种用户态的轻量级线程,协程的调度完全由用户控制,同一进程内线程间数据共享属于内部共享。一个进程可以包含一个或多个协程,也可以在一个线程包含一个或多个协程。
# 多线程处理
由于Python是动态编译的语言,与C/C++、Java等静态语言不同,它是在运行时一句一句代码地边编译边执行的。用C语言实现的Python解释器,通常称为CPython,也是Python环境默认的编译器。在Cpython解释器中,为防止多个线程同时执行同一 Python 的代码段,确保线程数据安全,引入了全局解释器锁(GIL, Global Interpreter Lock), 相当于一个互斥锁,所以一个进程下开启的多线程,同一时刻只能有一个线程执行。所以Python 的多线程是伪线程,性能不高,同时也无法利用CPU多核的优势。
另,GIL并不是Python的特性,他是在实现Python解释器(Cpython)时所引入的一个概念,GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理。
在默认情况下,由于GIL的存在,为了使多线程(threading)执行效率更高,需要使用join方法对无序的线程进行阻塞,如下代码可以看到区别。
from multiprocessing import Process
import threading
import os,time
l=[]
stop=time.time()
def work():
global stop
time.sleep(2)
print('===>',threading.current_thread().name)
stop=time.time()
def test1():
for i in range(400):
p=threading.Thread(target=work,name="test"+str(i))
l.append(p)
p.start()
def test2():
for i in range(400):
p=threading.Thread(target=work,name="test"+str(i))
l.append(p)
p.start()
for p in l:
p.join()
if __name__ == '__main__':
print("CPU Core:",os.cpu_count()) #本机为4核
print("Worker: 400") #测试线程数
start=time.time()
test1()
active_count=threading.active_count()
while (active_count>1):
active_count=threading.active_count()
continue
test1_result=stop-start
start=time.time()
l=[]
test2()
active_count=threading.active_count()
while (active_count>1):
active_count=threading.active_count()
continue
print('Thread run time is %s' %(test1_result))
print('Thread join run time is %s' %(stop-start))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
执行结果如下:
Thread run time is 4.829492807388306
Thread join run time is 2.053645372390747
2
由上结果可以看到, 多线程时join阻塞后执行效率提高了很多。
# 多进程与多线程
多任务编程的本质是CPU占用方法的使用,对于python下多任务处理有多种编程方法可供选择,分别有多进程(multiprocessing)、多线程(threading)及异步协程(Asyncio),在实际使用中该如何选择呢?我们先看如下一段程序的执行效果。
from multiprocessing import Process
from threading import Thread
import os,time
l=[]
def work():
res=0
for i in range(100000000):
res*=i
def test1():
for i in range(4):
p=Process(target=work)
l.append(p)
p.start()
def test2():
for i in range(4):
p=Thread(target=work)
l.append(p)
p.start()
for p in l:
p.join()
if __name__ == '__main__':
print("CPU Core:",os.cpu_count()) #本机为4核
print("Worker: 4") #工作线程或子进程数
start=time.time()
test1()
while (l[len(l)-1].is_alive()):
continue
stop=time.time()
print('Process run time is %s' %(stop-start))
start=time.time()
l=[]
test2()
while (l[len(l)-1].is_alive()):
continue
stop=time.time()
print('Thread run time is %s' %(stop-start))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
执行结果如下:
CPU Core: 4
Worker: 4
Process run time is 11.030176877975464
Thread run time is 17.0117769241333
2
3
4
从上面的结果,我们可以看到同一个函数用Process及Thread 不同的方法,执行的时间是不同的,为什么会产生这样的差异? multiprocessing 使用子进程而非线程,其有效地绕过了全局解释器锁GIL(Global Interpreter Lock), 并充分利用了多核CPU的性能,所以在多核CPU环境下,其比多进程方式效率要高。
# 协程
又称为微线程,协程也可被看作是被标注的函数,不同被表注函数的执行和切换就是协程的切换,其完全由编程者自行控制。协程一般是使用 gevent库,在早期这个库用起来比较麻烦,所以在python 3.7以后的版本,对协程的使用方法做了优化。协程在进程调用场景下,仍可发挥CPU多核的优势。
import asyncio
import time
async def work(i):
await asyncio.sleep(2)
print('===>',i)
async def main():
start=time.time()
l=[]
for i in range(400):
p=asyncio.create_task(work(i))
l.append(p)
for p in l:
await p
stop=time.time()
print('run time is %s' %(stop-start))
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
执行结果如下:
run time is 2.0228068828582764
另,默认环境下,协程是在单线程模式下执行的异步操作,其并不能发挥多处理器的性能。为了提升执行效率,可以在多进程中执行协程调用方法,代码用例如下:
from multiprocessing import Process
import asyncio
import os,time
l=[]
async_result=0
async def work1():
res=0
for i in range(100000000):
res*=i
# 协程入口
async def async_test():
m=[]
for i in range(4):
p=asyncio.create_task(work1())
m.append(p)
for p in m:
await p
async def async_test1():
await asyncio.create_task(work1())
def async_run():
asyncio.run(async_test1())
# 多进程入口
def test1():
for i in range(4):
p=Process(target=async_run)
l.append(p)
p.start()
if __name__ == '__main__':
print("CPU Core:",os.cpu_count()) #本机为4核
print("Worker: 4") #工作线程或子进程数
start=time.time()
asyncio.run(async_test())
stop=time.time()
print('Asyncio run time is %s' %(stop-start))
start=time.time()
test1()
while (l[len(l)-1].is_alive()):
continue
stop=time.time()
print('Process Asyncio run time is %s' %(stop-start))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
执行结果如下:
CPU Core: 4
Worker: 4
Asyncio run time is 18.89663052558899
Process Asyncio run time is 10.865562438964844
2
3
4
如上结果,在多进程中调用多协程的方法,执行效率明显提高。
# 并发编程选择
如上所结果是否是就决定一定要选择多进程(multiprocessing)模式呢?我们再看下如下代码:
from multiprocessing import Process
from threading import Thread
import os,time
l=[]
# 仅计算
def work1():
res=0
for i in range(100000000):
res*=i
# 仅输出
def work2():
time.sleep(2)
print('===>')
# 多进程,仅计算
def test1():
for i in range(4):
p=Process(target=work1)
l.append(p)
p.start()
# 多进程,仅输出
def test_1():
for i in range(400):
p=Process(target=work2)
l.append(p)
p.start()
# 多线程,仅计算
def test2():
for i in range(4):
p=Thread(target=work1)
l.append(p)
p.start()
for p in l:
p.join()
# 多线程,仅输出
def test_2():
for i in range(400):
p=Thread(target=work2)
l.append(p)
p.start()
for p in l:
p.join()
if __name__ == '__main__':
print("CPU Core:",os.cpu_count()) #本机为4核
start=time.time()
test1()
while (l[len(l)-1].is_alive()):
continue
stop=time.time()
test_result=stop-start
start=time.time()
l=[]
test_1()
while (l[len(l)-1].is_alive()):
continue
stop=time.time()
test1_result=stop-start
start=time.time()
l=[]
test2()
while (l[len(l)-1].is_alive()):
continue
stop=time.time()
test2_result=stop-start
start=time.time()
l=[]
test_2()
while (l[len(l)-1].is_alive()):
continue
stop=time.time()
test3_result=stop-start
print('Process run time is %s' %(test_result))
print('Process I/O run time is %s' %(test1_result))
print('Thread run time is %s' %(test2_result))
print('Thread I/O run time is %s' %(stop-start))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
执行结果如下:
Process run time is 10.77662968635559
Process I/O run time is 2.9869778156280518
Thread run time is 16.842355012893677
Thread I/O run time is 2.024587869644165
2
3
4
由结果可看,在仅计算的操作时,多进程效率比较高,在仅输出的操作时,多线程的效率比较高,所以在实际使用中要根据实际情况测试决定。
- 多线程(threading)用于IO密集型,如socket,爬虫,web
- 多进程(multiprocessing)用于计算密集型,如数据分析
# 参考
多线程 https://docs.python.org/zh-cn/3/library/threading.html
多进程 https://docs.python.org/zh-cn/3/library/multiprocessing.html#
协程 https://docs.python.org/zh-cn/3/library/asyncio-task.html
python 的多进程与多线程
# 由于python解释器的局限GIL(Global Interpreter Lock)的设计,在多核环境下,尽量用多进程
from multiprocessing import Process
from threading import Thread
import os,time
def work():
res=0
for i in range(100000000):
res*=i
if __name__ == '__main__':
l=[]
print(os.cpu_count()) #本机为4核
start=time.time()
for i in range(4):
p=Process(target=work) #耗时5s多
p=Thread(target=work) #耗时18s多
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop-start))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
- 多线程
'''
# 线程参数
class threading.Thread(
group=None,
target=None, # 调用函数
name=None, # 定义线程名称
args=(), # 元组参数
kwargs={}, # 字典参数
*,
daemon=None # 主线程状态管理,若为True,主线程结束时,强制关闭当前线程
)
'''
import threading
import time
def test():
for i in range(5):
# 获取当前线程
print(threading.current_thread())
threading.current_thread().name="xuexi_python"
# 当前线程名称
print(threading.current_thread().getName())
print(threading.current_thread().name)
print('test ',i)
time.sleep(1)
# 声明线程方法
thread = threading.Thread(target=test)
# 启动线程
thread.start()
# 查看线程是否存活
thread.isAlive()
# 阻塞主线程,等待thread执行完成;
# 若指定阻塞时间,则表示等待thread执行X秒,释放阻塞一次
# 若线程是循环,则表示每执行循环一次产生结果后,释放阻塞一次
# 因GIL 的存在,所以需要使用,确保但进程中每次只有单线程执行
thread.join([timeout])
# 设置线程为主线程状态管理
thread.setDaemon(True)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
- 多线程变量共享 OK
# 线程之间共享全局变量
# 类调用多线程时,全局类变量是共享的
import threading
import time
g_nums = [1,2]
def test1(temp):
temp.append(33)
print("-----in test1 temp=%s-----"% str(temp))
def test2(temp):
print("-----in test2 temp=%s-----"% str(temp))
def watch(name, type):
print("watch : %s type:%s" % (name, type))
def main():
# 元组类型
t1 = threading.Thread(target=test1,args=(g_nums,))
t2 = threading.Thread(target=test2, args=(g_nums,)) # args 元组类型
t1.start()
time.sleep(1)
t2.start()
time.sleep(1)
print("-----in main temp=%s-----"% str(g_nums))
# 字典类型
watch_thread = threading.Thread(target=watch, kwargs={"name": "电影", "type": "科幻"})
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
- 异步协程