添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

向concurrent.futures.Executor.map传递多个参数?

93 人关注

The concurrent.futures.Executor.map 需要一个可变数量的迭代器,从中调用所给的函数。 如果我有一个生成器,产生的图元通常是就地解包的,我应该如何调用它?

下面的方法不起作用,因为每个生成的图元都作为不同的参数给了map。

args = ((a, b) for (a, b) in c)
for result in executor.map(f, *args):

如果没有生成器,映射所需的参数可能看起来像这样。

executor.map(
    (i[0] for i in args),
    (i[1] for i in args),
    (i[N] for i in args),
    
2 个评论
vz0
我不明白你想要什么。在你最新的编辑中,没有发电机的例子不起作用,因为发电机上的每个元素只有两个,N的值是多少?
@vz0: N是由 args 生成的图元中的项目数。
python
concurrency
iterator
future
map-function
Matt Joiner
Matt Joiner
发布于 2011-07-22
9 个回答
agf
agf
发布于 2022-01-29
已采纳
0 人赞同

One argument that is repeated, one argument in c

from itertools import repeat
for result in executor.map(f, repeat(a), c):

Need to unpack items of c, and can unpack c

from itertools import izip
for result in executor.map(f, *izip(*c)):

Need to unpack items of c, can't unpack c

  • Change f to take a single argument and unpack the argument in the function.
  • 如果c中的每个项目都有可变数量的成员,或者你只调用f几次。

    executor.map(lambda args, f=f: f(*args), c)
    

    它定义了一个新的函数,将每个项目从c中解包出来,并调用f。在lambda中为f使用一个默认参数,使得flambda中成为本地的,因此减少了查找时间。

  • 如果你有固定数量的参数,而你需要多次调用f

    from collections import deque
    def itemtee(iterable, n=2):
        def gen(it = iter(iterable), items = deque(), next = next):
            popleft = items.popleft
            extend = items.extend
            while True:
                if not items:
                    extend(next(it))
                yield popleft()
        return [gen()] * n
    executor.map(f, *itemtee(c, n))
    

    其中nf的参数数。这是由itertools.tee.

  • 重复是有用的,但我的例子与问题不同。我已经试图改进它。对此我很抱歉。
    是的,这个zip解压工作,但在解压参数到zip时,整个生成器的内容都被消耗了。lambda还有一个好处,即不是每次调用map函数都必须有精确的相同数量的参数(不是说这是一个要求)。
    这是其中较小的问题,更大的问题是必须处理整个发电机。
    不不,我想把每个生成的项目作为 f 的参数来解包。 for p in args: f(*p) 。对不起,这很难解释 :( )。
    我写了一个类似的 itertools.tee 的替代形式,但发现@vzo的lambda是一个更简单的解决方案。愿意解释一下为什么lambda形式有很高的开销吗?
    vz0
    vz0
    发布于 2022-01-29
    0 人赞同

    你需要删除 * 上的 map 调用。

    args = ((a, b) for b in c)
    for result in executor.map(f, args):
    

    这将调用flen(args)次,其中f应接受一个参数。

    如果你想让f接受两个参数,你可以使用一个lambda调用,比如。

    args = ((a, b) for b in c)
    for result in executor.map(lambda p: f(*p), args):   # (*p) does the unpacking part
        
    我所追求的是lambda部分。你能详细说明一下这些可能性吗?
    I know this is old, but when I do this I get the following error: my_method() argument after * must be a sequence, not long
    Should the first line be args = ((a, b) for b in c)
    @vz0 为什么 "在Linux上 "有下划线? 这个行为是操作系统特有的吗?
    vz0
    @pcko1 这是个很好的问题,编辑历史显示其他人做了这个编辑。为什么?我不知道!
    Vlad Bezden
    Vlad Bezden
    发布于 2022-01-29
    0 人赞同

    你可以使用咖喱法来创建新的函数,通过 部分 method in Python

    from concurrent.futures import ThreadPoolExecutor
    from functools import partial
    def some_func(param1, param2):
        # some code
    # currying some_func with 'a' argument is repeated
    func = partial(some_func, a)
    with ThreadPoolExecutor() as executor:
        executor.map(func, list_of_args):
    

    如果你需要传递多个相同的参数,你可以将它们传递给部分 method

    func = partial(some_func, a, b, c)
        
    这个解决方案对我来说并不奏效。编译器如何知道list_of_args中的列表是指some_func的一个特定属性?例如,我有一个有5个参数的函数:conduct_analysis(a, b, c, d, e),我做:partial_analysis = partial(conduct_analysis, a=a, c=c, d=d, e=e)。然后我调用:res = executor.map(partial_analysis, list_of_b)。编译器不知道 "list_of_b "的内容应该填补partial的缺失参数 "b"。当我用带有五个args的函数运行代码时,我得到了各种异常,如类型X没有attr Y,或位置参数等。
    @Dawid 把你没有遮盖的那个放在前面,所以 conduct_analysis(b, a, c, d, e)
    Baqir Khan
    Baqir Khan
    发布于 2022-01-29
    0 人赞同

    因此,假设你有一个函数,它接收 3个论点 and all the 3个论点 are 有活力 并在每次通话中不断变化。比如说。

    def multiply(a,b,c):
        print(a * b * c)
    

    为了使用线程多次调用这个,我首先会创建一个list of tuples其中每个元组是a,b,c的一个版本。

    arguments = [(1,2,3), (4,5,6), (7,8,9), ....]
    

    我们知道,concurrent.futuresmap函数会接受第一个参数作为目标功能和第二个参数为参数列表为每个将被执行的函数的版本。因此,你可能会进行这样的调用。

    for _ in executor.map(multiply, arguments) # Error
    

    But this will give you error该函数期望3 arguments but got only 1。为了解决这个问题,我们创建了一个辅助函数。

    def helper(numbers):
        multiply(numbers[0], numbers[1], numbers[2])
    

    现在,我们可以用执行器调用这个函数,如下所示。

    with ThreadPoolExecutor() as executor:
         for _ in executor.map(helper, arguments):
    

    这应该给你带来预期的结果。

    Leandro Toledo
    Leandro Toledo
    发布于 2022-01-29
    0 人赞同

    下面是一个代码片段,展示了如何用ThreadPoolExecutor向一个函数发送多个参数。

    import concurrent.futures
    def hello(first_name: str, last_name: str) -> None:
        """Prints a friendly hello with first name and last name"""
        print('Hello %s %s!' % (first_name, last_name))
    def main() -> None:
        """Examples showing how to use ThreadPoolExecutor and executer.map
        sending multiple arguments to a function"""
        # Example 1: Sending multiple arguments using tuples
        # Define tuples with sequential arguments to be passed to hello()
        args_names = (
            ('Bruce', 'Wayne'),
            ('Clark', 'Kent'),
            ('Diana', 'Prince'),
            ('Barry', 'Allen'),
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Using lambda, unpacks the tuple (*f) into hello(*args)
            executor.map(lambda f: hello(*f), args_names)
        print()
        # Example 2: Sending multiple arguments using dict with named keys
        # Define dicts with arguments as key names to be passed to hello()
        kwargs_names = (
            {'first_name': 'Bruce', 'last_name': 'Wayne'},
            {'first_name': 'Clark', 'last_name': 'Kent'},
            {'first_name': 'Diana', 'last_name': 'Prince'},
            {'first_name': 'Barry', 'last_name': 'Allen'},
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Using lambda, unpacks the dict (**f) into hello(**kwargs)
            executor.map(lambda f: hello(**f), kwargs_names)
    if __name__ == '__main__':
        main()
        
    Vaibhav K
    Vaibhav K
    发布于 2022-01-29
    0 人赞同

    假设你有如下数据框中的数据,你想把前两列传给一个函数,该函数将读取图像并预测胎儿,然后计算差异并返回差异值。

    注意:你可以根据你的要求有任何场景,并且可以分别定义功能。

    下面的代码片段将把这两列作为参数并传递给Threadpool机制(同时显示进度条)。

    ''' function that will give the difference of two numpy feature matrix'''
    def getDifference(image_1_loc, image_2_loc, esp=1e-7):
           arr1 = ''' read 1st image and extract feature '''
           arr2 = ''' read 2nd image and extract feature '''
           diff = arr1.ravel() - arr2.ravel() + esp    
           return diff
    '''Using ThreadPoolExecutor from concurrent.futures with multiple argument'''
    with ThreadPoolExecutor() as executor:
            result = np.array(
                             list(tqdm(
                                       executor.map(lambda x : function(*x), [(i,j) for i,j in df[['image_1','image_2']].values]),
                                   total=len(df)
        
    Tengerye
    Tengerye
    发布于 2022-01-29
    0 人赞同

    For ProcessPoolExecutor.map() :

    与map(func, *iterables)类似,除了。

    迭代变量被立即收集,而不是懒惰地收集。

    func是异步执行的,对func的几个调用可以是 同时进行。

    因此, ProcessPoolExecutor.map() 的用法与Python内置的 map() 的用法是一样的。这里是文档。

    返回一个迭代器,该迭代器将函数应用于迭代器的每个项目。 产生的结果。 如果传递了额外的可迭代参数。 函数必须接受同样多的参数,并被应用于所有迭代器中的项目 的所有迭代变量。

    结论:把几个参数传给 map()

    试着在python 3下运行下面的片段,你就会很清楚了。

    from concurrent.futures import ProcessPoolExecutor
    def f(a, b):
        print(a+b)
    with ProcessPoolExecutor() as pool:
        pool.map(f, (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), (0, 1, 2))
    # 0, 2, 4
    array = [(i, i) for i in range(3)]
    with ProcessPoolExecutor() as pool:
        pool.map(f, *zip(*array))
    # 0, 2, 4
        
    H L
    H L
    发布于 2022-01-29
    0 人赞同

    我在这里看到了很多答案,但没有一个是像使用lambda表达式那样直接的。

    foo(x,y)。

    想用相同的值,即xVal和yVal调用上述方法10次? 使用concurrent.futures.ThreadPoolExecutor()作为执行器。

    for _ in executor.map( lambda _: foo(xVal, yVal), range(0, 10)):
        
    谢谢你,这是很直接的方法,能够在一个线程中运行我的代码。
    shanu khera
    shanu khera
    发布于 2022-01-29
    0 人赞同

    下面是我一直在使用的一个简单工具。

    ########### Start of Utility Code ###########
    import os
    import sys
    import traceback
    from concurrent import futures
    from functools import partial
    def catch(fn):
        def wrap(*args, **kwargs):
            result = None
                result = fn(*args, **kwargs)
            except Exception as err:
                type_, value_, traceback_ = sys.exc_info()
                return None, (
                    args,
                    "".join(traceback.format_exception(type_, value_, traceback_)),
            else:
                return result, (args, None)
        return wrap
    def top_level_wrap(fn, arg_tuple):
        args, kwargs = arg_tuple
        return fn(*args, *kwargs)
    def create_processes(fn, values, handle_error, handle_success):
        cores = os.cpu_count()
        max_workers = 2 * cores + 1
        to_exec = partial(top_level_wrap, fn)
        with futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            for result, error in executor.map(to_exec, values):
                args, tb = error
                if tb is not None:
                    handle_error(args, tb)
                else:
                    handle_success(result)
    ########### End of Utility Code ###########
    

    使用实例 -

    ######### Start of example usage ###########
    import time
    @catch
    def fail_when_5(val):
        time.sleep(val)
        if val == 5:
            raise Exception("Error - val was 5")
        else:
            return f"No error val is {val}"
    def handle_error(args, tb):
        print("args is", args)
        print("TB is", tb)
    def top_level(val, val_2, test=None, test2="ok"):
        print(val_2, test, test2)
        return fail_when_5(val)
    handle_success = print
    if __name__ == "__main__":
        # SHAPE -> ( (args, kwargs), (args, kwargs), ... )
        values = tuple(
            ((x, x + 1), {"test": f"t_{x+2}", "test2": f"t_{x+3}"}) for x in range(10)