添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

【python】详解multiprocessing多进程-process模块(一)
【python】详解multiprocessing多进程-Pool进程池模块(二)
【python】详解multiprocessing多进程-Queue、Pipe进程间通信(三)
【python】详解multiprocessing多进程-Lock、Rlock进程同步(四)
【python】详解multiprocessing多进程-总结(五)

Multiprocessing.Pool 可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行它。Pool类用于需要执行的目标很多,而手动限制进程数量又太繁琐时,如果目标少且不用控制进程数量则可以用Process类。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
  • processes: 是要使用的工作进程数。如果进程是None,那么使用返回的数字os.cpu_count()。也就是说根据本地的cpu个数决定,processes小于等于本地的cpu个数;
  • initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
  • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
  • context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。
1.1、apply(func [,args [,kwds ] ] )

       使用参数args和关键字参数kwds调用func。它会阻塞,直到结果准备就绪。鉴于此块,更适合并行执行工作。此外,func 仅在池中的一个工作程序中执行。

from multiprocessing import Pool
import time
def test(p):
       print(p)
       time.sleep(3)
if __name__=="__main__":
    pool = Pool(processes=10)
    for i  in range(500):
        ('\n'
         '	(1)遍历500个可迭代对象,往进程池放一个子进程\n'
         '	(2)执行这个子进程,等子进程执行完毕,再往进程池放一个子进程,再执行。(同时只执行一个子进程)\n'
         '	 for循环执行完毕,再执行print函数。\n'
        pool.apply(test, args=(i,))   #维持执行的进程总数为10,当一个进程执行完后启动一个新进程.
    print('test')
    pool.close()
    pool.join()
Process finished with exit code -1

       for循环内执行的步骤顺序,往进程池中添加一个子进程,执行子进程,等待执行完毕再添加一个子进程……等500个子进程都执行完了,再执行print。(从结果来看,并没有多进程并发)

1.2、apply_async(func [,args [,kwds [,callback [,error_callback ] ] ] ] )

       异步进程池(非阻塞),返回结果对象的方法的变体。如果指定了回调,则它应该是可调用的,它接受单个参数。当结果变为就绪时,将对其应用回调,即除非调用失败,在这种情况下将应用error_callback。如果指定了error_callback,那么它应该是一个可调用的,它接受一个参数。如果目标函数失败,则使用异常实例调用error_callback。回调应立即完成,否则处理结果的线程将被阻止。

from multiprocessing import Pool
import time
def test(p):
       print(p)
       time.sleep(3)
if __name__=="__main__":
    pool = Pool(processes=2)
    for i  in range(500):
         (1)循环遍历,将500个子进程添加到进程池(相对父进程会阻塞)\n'
         (2)每次执行2个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)\n'
        pool.apply_async(test, args=(i,))   #维持执行的进程总数为10,当一个进程执行完后启动一个新进程.
    print('test')
    pool.close()
    pool.join()
Process finished with exit code -1

       调用join之前,先调用close或者terminate方法,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束。

1.3、map(func,iterable [,chunksize ] )

       map()内置函数的并行等价物(尽管它只支持一个可迭代的参数)。它会阻塞,直到结果准备就绪。此方法将iterable内的每一个对象作为单独的任务提交给进程池。可以通过将chunksize设置为正整数来指定这些块的(近似)大小。

from multiprocessing import Pool
def test(i):
    print(i)
if  __name__ == "__main__":
    lists = [1, 2, 3]
    pool = Pool(processes=2)       #定义最大的进程数
    pool.map(test, lists)          #p必须是一个可迭代变量。
    pool.close()
    pool.join()
1.4、map_async(func,iterable [,chunksize [,callback [,error_callback ] ] ] )

       map()返回结果对象的方法的变体。需要传入可迭代对象iterable

from multiprocessing import Pool
import time
def test(p):
       print(p)
       time.sleep(3)
if __name__=="__main__":
    pool = Pool(processes=2)
    # for i  in range(500):
    #     '''
    #      (1)循环遍历,将500个子进程添加到进程池(相对父进程会阻塞)\n'
    #      (2)每次执行2个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)\n'
    #     '''
    #     pool.apply_async(test, args=(i,))   #维持执行的进程总数为10,当一个进程执行完后启动一个新进程.
    pool.map_async(test, range(500))
    print('test')
    pool.close()
    pool.join()
Process finished with exit code -1
1.5、imap(func,iterable [,chunksize ] )

       返回迭代器,next()调用返回的迭代器的方法得到结果,imap()方法有一个可选的超时参数: next(timeout)将提高multiprocessing.TimeoutError如果结果不能内退回超时秒。

