添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

在 Python 中,我怎样才能等到多个队列中的所有项目都被处理完呢?

7 人关注

在下面的代码中,我有两个队列用于运行不同种类的线程。这些线程以递归方式添加到对方的队列中(队列1抓取一些信息,队列2处理这些信息并向队列1添加更多信息)。

我想等待,直到两个队列中的所有项目都被完全处理。目前我正在使用这段代码

queue.join()
out_queue.join()

问题是,当第一个队列暂时没有东西可做时,它就关闭了,所以它永远看不到队列2(out_queue)在那之后添加的东西。

我加入了time.sleep()函数,这是一个非常黑的修复方法,到30秒时,两个队列都已经填满了,不会跑出来。

解决这个问题的标准 Python 方法是什么?我是否必须只有一个队列,并在其中标记项目,以确定它们应该由哪个线程来处理?

queue = Queue.Queue()
out_queue = Queue.Queue()
class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue
    def run(self):
        while True:
            row = self.queue.get()
            request = urllib2.Request(row[0], None, req_headers)
            # ... some processing ...
            self.out_queue.put([row, http_status, page])
            self.queue.task_done()
class DatamineThread(threading.Thread):
    def __init__(self, out_queue, mysql):
        threading.Thread.__init__(self)
        self.out_queue = out_queue
        self.mysql = mysql
    def run(self):
        while True:
            row = self.out_queue.get()
            # ... some processing ...
            queue.put(newrow)
            self.out_queue.task_done()
queue = Queue.Queue()
out_queue = Queue.Queue()
for i in range(URL_THREAD_COUNT):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()
#populate queue with data
for row in rows:
    queue.put(row)
#MySQL Connector
mysql = MySQLConn(host='localhost', user='root', passwd = None, db='db')
#spawn DatamineThread, if you have multiple, make sure each one has it's own mysql connector
dt = DatamineThread(out_queue, mysql)
dt.setDaemon(True)
dt.start()
time.sleep(30)
#wait on the queue until everything has been processed
queue.join()
out_queue.join()
    
python
multithreading
queue
Josh.F
Josh.F
发布于 2014-09-17
3 个回答
user202729
user202729
发布于 2019-11-03
已采纳
0 人赞同

假设两个队列被命名为 queue_1 queue_2

  • 正确的解决方案:保持跟踪 共计 分别为待处理作品的数量 ( 带锁 ), 然后等待,直到该值为零(使用一个条件变量)。

  • 正确的解决方案,但不推荐。使用一个没有记录的API/内部方法...

    while True:
        with queue_1.mutex, queue_2.mutex:
            if queue_1.unfinished_tasks==0 and queue_2.unfinished_tasks==0:
                break
        queue_1.join()
        queue_2.join()
    
  • Incorrect solution:

    while not (queue_1.empty() and queue_2.empty()):
        queue_1.join()
        queue_2.join()
    

    这是不正确的,因为在queue_2.join和下一个while检查之后;而且有可能两个队列中都没有项目,但任务还没有完成(有一个元素正在处理)。

    例如,在下面的代码中。

    #!/bin/python
    from threading import Thread
    from queue import Queue
    import time
    queue_1 = Queue()
    queue_2 = Queue()
    def debug(): print(queue_1.qsize(), queue_2.qsize())
    def run_debug():
        while True:
            time.sleep(0.2)
            debug()
    Thread(target=run_debug).start()
    def run_1():
        while True:
            value=queue_1.get()
            print("get value", value)
            time.sleep(1)
            if value:
                print("put value", value-1)
                queue_2.put(value-1)
            time.sleep(0.5)
            queue_1.task_done()
    def run_2():
        while True:
            value=queue_2.get()
            print("get value", value)
            time.sleep(1)
            if value:
                print("put value", value-1)
                queue_1.put(value-1)
            time.sleep(0.5)
            queue_2.task_done()
    thread_1 = Thread(target=run_1)
    thread_2 = Thread(target=run_2)
    thread_1.start()
    thread_2.start()
    queue_1.put(3)
    # wait for both queues
    while not (queue_1.empty() and queue_2.empty()):
        queue_1.join()
        queue_2.join()
    print("done")
    # (add code to stop the threads properly)
    

    the output is

    get value 3
    get value 2
    get value 1
    get value 0
        
  • johntellsall
    johntellsall
    发布于 2019-11-03
    0 人赞同

    改变工人,使他们需要一个 sentinel 值来退出,而不是在队列中没有任何工作时退出。 在下面的代码中, howdy 工作者从输入队列中读取项目。 如果值是哨兵( None ,但它可能是任何东西),该工作者就退出。

    因此,你不需要搞超时,正如你已经发现的那样,超时可能是相当可疑的。 另一个结果是,如果你有 N 线程,你必须附加上 N 哨兵到输入队列中,以杀死你的工人。否则,你会得到一个永远等待的工人。一个僵尸工人,如果你愿意的话。

    source

    import threading, Queue
    def howdy(q):
        for msg in iter(q.get, None):
            print 'howdy,',msg
    inq = Queue.Queue()
    for word in 'whiskey syrup bitters'.split():
        inq.put(word)
    inq.put( None )        # tell worker to exit
    thread = threading.Thread(target=howdy, args=[inq])
    thread.start()
    thread.join()
    

    output

    howdy, whiskey
    howdy, syrup
    howdy, bitters
        
    我如何用2个线程来实现这一点,每个线程都向对方的队列供货?
    替代方案:不追加N个哨兵,而是让每个退出的线程追加一个哨兵。
    虽然我认为你误解了问题,但你的解决方案只有在已知所有工作都完成时才有效(两个队列都没有剩余的项目被处理),而OP不知道如何去等待。
    rockettc
    rockettc
    发布于 2019-11-03
    0 人赞同

    我最近想做这样的事情,结果想出了这个办法。我检查每个队列的大小,一直到它们都是空的。

    inqueue = True
    while inqueue:  
      time.sleep(5)
      q1 = queue.qsize()
      q2 = out_queue.qsize()
      print("queue:%d,out_queue:%d"% (q1,q2))