Python的多任务编程

# Python的多任务编程

# 前言

计算机是由CPU、RAM及各种资源(键盘、硬盘、显卡、网卡等)组成,代码的执行过程,实际就是CPU和相关寄存器及RAM之间的相关处理过程。一段代码交由CPU执行前,都会处于就绪队列中,CPU执行时很快就会返回该段代码的结果,所以不同进程的代码是轮流由CPU执行的,由于CPU执行速度很快,在表现上仍会被感觉是同时执行的。在单核CPU场景下,不同就绪队列间的读入与结果保存被称之为上下文切换,由于进程间切换会产生一定的时间等待及资源的消耗,所以为了减少时间和资源的消耗,就引入了线程的设计。线程是当进程的队列被授权占用CPU时,该进程的所有线程队列在共享该进程资源的条件下按优先级由CPU执行。无论是进程还是线程,其队列及资源切换都是由操作系统进行控制的,同时线程的切换也是非常耗性能的,为了使各线程的调度更节约资源和可控,就出现了协程的设计。协程其由进程或线程执行,其拥有自己的寄存器上下文和栈,调度是完全由用户控制的。对于多任务编程,若要实现代码的多任务高效率执行,我们要明晰如下这几个概念的特点及其区别,才能根据实际需求,选用最佳的多任务编程方法。

  • 并行 指在同一时刻有多个进程的指令在多个处理器上同时执行。
  • 并发 是指在同一时刻只能有一个进程的指令执行,但多个进程指令被快速轮换执行,使得在宏观上具有多个进程同时执行的效果。
  • 进程 进程是程序的运行态,进程间数据共享需要借助外部存储。
  • 线程 线程是进程的组成部分,同一进程内线程间数据共享属于内部共享,一个进程可以包含一个或多个线程。
  • 协程 协程是一种用户态的轻量级线程,协程的调度完全由用户控制,同一进程内线程间数据共享属于内部共享。一个进程可以包含一个或多个协程,也可以在一个线程包含一个或多个协程。

# 多线程处理

由于Python是动态编译的语言,与C/C++、Java等静态语言不同,它是在运行时一句一句代码地边编译边执行的。用C语言实现的Python解释器,通常称为CPython,也是Python环境默认的编译器。在Cpython解释器中,为防止多个线程同时执行同一 Python 的代码段,确保线程数据安全,引入了全局解释器锁(GIL, Global Interpreter Lock), 相当于一个互斥锁,所以一个进程下开启的多线程,同一时刻只能有一个线程执行。所以Python 的多线程是伪线程,性能不高,同时也无法利用CPU多核的优势。

另,GIL并不是Python的特性,他是在实现Python解释器(Cpython)时所引入的一个概念,GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理。

在默认情况下,由于GIL的存在,为了使多线程(threading)执行效率更高,需要使用join方法对无序的线程进行阻塞,如下代码可以看到区别。

from multiprocessing import Process
import threading
import os,time

l=[]
stop=time.time()
def work():
    global stop
    time.sleep(2)
    print('===>',threading.current_thread().name)
    stop=time.time()

def test1():
    for i in range(400):
        p=threading.Thread(target=work,name="test"+str(i)) 
        l.append(p)
        p.start()

def test2():
    for i in range(400):
        p=threading.Thread(target=work,name="test"+str(i)) 
        l.append(p)
        p.start()

    for p in l:
        p.join()

if __name__ == '__main__':
    print("CPU Core:",os.cpu_count()) #本机为4核
    print("Worker: 400") #测试线程数

    start=time.time()
    test1()
    active_count=threading.active_count()
    while (active_count>1):
        active_count=threading.active_count()
        continue
    test1_result=stop-start

    start=time.time()
    l=[]
    test2()
    active_count=threading.active_count()
    while (active_count>1):
        active_count=threading.active_count()
        continue
    
    print('Thread run time is %s' %(test1_result))
    print('Thread join run time is %s' %(stop-start))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

执行结果如下:

Thread run time is 4.829492807388306
Thread join run time is 2.053645372390747
1
2

由上结果可以看到, 多线程时join阻塞后执行效率提高了很多。

# 多进程与多线程

多任务编程的本质是CPU占用方法的使用,对于python下多任务处理有多种编程方法可供选择,分别有多进程(multiprocessing)、多线程(threading)及异步协程(Asyncio),在实际使用中该如何选择呢?我们先看如下一段程序的执行效果。

from multiprocessing import Process
from threading import Thread
import os,time

l=[]
def work():
    res=0
    for i in range(100000000):
        res*=i

