目录
1 引言
2 创建进程
2.1 通过定义函数的方式创建进程
2.2 通过定义类的方式创建进程
3 Process中常用属性和方法
3.1 守护进程:daemon
3.2 进程终结于存活检查:terminate()与is_alive()
3.3 join()方法
4 进程间的同步机制
4.1进程锁:Lock
4.2 信号量: Semaphore
4.3 事件:Event
5 进程间的通信
5.1 队列:Queue
5.2 管道:Pipe
6 进程间的数据共享
6.1 进程间的数据隔离
6.2 Manager
7 进程池
8 总结
正文
1引言
本篇博文主要对Python中并发编程中的多进程相关内容展开详细介绍,Python进程主要在multiprocessing模块中,本博文以multiprocessing种Process类为中心,通过实例代码对多进程设计到的进程间的同步机制、通信机制、数据共享机制进程池进行介绍。
2创建进程
创建进程有两种方式,分别是通过定义函数的方式和通过定义类的方式。两种方式创建进程都必须通过实例化Process类。
Process类参数如下:
1) group:这一参数值始终为None,尚未启用,是为以后Python版本准备的
2) target:表示调用对象,即子进程要执行的任务
3) args:表示调用对象的位置参数元组,即target的位置参数,必须是元组,如args=(0,1,[1,2,3])
4) kwargs:表示调用对象的字典参数,kwargs={'name':'egon','age':18}
5) name:为子进程的名称
另外,无论用那种方式创建进程都必须有“if __name__ == '__main__':”这一行代码作为程序入口,否则会报错。
2.1通过定义函数的方式创建进程
通过函数方式创建进程这一方法需在实例化进程实例时将函数名作为参数传递进去,函数的参数用一个tuple传递给进程实例:
from multiprocessing import Process import time def func (n): print("子进程开始运行:{}……".format(n)) time.sleep(1) print("子进程结束运行:{}……".format(n)) if __name__ == '__main__':#创建进程执行一定要这一行代码 print("主进程开始运行……") p = Process(target=func,args=(10,)) # 注册 p.start()# 启动一个子进程 time.sleep(1) print("主进程结束运行……" )
2.2通过定义类的方式创建进程
用类的方式创建进程时,自定义的类必须必须继承Process类,且必须覆写run方法,所有功能代码都放在run方法中:
from multiprocessing import Process import os class MyProcess(Process): def run(self):#必须覆写run方法 print('子进程:',self.pid)if name=="main":
p1 = MyProcess()
p1.start()
p2 = MyProcess()
p2.start()
print('主进程:',os.getpid())
上面代码没有传递传输,如果要传递参数该怎么做呢?那就要自定义构造方法了,但是在构造方法中,一定要先调用Process类的构造方法:
from multiprocessing import Process import os class MyProcess(Process): def __init__(self,arg1,arg2): super().__init__() self.arg1 = arg1 self.arg2 = arg2 def run(self): print('子进程:',self.pid,self.arg1,self.arg2) if __name__=="__main__": p1 = MyProcess('我是arg1','我是arg2') p1.start() p2 = MyProcess('我是arg1','我是arg2') p2.start() print('主进程:',os.getpid())
3 Process的常用属性和方法
Process类常用属性如下:
1)daemon:默认值为False,如果设为True,则设为守护进程。
2)name:进程的名称
3)pid:进程的pid
Process类常用方法如下:
1)start():启动进程,并调用该子进程中的p.run() ;
2 )run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法;
3 )terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁;
4)is_alive():如果p仍然运行,返回True;
5)join([timeout]):主线程等待子进程终止
3.1守护进程:daemon
若要将一个进程设置为守护进程,在进程start之前,将其daemon属性设置为True即可。但什么是守护进程呢?我们通过如下代码来说明,我们要通过代码实现如下效果:主进程创建p1、p2进程,之后立马沉睡5秒,p1进程设置为守护进程,进程p1每隔1秒打印一条语句,进程p2打印一条语句后立马沉睡10秒。代码如下:
import time import os from multiprocessing import Process def func(): i = 1 while True: time.sleep(1) print('{}--子进程p1正在执行,pid:{}'.format(i,os.getpid())) i+=1 def func2(): print('子进程p2开始执行,pid:{}'.format(os.getpid())) time.sleep(10) print('子进程p2结束执行,pid:{}'.format(os.getpid())) if __name__=='__main__': print('主进程代码开始运行,pid:{}'.format(os.getpid())) p = Process(target=func) p.daemon = True # 设置为守护进程 p.start() p2 = Process(target=func2) p2.start() time.sleep(5) print('主进程代码运行完了,pid:{}'.format(os.getpid()))
运行结果:
主进程代码开始运行,pid:11608
子进程p2开始执行,pid:7260
1--子进程p1正在执行,pid:1060
2--子进程p1正在执行,pid:1060
3--子进程p1正在执行,pid:1060
4--子进程p1正在执行,pid:1060
主进程代码运行完了,pid:11608
子进程p2结束执行,pid:7260
从运行结果字面上似乎看不出什么,因为区别在于输出时间上。在主进程运行的那5秒时间(输出“主进程代码运行完了,pid:11608”之前),p1进程确实可以每隔1秒输出一条语句,但是主进程结束那5秒后,p1不在输出,且在任务管理器中也可以查看到,p1进程也已经死亡,主进程代码虽然运行完了,但依然存活,这时候p2进程依然还在沉睡,10秒后,p2进程打印“子进程p2结束执行,pid:7260”,然后主进程和p2进程一起死亡。
可以得出结果,守护进程依附于主进程代码,只要主进程代码运行完了,那么无论守护进程代码是否运行完,守护进程都会结束。另外,守护进程不能创建自己的子进程。
3.2 进程终结于存活检查:terminate()与is_alive()
terminate()与is_alive()都是由进程实例调用,分别用来终结一个进程、检查一个进程是否依然存活:
import time import os from multiprocessing import Processdef func():
i = 1
while True:
time.sleep(1)
print('{}--子进程p1正在执行,os.getpid()))
i+=1
if name=='main':
p = Process(target=func)
p.start()
time.sleep(3)
p.terminate() # 终结进程p
print(p.is_alive()) # 检查p是否依然存活
time.sleep(1)
print(p.is_alive())
输出结果:
主进程代码开始运行,pid:13164
1--子进程p1正在执行,pid:8896
2--子进程p1正在执行,pid:8896
True
False
主进程代码运行完了,pid:13164
为什么结束之后第一次调用is_alive()方法输出的是True呢? 因为terminate()方法终结一个进程时操作系统需要一定的响应时间,所以可能会有延迟。
3.3join()方法
join方法功能是阻塞当前所在进程(例如下面的主进程),等到被join的进程(下面的进程p1)结束之后,回到当前进程继续执行。
from multiprocessing import Process import time def func1 (): print("进程1开始运行……") for i in range(3): time.sleep(1) print("进程1运行了{}秒……".format(i+1)) print("进程1结束运行……")def func2 ():
print("进程2开始运行……")
for i in range(6):
time.sleep(1)
print("进程2运行了{}秒……".format(i+1))
print("进程2结束运行……")if name == 'main':
print("主进程开始运行……")
p1 = Process(target=func1)
p2 = Process(target=func2)
p1.start()
p2.start()
time.sleep(1)
p1.join()p2.join()
print("主进程结束运行……" )
上述代码不进行join、分别是对进程1、进程2进行join的运行结果,发现,主进程会等待被join的进程运行完才会继续执行join下面的代码。
4 进程间的同步控制
4.1 进程锁:Lock
当多个进程对同一资源进行IO操作时,需要对资源“上锁”,否则会出现意外结果。上锁之后,同一件就只能有一个进程运行上锁的代码块。例如有一个txt文件,里面内容是一个数字10,我们要用多进程去读取这个文件的值,然后每读一次,让txt中的这个数字减1,不加锁时代码如下:
from multiprocessing import Process from multiprocessing import Lock import time import os def func (): if os.path.exists('num.txt'): with open('num.txt','r') as f: num = int(f.read()) num -= 1 time.sleep(1) with open('num.txt','w') as f: f.write(str(num)) else: with open('num.txt','w') as f: f.write('10') if __name__ == '__main__': print("主进程开始运行……") p_list = [] for i in range(10): p = Process(target=func) p_list.append(p) p.start() for p in p_list: p.join() with open('num.txt','r') as f: num = int(f.read()) print('最后结果为:{}'.format(num)) print("主进程结束运行……" )
输出结果:
主进程开始运行……
进入万人交流基地:960410445 资源共享的偶尔!
最后结果为:8
主进程结束运行……
虽然我们用了10个进程读取并修改txt文件,但最后的值却不是1。这正是多进程共同访问资源造成混乱造成的。要达到预期结果,就要给资源上锁:
from multiprocessing import Process from multiprocessing import Lock import time import os def func (lock): if os.path.exists('num.txt'): lock.acquire() with open('num.txt','w') as f: f.write(str(num)) lock.release() else: with open('num.txt','w') as f: f.write('10') if __name__ == '__main__': print("主进程开始运行……") lock = Lock() p_list = [] for i in range(10): p = Process(target=func,args=(lock,)) p_list.append(p) p.start() for p in p_list: p.join() with open('num.txt','r') as f: num = int(f.read()) print('最后结果为:{}'.format(num)) print("主进程结束运行……" )
输出结果:
主进程开始运行……
最后结果为:0
主进程结束运行……
果然,用了进程锁之后获得了预料中的结果。但是,如果你运行了上面两块代码你就会发现,加了锁之后,程序明显变慢了很多,因为程序成了串行的了,当然好处是数据安全有保证。
4.2 信号量:Semaphore