python并行计算(上):pathos模块
由于python相当易学易用,现在python也较多地用于有大量的计算需求的任务。本文介绍几个并行模块,以及实现程序并行的入门技术。本文比较枯燥,主要是为后面上工程实例做铺垫。 本期介绍pathos模块 。
pathos模块
pathos是一个较为综合性的模块,既能多进程,也能多线程。其主要采用进程池/线程池方法。
pathos本身有一套进程池方法,同时也集成了multiprocess、pp模块的进程池方法。
1、pathos自身的多进程方法(pathos.multiprocessing.ProcessPool、pathos.multiprocessing.ProcessingPool、pathos.pools.ProcessPool)
(1)建立进程池
pathos.multiprocessing.ProcessPool(*args, **kwds) #建立pathos的进程池(pathos.multiprocessing.ProcessPool实例)。
pathos.multiprocessing.ProcessingPool(*args, **kwds) #同上。
pathos.pools.ProcessPool(*args, **kwds) #同上。
nodes:workers的数量。如果不指定nodes,则自动检测processors的数量(即ncpus)。
ncpus:worker processors的数量。
servers:worker servers的列表。
scheduler:相应的scheduler。
workdir:用于scratch calculations/files的$WORKDIR。
scatter:如为True,表示采用scatter-gatter(默认为worker-pool)。
source:如为False,表示尽可能少使用TemporaryFiles。
timeout:等待scheduler返回值的时间。
同样也有几个进程池通用的方法:
XXX.close() #关闭进程池,关闭后不能往pool中增加新的子进程,然后可以调用join()函数等待已有子进程执行完毕。XXX为进程池。
XXX.join() #等待进程池中的子进程执行完毕。需在close()函数后调用。XXX为进程池。
def f(a, b = value):
pool = pathos.multiprocessing.Pool()
pool.map(f, a_seq, b_seq)
pool.close()
pool.join()
(2)创建子进程
(a)单个子进程可通过pipe方法创建:
XXX.pipe(f, *args, **kwds) #采用 阻塞方式(非并行) 提交一个任务,阻塞直到返回结果为止。XXX为进程池实例。
XXX.apipe(f, *args, **kwds) # 异步(并行)提交一个任务到队列(queue)中,返回ApplyResult实例(其get方法可获得任务返回值,但get方法是阻塞的,应在所有子进程添加完后再调用)。 XXX为进程池实例。
f(*args,**kwds)为子进程对应的活动。
(b)如果子进程有返回值,且返回值需要集中处理,则建议采用map方式(子进程活动允许多个参数) :
XXX.map (f, *args, **kwds) #采用阻塞方式按顺序运行一批任务,返回结果组成的list。func(iterable1[i], iterable2[i], ...)为子进程对应的活动。XXX为进程池实例。
XXX.amap(f, *args, **kwds) # XXX .map ()的异步(并行)版本 ,返回MapResult实例(其具有get()方法,获取结果组成的list)。XXX为进程池实例。
def f(a, b): #map方法允许多个参数
pool = pathos.multiprocessing.Pool()
result = pool.map_async(f, (a0, a1, ...), (b0, b1, ...)).get()
pool.close()
pool.join()
(c)如果内存不够用,也可采用imap迭代器方式 :
XXX.imap(f, *args, **kwds) # XXX.map ()的非阻塞、按顺序迭代器版本,返回迭代器实例。XXX为进程池实例。
XXX.uimap(f, *args, **kwds) #XXX.imap()的无序版本(不会按照调用顺序返回,而是按照结束顺序返回),返回迭代器实例。XXX为进程池实例。
def f(a, b):
pool = pathos.multiprocessing.Pool()
result = pool.uimap(f, a_seq, b_seq)
pool.close()
pool.join()
for item in result:
pass
2、映射multiprocess模块的多进程方法(pathos.multiprocessing.Pool)
(1)建立进程池
pathos.multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None) #建立multiprocess的进程池。
processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
initializer: 如果initializer不是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
context: 用在制定工作进程启动时的上下文,一般使用 multiprocess.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。
(2)创建子进程
该进程池对应的创建子进程方法与multiprocess.Pool()(也即multiprocessing.Pool())完全相同。
3、映射pp模块的多进程方法1(pathos.pools.ParallelPool、pathos.pp.ParallelPool、pathos.pp.ParallelPythonPool、pathos.parallel.ParallelPythonPool、pathos.parallel.ParallelPool)
(1)建立进程池
pathos.pp.ParallelPool(*args, **kwds) #建立映射pp模块方法的进程池,返回pathos.parallel.ParallelPool实例。注意, 建立的进程池的方法与pp模块完全不同 。
pathos.pp.ParallelPythonPool(*args, **kwds) #等价pathos.pp.ParallelPool()。
pathos.pools.ParallelPool(*args, **kwds) #等价pathos.pp.ParallelPool()。
pathos.parallel.ParallelPool(*args, **kwds) #等价pathos.pp.ParallelPool()。
pathos.parallel.ParallelPythonPool(*args, **kwds) #等价pathos.pp.ParallelPool()。
nodes:workers的数量。如果不指定nodes,则自动检测processors的数量(即ncpus)。
ncpus:worker processors的数量。