添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
我们在日常开发中可能要完成一些计算密集型(CPU密集型)的任务,如计算圆周率、对视频进行高清解码等等,这些任务的特点是需要进行大量的计算,消耗CPU资源。因此,想要最高效率的利用CPU,我们可以使用多进程来实现,让我们一起来探讨在Python中是如何实现多进程的吧。

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础,进程是线程的容器。

进程的概念

进程是一个实体,每一个进程都有自己的内存地址。
进程一般由 程序、数据集、进程控制块 三部分组成。程序用于描述进程要完成的功能,是控制进程执行的指令集;数据集合是程序在执行时所需要的数据和工作区;程序控制块包含进程的描述信息和控制信息,是进程存在的唯一标志。

进程具有的特征
动态性:进程是程序的一次执行过程,是临时的,有生命周期的,是动态生成、动态消亡的;
并发性:任何进程都可以同其他进程一起并发执行;
独立性:进程是系统进行资源分配和调度的一个独立单位;
结构性:进程由程序、数据和进程控制块三部分组成

Python多进程

大部分情况下,想要充分使用多核CPU的资源,就需要在代码中使用多进程。Python提供了multiprocessing模块来启动子进程,并在子进程中执行我们定制的任务。multiprocessing模块的功能众多:支持子进程、通信、数据共享和执行不同形式的同步,提供了Process、Queue、Pipe等组件。 进程修改的数据仅限于该进程内

Process类介绍与应用

Process(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None)

参数说明:

  • group参数未使用,值始终为None
  • target表示调用的对象,即子进程要执行的任务
  • name为子进程的名称
  • args表示调用对象的位置参数元组,必须有逗号,如:args=(1,)
  • kwargs表示调用对象的字典,kwargs=
  • daemon表示是否守护进程
  • 下面介绍一下python多进程的开启和调用

    import random
    from time import sleep
    from multiprocessing import Process
    def func(name):
        s = random.randint(1, 5)
        print(f'current process is {name}, sleeping {s}s.')
        sleep(s)
        print(f'process {name} is over')
    if __name__ == '__main__':
        for i in range(1, 5):
            p = Process(target=func, args=(i,))
            p.start()
        print('main process')
    
    main process
    current process is 1, sleeping 3s.
    current process is 2, sleeping 1s.
    current process is 3, sleeping 2s.
    current process is 4, sleeping 5s.
    process 2 is over
    process 3 is over
    process 1 is over
    process 4 is over
    

    可以看出,各进程之间是并发执行的,先完成任务的进程先结束。

    import random
    from time import sleep
    from multiprocessing import Process
    class Func(Process):
        def __init__(self, name):
            super().__init__()
            self.name = name
        def run(self):
            s = random.randint(1, 5)
            print(f'current process is {self.name}, sleeping {s}s.')
            sleep(s)
            print(f'process {self.name} is over')
    if __name__ == '__main__':
        for i in range(1, 5):
            p = Func(str(i))
            p.start()
        print('main process')
    
    main process
    current process is 1, sleeping 3s.
    current process is 2, sleeping 1s.
    current process is 3, sleeping 4s.
    current process is 4, sleeping 3s.
    process 2 is over
    process 1 is over
    process 4 is over
    process 3 is over
    

    继承调用中,start()方法会启动一个子线程,在该子线程中执行run()方法来实现进程的调用。

    Process的join方法

    在多进程中,主进程和子进程的执行是同时进行的,如果主进程中有些任务需要等待子进程执行完毕后再执行的话,就需要使用join()方法。

    作用:在进程中可以阻塞主进程的执行,直到等待子进程全部完成后,才继续运行主进程后面的代码。

    我们在直接调用的例子中加入join

    import random
    from time import sleep
    from multiprocessing import Process
    def func(name):
        s = random.randint(1, 5)
        print(f'current process is {name}, sleeping {s}s.')
        sleep(s)
        print(f'process {name} is over')
    if __name__ == '__main__':
        plist = []
        for i in range(1, 5):
            p = Process(target=func, args=(i,))
            p.start()  # 启动子进程
            plist.append(p)
        for p in plist:
            p.join()  # 阻塞主进程
        print('main process')
        print('do something')
    
    current process is 1, sleeping 5s.
    current process is 2, sleeping 4s.
    current process is 3, sleeping 5s.
    current process is 4, sleeping 4s.
    process 2 is over
    process 4 is over
    process 1 is over
    process 3 is over
    main process
    do something
    

    可以看出,join阻塞了主进程的执行,主进程的打印操作在子进程全部执行完毕后才执行。

    主进程在开启子进程的时候,可以将子进程设置为守护进程

    守护进程的两个特点

  • 守护进程会在主进程结束后就会终止(不管守护进程的任务是否执行完毕)
  • 守护进程内无法开启子进程,否则抛出异常
  • import random
    from time import sleep
    from multiprocessing import Process
    def func(name):
        s = random.randint(1, 5)
        print(f'current process is {name}, sleeping {s}s.')
        sleep(s)
        print(f'process {name} is over')
    if __name__ == '__main__':
        p1 = Process(target=func, args=('daemon',))
        p2 = Process(target=func, args=('sub',))
        p1.daemon = True  # 设置p1为守护进程
        p1.start()  # 启动守护进程
        p2.start()  # 启动子进程
        print('main process')
        sleep(2) # 主进程设置两秒睡眠时间
    
    main process
    current process is daemon, sleeping 5s.
    current process is sub, sleeping 1s.
    process sub is over
    

    上面例子中启动了两个子进程,其中一个子进程p1设置成了守护进程。从结果我们可以看出,主进程结束后(2秒),守护进程(5秒)也跟随主进程结束,并没有打印出后续的内容。

    Python进程池

    当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

    Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

    Pool常用参数说明如下

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;
  • close():关闭Pool,使其不再接受新的任务;
  • terminate():不管任务是否完成,立即终止;
  • join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用
  • 请看下面例子

    import random
    from time import sleep
    from multiprocessing import Pool
    def func(name):
        s = random.randint(1, 5)
        print(f'current process is {name}, sleeping {s}s.')
        sleep(s)
        print(f'process {name} is over')
    if __name__ == '__main__':
        p = Pool(3)
        for i in range(1, 5):
            p.apply_async(func, (i,))
        p.close()
        p.join()
        print('main process')
    
    current process is 1, sleeping 1s.
    current process is 2, sleeping 5s.
    current process is 3, sleeping 4s.
    process 1 is over
    current process is 4, sleeping 1s.
    process 4 is over
    process 3 is over
    process 2 is over
    main process
    

    我们设置进程池的最大值为3,循环的时候先将3个进程放进池子中,等池子中的进程执行结束后(process 1),会将新的进程(process 4)放进池子中来执行。