我们在日常开发中可能要完成一些计算密集型(CPU密集型)的任务,如计算圆周率、对视频进行高清解码等等,这些任务的特点是需要进行大量的计算,消耗CPU资源。因此,想要最高效率的利用CPU,我们可以使用多进程来实现,让我们一起来探讨在Python中是如何实现多进程的吧。
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础,进程是线程的容器。
进程的概念
进程是一个实体,每一个进程都有自己的内存地址。
进程一般由
程序、数据集、进程控制块
三部分组成。程序用于描述进程要完成的功能,是控制进程执行的指令集;数据集合是程序在执行时所需要的数据和工作区;程序控制块包含进程的描述信息和控制信息,是进程存在的唯一标志。
进程具有的特征
动态性:进程是程序的一次执行过程,是临时的,有生命周期的,是动态生成、动态消亡的;
并发性:任何进程都可以同其他进程一起并发执行;
独立性:进程是系统进行资源分配和调度的一个独立单位;
结构性:进程由程序、数据和进程控制块三部分组成
Python多进程
大部分情况下,想要充分使用多核CPU的资源,就需要在代码中使用多进程。Python提供了multiprocessing模块来启动子进程,并在子进程中执行我们定制的任务。multiprocessing模块的功能众多:支持子进程、通信、数据共享和执行不同形式的同步,提供了Process、Queue、Pipe等组件。
进程修改的数据仅限于该进程内
。
Process类介绍与应用
Process(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None)
参数说明:
group参数未使用,值始终为None
target表示调用的对象,即子进程要执行的任务
name为子进程的名称
args表示调用对象的位置参数元组,必须有逗号,如:args=(1,)
kwargs表示调用对象的字典,kwargs=
daemon表示是否守护进程
下面介绍一下python多进程的开启和调用
import random
from time import sleep
from multiprocessing import Process
def func(name):
s = random.randint(1, 5)
print(f'current process is {name}, sleeping {s}s.')
sleep(s)
print(f'process {name} is over')
if __name__ == '__main__':
for i in range(1, 5):
p = Process(target=func, args=(i,))
p.start()
print('main process')
main process
current process is 1, sleeping 3s.
current process is 2, sleeping 1s.
current process is 3, sleeping 2s.
current process is 4, sleeping 5s.
process 2 is over
process 3 is over
process 1 is over
process 4 is over
可以看出,各进程之间是并发执行的,先完成任务的进程先结束。
import random
from time import sleep
from multiprocessing import Process
class Func(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
s = random.randint(1, 5)
print(f'current process is {self.name}, sleeping {s}s.')
sleep(s)
print(f'process {self.name} is over')
if __name__ == '__main__':
for i in range(1, 5):
p = Func(str(i))
p.start()
print('main process')
main process
current process is 1, sleeping 3s.
current process is 2, sleeping 1s.
current process is 3, sleeping 4s.
current process is 4, sleeping 3s.
process 2 is over
process 1 is over
process 4 is over
process 3 is over
继承调用中,start()方法会启动一个子线程,在该子线程中执行run()方法来实现进程的调用。
Process的join方法
在多进程中,主进程和子进程的执行是同时进行的,如果主进程中有些任务需要等待子进程执行完毕后再执行的话,就需要使用join()方法。
作用:在进程中可以阻塞主进程的执行,直到等待子进程全部完成后,才继续运行主进程后面的代码。
我们在直接调用的例子中加入join
import random
from time import sleep
from multiprocessing import Process
def func(name):
s = random.randint(1, 5)
print(f'current process is {name}, sleeping {s}s.')
sleep(s)
print(f'process {name} is over')
if __name__ == '__main__':
plist = []
for i in range(1, 5):
p = Process(target=func, args=(i,))
p.start() # 启动子进程
plist.append(p)
for p in plist:
p.join() # 阻塞主进程
print('main process')
print('do something')
current process is 1, sleeping 5s.
current process is 2, sleeping 4s.
current process is 3, sleeping 5s.
current process is 4, sleeping 4s.
process 2 is over
process 4 is over
process 1 is over
process 3 is over
main process
do something
可以看出,join阻塞了主进程的执行,主进程的打印操作在子进程全部执行完毕后才执行。
主进程在开启子进程的时候,可以将子进程设置为守护进程
守护进程的两个特点
守护进程会在主进程结束后就会终止(不管守护进程的任务是否执行完毕)
守护进程内无法开启子进程,否则抛出异常
import random
from time import sleep
from multiprocessing import Process
def func(name):
s = random.randint(1, 5)
print(f'current process is {name}, sleeping {s}s.')
sleep(s)
print(f'process {name} is over')
if __name__ == '__main__':
p1 = Process(target=func, args=('daemon',))
p2 = Process(target=func, args=('sub',))
p1.daemon = True # 设置p1为守护进程
p1.start() # 启动守护进程
p2.start() # 启动子进程
print('main process')
sleep(2) # 主进程设置两秒睡眠时间
main process
current process is daemon, sleeping 5s.
current process is sub, sleeping 1s.
process sub is over
上面例子中启动了两个子进程,其中一个子进程p1设置成了守护进程。从结果我们可以看出,主进程结束后(2秒),守护进程(5秒)也跟随主进程结束,并没有打印出后续的内容。
Python进程池
当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
Pool常用参数说明如下
apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;
close():关闭Pool,使其不再接受新的任务;
terminate():不管任务是否完成,立即终止;
join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用
请看下面例子
import random
from time import sleep
from multiprocessing import Pool
def func(name):
s = random.randint(1, 5)
print(f'current process is {name}, sleeping {s}s.')
sleep(s)
print(f'process {name} is over')
if __name__ == '__main__':
p = Pool(3)
for i in range(1, 5):
p.apply_async(func, (i,))
p.close()
p.join()
print('main process')
current process is 1, sleeping 1s.
current process is 2, sleeping 5s.
current process is 3, sleeping 4s.
process 1 is over
current process is 4, sleeping 1s.
process 4 is over
process 3 is over
process 2 is over
main process
我们设置进程池的最大值为3,循环的时候先将3个进程放进池子中,等池子中的进程执行结束后(process 1),会将新的进程(process 4)放进池子中来执行。