def test1():
    for i in range(4):
        p=Process(target=work) 
        l.append(p)
        p.start()

def test2():
    for i in range(4):
        p=Thread(target=work) 
        l.append(p)
        p.start()

    for p in l:
        p.join()

if __name__ == '__main__':
    print("CPU Core:",os.cpu_count()) #本机为4核
    print("Worker: 4") #工作线程或子进程数

    start=time.time()
    test1()
    while (l[len(l)-1].is_alive()):
        continue
    stop=time.time()
    print('Process run time is %s' %(stop-start))
    
    start=time.time()
    l=[]
    test2()
    while (l[len(l)-1].is_alive()):
        continue
    stop=time.time()
    print('Thread run time is %s' %(stop-start))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

执行结果如下:

CPU Core: 4
Worker: 4
Process run time is 11.030176877975464
Thread run time is 17.0117769241333
1
2
3
4

从上面的结果,我们可以看到同一个函数用Process及Thread 不同的方法,执行的时间是不同的,为什么会产生这样的差异? multiprocessing 使用子进程而非线程,其有效地绕过了全局解释器锁GIL(Global Interpreter Lock), 并充分利用了多核CPU的性能,所以在多核CPU环境下,其比多进程方式效率要高。

# 协程

又称为微线程,协程也可被看作是被标注的函数,不同被表注函数的执行和切换就是协程的切换,其完全由编程者自行控制。协程一般是使用 gevent库,在早期这个库用起来比较麻烦,所以在python 3.7以后的版本,对协程的使用方法做了优化。协程在进程调用场景下,仍可发挥CPU多核的优势。

import asyncio
import time

async def work(i):
    await asyncio.sleep(2)
    print('===>',i)

async def main():
    start=time.time()
    l=[]
    for i in range(400):
        p=asyncio.create_task(work(i))
        l.append(p)

    for p in l:
        await p

    stop=time.time()
    print('run time is %s' %(stop-start))

asyncio.run(main())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

执行结果如下:

run time is 2.0228068828582764
1

另,默认环境下,协程是在单线程模式下执行的异步操作,其并不能发挥多处理器的性能。为了提升执行效率,可以在多进程中执行协程调用方法,代码用例如下:

from multiprocessing import Process
import asyncio
import os,time

l=[]
async_result=0
async def work1():
    res=0
    for i in range(100000000):
        res*=i

# 协程入口
async def async_test():
    m=[]
    for i in range(4):
        p=asyncio.create_task(work1())
        m.append(p)

    for p in m:
        await p

async def async_test1():
    await asyncio.create_task(work1())

def async_run():
    asyncio.run(async_test1())

# 多进程入口
def test1():
    for i in range(4):
        p=Process(target=async_run) 
        l.append(p)
        p.start()

if __name__ == '__main__':
    print("CPU Core:",os.cpu_count()) #本机为4核
    print("Worker: 4") #工作线程或子进程数
    start=time.time()
    asyncio.run(async_test())
    stop=time.time()
    
    print('Asyncio run time is %s' %(stop-start))

    start=time.time()
    test1()
    while (l[len(l)-1].is_alive()):
        continue
    stop=time.time()

    print('Process Asyncio run time is %s' %(stop-start))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

执行结果如下:

CPU Core: 4
Worker: 4
Asyncio run time is 18.89663052558899
Process Asyncio run time is 10.865562438964844
1
2
3
4

如上结果,在多进程中调用多协程的方法,执行效率明显提高。

# 并发编程选择

如上所结果是否是就决定一定要选择多进程(multiprocessing)模式呢?我们再看下如下代码:

from multiprocessing import Process
from threading import Thread
import os,time

l=[]
# 仅计算
def work1():
    res=0
    for i in range(100000000):
        res*=i

# 仅输出
def work2():
    time.sleep(2)
    print('===>')

# 多进程,仅计算
def test1():
    for i in range(4):
        p=Process(target=work1) 
        l.append(p)
        p.start()

# 多进程,仅输出
def test_1():
    for i in range(400):
        p=Process(target=work2) 
        l.append(p)
        p.start()

# 多线程,仅计算
def test2():
    for i in range(4):
        p=Thread(target=work1) 
        l.append(p)
        p.start()

    for p in l:
        p.join()

# 多线程,仅输出
def test_2():
    for i in range(400):
        p=Thread(target=work2) 
        l.append(p)
        p.start()

    for p in l:
        p.join()

