self.loop, _ = self.start_loop(loop)
self.semaphore = asyncio.Semaphore(maxsize, loop=self.loop)
def
task_add
(
self, item=
1
):
:param item:
:return:
self.task.put(item)
def
task_done
(
self, fn
):
:param fn:
:return:
if
fn:
self.task.get()
self.task.task_done()
def
wait
(
self
):
等待任务执行完毕
:return:
self.task.join()
@property
def
running
(
self
):
获取当前线程数
:return:
return
self.task.qsize()
@staticmethod
def
_start_thread_loop
(
loop
):
运行事件循环
:param loop: loop以参数的形式传递进来运行
:return:
asyncio.set_event_loop(loop)
loop.run_forever()
async
def
_stop_thread_loop
(
self, loop_time=
1
):
:return:
while
True
:
if
self.task.empty():
self.loop.stop()
break
await
asyncio.sleep(loop_time)
def
start_loop
(
self, loop
):
运行事件循环
开启新线程
:param loop: 协程
:return:
if
not
loop:
loop = asyncio.new_event_loop()
loop_thread = Thread(target=self._start_thread_loop, args=(loop,))
loop_thread.setDaemon(
True
)
loop_thread.start()
return
loop, loop_thread
def
stop_loop
(
self, loop_time=
1
):
队列为空,则关闭线程
:param loop_time:
:return:
asyncio.run_coroutine_threadsafe(self._stop_thread_loop(loop_time), self.loop)
def
release
(
self, loop_time=
1
):
:param loop_time:
:return:
self.stop_loop(loop_time)
async
def
async_semaphore_func
(
self, func
):
:param func:
:return:
async
with
self.semaphore:
return
await
func
def
submit
(
self, func, callback=
None
):
提交任务到事件循环
:param func: 异步函数对象
:param callback: 回调函数
:return:
self.task_add()
future = asyncio.run_coroutine_threadsafe(self.async_semaphore_func(func), self.loop)
future.add_done_callback(callback)
future.add_done_callback(self.task_done)
async def thread_example(i):
url = "
http://127.0.0.1:8080/app04/async4?num={}".format(i)
async with aiohttp.ClientSession() as session:
async with session.get(url) as res:
# print(res.status)
# print(res.content)
return await res.text()
def my_callback(future):
result = future.result()
print('返回值: ', result)
def main():
# 任务组, 最大协程数
pool = AsyncPool(maxsize=100000)
# 插入任务任务
for i in range(100000):
pool.submit(thread_example(i), my_callback)
print("等待子线程结束1...")
# 停止事件循环
pool.release()
# 获取线程数
# print(pool.running)
print("等待子线程结束2...")
pool.wait()
print("等待子线程结束3...")
if name == 'main':
start_time = time.time()
main()
end_time = time.time()
print("run time: ", end_time - start_time)
'''