1.6、close()

       防止任何更多的任务被提交到池中。 一旦完成所有任务,工作进程将退出。

1.7、terminate()

       立即停止工作进程而不完成未完成的工作。当池对象被垃圾收集时,terminate()将立即调用。

1.8、join()

       等待工作进程退出。必须打电话close()或 terminate()使用之前join()。

from multiprocessing import Pool
import time
def f(x):
    return x*x
if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow
        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"
        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow
        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Traceback (most recent call last):
  File "C:/Users/BruceWong/Desktop/develop/multiprocessingpool.py", line 19, in <module>
    print(next(res))
TypeError: 'MapResult' object is not an iterator
Process finished with exit code 1
                            
                            
				
目录1. Python使用multiprocessing.Pool实现固定数量线程 1. Python使用multiprocessing.Pool实现固定数量线程 from multiprocessing import Pool, cpu_count import time import os def thread_task(number): print("线程id为: %d, 处理的任务为:%d, 线程处理【开始】" % (os.getpid(), number)) time.s
一、Pool类介绍 在之前的博客中有对并行和并发进行了介绍。在python种主要存在两种方法实行:多线程和多进程。 对于python来说,多线程实际上是并发的,并没有完全利用多核的优势。当然这也要看具体的需求,如果是计算密集型的,多采用并行的方法;如果是IO密集型的,多采用并发的方法。这要是考虑到互相切换的开销。 解下来,我们介绍一种多进程的方法:Pool类。Pool可以提供指定数量的进程,供用户调用。当有新的请求提交到Pool中时,如果Pool还没有满,此时
mapmap_async 可以并发执行任务。applyapply_async 一次只能执行一个任务,但 apply_async 可以异步执行,因而也可以实现并发。 一、单次执行 1、单次同步执行 一个任务执行完再进行下一个任务 import multiprocessing import time def func(msg): print("msg:", msg) time.sleep(2) print("end") if __name__ == "__m
multiprocessing模块Pool简单来说就是用来创建多进程的。1.applyapply_asyncapply方法会阻塞父进程,而且进程中的进程是一个接一个执行的,并没有并行工作;如果想要进程中的进程并行工作,可以使用apply_async方法。先看apply的例子:# encoding=utf8 from multiprocessing import Pool import t...
原文地址:https://www.jeremyjone.com/420/ , 转载请注明 很久没有用到进程,今天公司项目需要大量进程,考虑使用进程操作。其实很简单,几行代码就可以搞定,但是遇到了一个比较有意思的问题。之前写Python都是在Linux上,没有出现过,今天发现Windows上还是有一些区别。 我以为很简单,导包,创建,使用,结束。五行搞定。 from multiprocessin...
一.多进程 当计算机运行程序时,就会创建包含代码和状态的进程。这些进程会通过计算机的一个或多个CPU执行。不过,同一时刻每个CPU只会执行一个进程,然后不同进程间快速切换,给我们一种错觉,感觉好像多个程序在同时进行。例如:有一个大型工厂,该工厂负责生产电脑,工厂有很多的车间用来生产不同的电脑部件。每个车间又有很多工人互相合作共享资源来生产某个电脑部件。这里的工厂相当于一个爬虫工程,每个车间相当于一个进程,每个工人就相当于线程。线程是CPU调度的基本单元。 也就是进程间是独立的,这表现在内存空间,上下文环
pool = Pool(processes=2) for i in range(10): # pool.apply(test, args=(i,)) # 同步调用,每次只取一个 pool.apply
由于 python的多线程不能使用多核cpu,只能使用多进程。 在工作中遇到了需要处理几百万的数据条,查阅了相关资料发现使用多进程进程功能能够很好的解决问题。 进程有两个调用执行代码的接口,分别是mapapply_asyncmap所限于不能调用执行代码有过个参数的情况,因此主要使用apply_async。 在使用过程中不能将执行代码写在类里面。 def log_result(re
```java public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { // 1、创建一个异步操作:无返回值 public static CompletableFuture<Void> runAsync(Runnable runnable); public static CompletableFutur Pool 模块来自于 multiprocessing 模块multiprocessing 模块是跨平台版本的多进程模块,像线程一样管理进程,与 threading 很相似,对多核CPU的利用率会比 threading 好的多。 Pool 类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果还没有满,就会创建一个新的进程来执行请求。如果满,请求就会告知先等待,直到...