if __name__ == '__main__':
    print("CPU Core:",os.cpu_count()) #本机为4核

    start=time.time()
    test1()
    while (l[len(l)-1].is_alive()):
        continue
    stop=time.time()
    test_result=stop-start
    
    start=time.time()
    l=[]
    test_1()
    while (l[len(l)-1].is_alive()):
        continue
    stop=time.time()
    test1_result=stop-start

    start=time.time()
    l=[]
    test2()
    while (l[len(l)-1].is_alive()):
        continue
    stop=time.time()
    test2_result=stop-start
    start=time.time()
    l=[]
    test_2()
    while (l[len(l)-1].is_alive()):
        continue
    stop=time.time()
    test3_result=stop-start
    print('Process run time is %s' %(test_result))
    print('Process I/O run time is %s' %(test1_result))
    print('Thread run time is %s' %(test2_result))
    print('Thread I/O run time is %s' %(stop-start))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

执行结果如下:

Process run time is 10.77662968635559
Process I/O run time is 2.9869778156280518
Thread run time is 16.842355012893677
Thread I/O run time is 2.024587869644165
1
2
3
4

由结果可看,在仅计算的操作时,多进程效率比较高,在仅输出的操作时,多线程的效率比较高,所以在实际使用中要根据实际情况测试决定。

  • 多线程(threading)用于IO密集型,如socket,爬虫,web
  • 多进程(multiprocessing)用于计算密集型,如数据分析

# 参考

  • 多线程 https://docs.python.org/zh-cn/3/library/threading.html

  • 多进程 https://docs.python.org/zh-cn/3/library/multiprocessing.html#

  • 协程 https://docs.python.org/zh-cn/3/library/asyncio-task.html

  • python 的多进程与多线程

# 由于python解释器的局限GIL(Global Interpreter Lock)的设计,在多核环境下,尽量用多进程


from multiprocessing import Process
from threading import Thread
import os,time
def work():
    res=0
    for i in range(100000000):
        res*=i


if __name__ == '__main__':
    l=[]
    print(os.cpu_count()) #本机为4核
    start=time.time()
    for i in range(4):
        p=Process(target=work) #耗时5s多
        p=Thread(target=work) #耗时18s多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  • 多线程
'''
# 线程参数
class threading.Thread(
    group=None, 
    target=None, # 调用函数
    name=None, # 定义线程名称
    args=(), # 元组参数
    kwargs={}, # 字典参数
     *, 
    daemon=None # 主线程状态管理,若为True,主线程结束时,强制关闭当前线程
    )
'''

import threading
import time

def test():
    for i in range(5):
        # 获取当前线程
        print(threading.current_thread())
        threading.current_thread().name="xuexi_python"
        # 当前线程名称
        print(threading.current_thread().getName())
        print(threading.current_thread().name)
        print('test ',i)
        time.sleep(1)

# 声明线程方法
thread = threading.Thread(target=test)

# 启动线程
thread.start()

# 查看线程是否存活
thread.isAlive()

# 阻塞主线程,等待thread执行完成;
# 若指定阻塞时间,则表示等待thread执行X秒,释放阻塞一次
# 若线程是循环,则表示每执行循环一次产生结果后,释放阻塞一次
# 因GIL 的存在,所以需要使用,确保但进程中每次只有单线程执行
thread.join([timeout])

# 设置线程为主线程状态管理
thread.setDaemon(True)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
  • 多线程变量共享 OK
# 线程之间共享全局变量
# 类调用多线程时,全局类变量是共享的
import threading
import time
g_nums = [1,2]
 
def test1(temp):
    temp.append(33)
    print("-----in test1 temp=%s-----"% str(temp))
 
def test2(temp):
    print("-----in test2 temp=%s-----"% str(temp))

def watch(name, type):
    print("watch : %s type:%s" % (name, type))
 
def main():
    # 元组类型
    t1 = threading.Thread(target=test1,args=(g_nums,))
    t2 = threading.Thread(target=test2, args=(g_nums,))  # args 元组类型
    t1.start()
    time.sleep(1)
    t2.start()
    time.sleep(1)    
    print("-----in main temp=%s-----"% str(g_nums))

    # 字典类型
    watch_thread = threading.Thread(target=watch, kwargs={"name": "电影", "type": "科幻"})

 
if __name__ == '__main__':
    main()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
  • 异步协程
上次更新: 2022/12/05, 22:29:05

Initializing...

最近更新
01
git的tag与branch 原创
05-21
02
阿里云SLS日志服务的数据脱敏及安全管理 原创
03-21
03
云平台的成本管理 原创
03-13
更多文章>
×