添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
concurrent.futures in python by ThreadPoolExecutor&&ProcessPoolExecutor

concurrent.futures in python by ThreadPoolExecutor&&ProcessPoolExecutor

Executor 对象

class concurrent.futures.Executor

抽象类提供异步执行调用方法。是两种线程池的父类,要通过它的子类调用,而不是直接调用。

submit(fn, /, *args, **kwargs)

返回一个Future对象

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

map(func, *iterables, timeout=None, chunksize=1)

iterables 是立即执行而不是延迟执行的;

func 是异步执行的,对 func 的多个调用可以并发执行。

The returned iterator raises a TimeoutError if __next__() is called and the result isn't available after timeout seconds from the original call to Executor.map() . timeout can be an int or a float. If timeout is not specified or None, there is no limit to the wait time.


如果 func 调用引发一个异常,当从迭代器中取回它的值时这个异常将被引发。


使用 ProcessPoolExecutor 时,这个方法会将 iterables 分割任务块并作为独立的任务并提交到执行池中。这些块的大概数量可以由 chunksize 指定正整数设置。 对很长的迭代器来说,使用大的 chunksize 值比默认值 1 能显著地提高性能。 chunksize 对 ThreadPoolExecutor 没有效果。(因为python有GIL锁,多线程就是单线程运行和协程差不多。)


shutdown(wait=True, *, cancel_futures=False)

(shutdown可以晚点看

当待执行的 future 对象完成执行后向执行者发送信号,它就会释放正在使用的任何资源。 在关闭后调用 Executor.submit() Executor.map() 将会引发 RuntimeError


如果 wait 为 True 则此方法只有在所有待执行的 future 对象完成执行且释放已分配的资源后才会返回。 如果 wait 为 False,方法立即返回,所有待执行的 future 对象完成执行后会释放已分配的资源。 不管 wait 的值是什么,整个 Python 程序将等到所有待执行的 future 对象完成执行后才退出。


如果 cancel_futures 为 True,此方法将取消所有执行器还未开始运行的挂起的 Future。 任何已完成或正在运行的 Future 将不会被取消,无论 cancel_futures 的值是什么?


如果 cancel_futures 和 wait 均为 True,则执行器已开始运行的所有 Future 将在此方法返回之前完成。 其余的 Future 会被取消。


如果使用 with 语句,你就可以避免显式调用这个方法,它将会停止 Executor (就好像 Executor.shutdown() 调用时 wait 设为 True 一样等待):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
  

ThreadPoolExecutor

当回调已关联了一个 Future 然后再等待另一个 Future 的结果时就会发产死锁情况。例如:

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

Executor 子类使用最多 max_workers 个线程的线程池来异步执行调用。

initializer 是在每个工作者线程开始处调用的一个可选可调用对象。 initargs 是传递给初始化器的元组参数。任何向池提交更多工作的尝试, initializer 都将引发一个异常,当前所有等待的工作都会引发一个 BrokenThreadPool (一种error。

如果 max_workers 为 None 或没有指定,将默认为机器处理器的个数,假如 ThreadPoolExecutor 则重于I/O操作而不是CPU运算,那么可以乘以 5。

3.6 新版功能: 添加 thread_name_prefix 参数允许用户控制由线程池创建的 threading.Thread 工作线程名称以方便调试。(这tm什么意思啊
在 3.8 版更改: max_workers 的默认值已改为 min(32, os.cpu_count() + 4)。 这个默认值会保留至少 5 个工作线程用于 I/O 密集型任务。 对于那些释放了 GIL 的 CPU 密集型任务,它最多会使用 32 个 CPU 核心。这样能够避免在多核机器上不知不觉地使用大量资源。
现在 ThreadPoolExecutor 在启动 max_workers 个工作线程之前也会重用空闲的工作线程。

ThreadPoolExecutor 例子

import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

ProcessPoolExecutor 会使用 multiprocessing 模块,这允许它绕过 全局解释器锁 但也意味着只可以处理和返回可封存的对象。

__main__ 模块必须可以被工作者子进程导入。这意味着 ProcessPoolExecutor 不可以工作在交互式解释器中。

从可调用对象中调用 Executor Future 的方法提交给 ProcessPoolExecutor 会导致死锁。(这是不可以线程池套进程池吗?有懂的说一下吗?

classconcurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

如果 max_workers 小于等于 0,则将引发 ValueError 。 在 Windows 上,max_workers 必须小于等于 61,否则将引发 ValueError 。 如果 max_workers 为 None,则所选择的默认值最多为 61。 mp_context 可以是一个多进程上下文或是 None。 它将被用来启动工作进程。 如果 mp_context 为 None 或未给出,则将使用默认的多进程上下文。

initializer 是一个可选的可调用对象,它会在每个工作进程启动时被调用;initargs 是传给 initializer 的参数元组。 如果 initializer 引发了异常,则所有当前在等待的任务以及任何向进程池提交更多任务的尝试都将引发 BrokenProcessPool

max_tasks_per_child 是一个可选参数,指定单个进程在退出并替换为新工作进程之前可以执行的最大任务数。默认情况下,每个子进程的max\u tasks\u为None,这意味着工作进程将与池一样长。当指定最大值时,默认情况下当指定最大值时,默认情况下,在没有mp_context参数的情况下,将使用“spawn”多处理启动方法。此功能与“fork”启动方法不兼容。

在 3.3 版更改: 如果其中一个工作进程被突然终止,BrokenProcessPool 就会马上触发。 可预计的行为没有定义,但执行器上的操作或它的 future 对象会被冻结或死锁。

在 3.7 版更改: 添加 mp_context 参数允许用户控制由进程池创建给工作者进程的开始方法 。
加入 initializer 和*initargs* 参数。

在 3.11 版更改: max_tasks_per_child 参数被添加以允许用户控制生命周期属于工作者在于这个池种的

ProcessPoolExecutor 例子

import concurrent.futures
import math
PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]
def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True
def main():