在下面的代码中,我有两个队列用于运行不同种类的线程。这些线程以递归方式添加到对方的队列中(队列1抓取一些信息,队列2处理这些信息并向队列1添加更多信息)。
我想等待,直到两个队列中的所有项目都被完全处理。目前我正在使用这段代码
queue.join()
out_queue.join()
问题是,当第一个队列暂时没有东西可做时,它就关闭了,所以它永远看不到队列2(out_queue)在那之后添加的东西。
我加入了time.sleep()函数,这是一个非常黑的修复方法,到30秒时,两个队列都已经填满了,不会跑出来。
解决这个问题的标准 Python 方法是什么?我是否必须只有一个队列,并在其中标记项目,以确定它们应该由哪个线程来处理?
queue = Queue.Queue()
out_queue = Queue.Queue()
class ThreadUrl(threading.Thread):
"""Threaded Url Grab"""
def __init__(self, queue, out_queue):
threading.Thread.__init__(self)
self.queue = queue
self.out_queue = out_queue
def run(self):
while True:
row = self.queue.get()
request = urllib2.Request(row[0], None, req_headers)
# ... some processing ...
self.out_queue.put([row, http_status, page])
self.queue.task_done()
class DatamineThread(threading.Thread):
def __init__(self, out_queue, mysql):
threading.Thread.__init__(self)
self.out_queue = out_queue
self.mysql = mysql
def run(self):
while True:
row = self.out_queue.get()
# ... some processing ...
queue.put(newrow)
self.out_queue.task_done()
queue = Queue.Queue()
out_queue = Queue.Queue()
for i in range(URL_THREAD_COUNT):
t = ThreadUrl(queue, out_queue)
t.setDaemon(True)
t.start()
#populate queue with data
for row in rows:
queue.put(row)
#MySQL Connector
mysql = MySQLConn(host='localhost', user='root', passwd = None, db='db')
#spawn DatamineThread, if you have multiple, make sure each one has it's own mysql connector
dt = DatamineThread(out_queue, mysql)
dt.setDaemon(True)
dt.start()
time.sleep(30)
#wait on the queue until everything has been processed
queue.join()
out_queue.join()