添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
注册/登录

Python网络爬虫的同步和异步

开发 后端
同步就是让子任务串行,而异步有点影分身之术,但在任意时间点,真身只有一个,子任务并不是真正的并行,而是充分利用了碎片化的时间,让程序不要浪费在等待上。这就是异步,效率杠杆的。

一、同步与异步

  1. #同步编程(同一时间只能做一件事,做完了才能做下一件事情)  
  2. <-a_url-><-b_url-><-c_url->  
  3. #异步编程 (可以近似的理解成同一时间有多个事情在做,但有先后)  
  4. <-a_url->  
  5.   <-b_url->  
  6.     <-c_url->  
  7.       <-d_url->  
  8.         <-e_url->  
  9.           <-f_url->  
  10.             <-g_url->  
  11.               <-h_url->  
  12.                 <--i_url-->  
  13.                   <--j_url--> 
  1. import asyncio 
  2.  
  3. #函数名:做现在的任务时不等待,能继续做别的任务。 
  4.  
  5. async def donow_meantime_dontwait(url): 
  6.  
  7.     response = await requests.get(url) 
  8.  
  9. #函数名:快速高效的做任务 
  10.  
  11. async def fast_do_your_thing(): 
  12.  
  13.     await asyncio.wait([donow_meantime_dontwait(url) for url in urls]) 
  14.  
  15. #下面两行都是套路,记住就好 
  16.  
  17. loop = asyncio.get_event_loop() 
  18.  
  19. loop.run_until_complete(fast_do_your_thing()) 

tips:

await表达式中的对象必须是awaitable

requests不支持非阻塞

aiohttp是用于异步请求的库

  1. import asyncio 
  2. import requests 
  3. import time 
  4. import aiohttp 
  5. urls = ['https://book.douban.com/tag/小说','https://book.douban.com/tag/科幻'
  6.         'https://book.douban.com/tag/漫画','https://book.douban.com/tag/奇幻'
  7.         'https://book.douban.com/tag/历史','https://book.douban.com/tag/经济学'
  8. async def requests_meantime_dont_wait(url): 
  9.     print(url) 
  10.     async with aiohttp.ClientSession() as session: 
  11.         async with session.get(url) as resp: 
  12.             print(resp.status) 
  13.             print("{url} 得到响应".format(url=url)) 
  14. async def fast_requsts(urls): 
  15.     start = time.time() 
  16.     await asyncio.wait([requests_meantime_dont_wait(url) for url in urls]) 
  17.     end = time.time() 
  18.     print("Complete in {} seconds".format(end - start)) 
  19. loop = asyncio.get_event_loop() 
  20. loop.run_until_complete(fast_requsts(urls)) 

gevent简介

gevent是一个python的并发库,它为各种并发和网络相关的任务提供了整洁的API。

gevent中用到的主要模式是greenlet,它是以C扩展模块形式接入Python的轻量级协程。 greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

requests库是阻塞式的,为了将requests同步更改为异步。只有将requests库阻塞式更改为非阻塞,异步操作才能实现。

而gevent库中的猴子补丁(monkey patch),gevent能够修改标准库里面大部分的阻塞式系统调用。这样在不改变原有代码的情况下,将应用的阻塞式方法,变成协程式的(异步)。

  1. from gevent import monkey  
  2. import gevent  
  3. import requests  
  4. import time  
  5.  
  6. monkey.patch_all()  
  7. def req(url):  
  8.     print(url)  
  9.     resp = requests.get(url)  
  10.     print(resp.status_code,url)  
  11.  
  12. def synchronous_times(urls): 
  13.  
  14.     """同步请求运行时间"""  
  15.     start = time.time() 
  16.      for url in urls:  
  17.         req(url)  
  18.     end = time.time()  
  19.     print('同步执行时间 {} s'.format(end-start))  
  20.  
  21. def asynchronous_times(urls):  
  22.     """异步请求运行时间"""  
  23.     start = time.time()  
  24.     gevent.joinall([gevent.spawn(req,url) for url in urls])  
  25.     end = time.time()  
  26.     print('异步执行时间 {} s'.format(end - start))  
  27.  
  28. urls = ['https://book.douban.com/tag/小说','https://book.douban.com/tag/科幻' 
  29.         'https://book.douban.com/tag/漫画','https://book.douban.com/tag/奇幻' 
  30.         'https://book.douban.com/tag/历史','https://book.douban.com/tag/经济学' 
  31.  
  32. synchronous_times(urls)  
  33. asynchronous_times(urls) 

gevent:异步理论与实战

gevent库中使用的最核心的是Greenlet-一种用C写的轻量级python模块。在任意时间,系统只能允许一个Greenlet处于运行状态

一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

串行和异步

高并发的核心是让一个大的任务分成一批子任务,并且子任务会被被系统高效率的调度,实现同步或者异步。在两个子任务之间切换,也就是经常说到的上下文切换。

同步就是让子任务串行,而异步有点影分身之术,但在任意时间点,真身只有一个,子任务并不是真正的并行,而是充分利用了碎片化的时间,让程序不要浪费在等待上。这就是异步,效率杠杆的。

gevent中的上下文切换是通过yield实现。在这个例子中,我们会有两个子任务,互相利用对方等待的时间做自己的事情。这里我们使用gevent.sleep(0)代表程序会在这里停0秒。

  1. import gevent  
  2. def foo():  
  3.     print('Running in foo' 
  4.     gevent.sleep(0)  
  5.     print('Explicit context switch to foo again' 
  6.  
  7. def bar():  
  8.     print('Explicit context to bar' 
  9.     gevent.sleep(0)  
  10.     print('Implicit context switch back to bar' 
  11.  
  12. gevent.joinall([  
  13.     gevent.spawn(foo),  
  14.     gevent.spawn(bar) 
  15.  
  16.     ]) 

运行的顺序:

  1. Running in foo  
  2. Explicit context to bar  
  3. Explicit context switch to foo again  
  4. Implicit context switch back to bar 

同步异步的顺序问题

同步运行就是串行,123456...,但是异步的顺序是随机的任意的(根据子任务消耗的时间而定)

  1. import gevent  
  2. import random  
  3. def task(pid):  
  4.     "" 
  5.     Some non-deterministic task  
  6.     "" 
  7.     gevent.sleep(random.randint(0,2)*0.001)  
  8.     print('Task %s done' % pid)  
  9.  
  10.  
  11. #同步(结果更像串行)  
  12. def synchronous():  
  13.     for i in range(1,10):  
  14.         task(i)  
  15.  
  16.  
  17. #异步(结果更像乱步)  
  18. def asynchronous():  
  19.     threads = [gevent.spawn(task, i) for i in range(10)]  
  20.     gevent.joinall(threads)  
  21.  
  22.  
  23. print('Synchronous同步:' 
  24. synchronous()  
  25.  
  26.  
  27. print('Asynchronous异步:'
  28.  
  29. asynchronous() 

Synchronous同步:

  1. Task 1 done  
  2. Task 2 done  
  3. Task 3 done 
  4. Task 4 done  
  5. Task 5 done  
  6. Task 6 done  
  7. Task 7 done  
  8. Task 8 done  
  9. Task 9 done 

Asynchronous异步:

  1. Task 1 done  
  2. Task 5 done  
  3. Task 6 done  
  4. Task 2 done  
  5. Task 4 done  
  6. Task 7 done  
  7. Task 8 done  
  8. Task 9 done  
  9. Task 0 done  
  10. Task 3 done 

同步案例中所有的任务都是按照顺序执行,这导致主程序是阻塞式的(阻塞会暂停主程序的执行)。

gevent.spawn会对传入的任务(子任务集合)进行进行调度,gevent.joinall方法会阻塞当前程序,除非所有的greenlet都执行完毕,程序才会结束。

实现gevent到底怎么用,把异步访问得到的数据提取出来。

在有道词典搜索框输入“hello”按回车。观察数据请求情况 观察有道的url构建。

分析url规律

  1. #url构建只需要传入word即可 
  2.  
  3. url = "http://dict.youdao.com/w/eng/{}/".format(word) 

解析网页数据

  1. def fetch_word_info(word):  
  2.     url = "http://dict.youdao.com/w/eng/{}/".format(word)  
  3.  
  4.     resp = requests.get(url,headers=headers)  
  5.     doc = pq(resp.text)  
  6.     pros = ''  
  7.     for pro in doc.items('.baav .pronounce'):  
  8.         pros+=pro.text()  
  9.  
  10.     description = ''  
  11.     for li in doc.items('#phrsListTab .trans-container ul li'):  
  12.         description +=li.text()  
  13.  
  14.     return {'word':word,'音标':pros,'注释':description} 

因为requests库在任何时候只允许有一个访问结束完全结束后,才能进行下一次访问。无法通过正规途径拓展成异步,因此这里使用了monkey补丁

  1. import requests  
  2. from pyquery import PyQuery as pq  
  3. import gevent  
  4. import time  
  5. import gevent.monkey  
  6. gevent.monkey.patch_all() 
  7.  
  8. words = ['good','bad','cool' 
  9.          'hot','nice','better' 
  10.          'head','up','down' 
  11.          'right','left','east' 
  12.  
  13. def synchronous():  
  14.     start = time.time()  
  15.     print('同步开始了' 
  16.     for word in words:  
  17.         print(fetch_word_info(word))  
  18.     end = time.time()  
  19.     print("同步运行时间: %s 秒" % str(end - start)) 
  20.  
  21.  
  22. #执行同步  
  23. synchronous() 
  1. import requests  
  2. from pyquery import PyQuery as pq  
  3. import gevent  
  4. import time  
  5. import gevent.monkey  
  6. gevent.monkey.patch_all()  
  7.  
  8. words = ['good','bad','cool' 
  9.          'hot','nice','better' 
  10.          'head','up','down' 
  11.          'right','left','east' 
  12.  
  13. def asynchronous():  
  14.     start = time.time()  
  15.     print('异步开始了' 
  16.     events = [gevent.spawn(fetch_word_info,word) for word in words]  
  17.     wordinfos = gevent.joinall(events)  
  18.     for wordinfo in wordinfos:  
  19.         #获取到数据get方法  
  20.         print(wordinfo.get())  
  21.     end = time.time()  
  22.     print("异步运行时间: %s 秒"%str(end-start))  
  23.  
  24. #执行异步  
  25. asynchronous() 

我们可以对待爬网站实时异步访问,速度会大大提高。我们现在是爬取12个词语的信息,也就是说一瞬间我们对网站访问了12次,这还没啥问题,假如爬10000+个词语,使用gevent的话,那几秒钟之内就给网站一股脑的发请求,说不定网站就把爬虫封了。

将列表等分为若干个子列表,分批爬取。举例我们有一个数字列表(0-19),要均匀的等分为4份,也就是子列表有5个数。下面是我在stackoverflow查找到的列表等分方案:

  1. seqence = list(range(20)) 
  2.  
  3. size = 5 #子列表长度 
  4.  
  5. output = [seqence[i:i+sizefor i in range(0, len(seqence), size)] 
  6.  
  7. print(output
  1. chunks = lambda seq, size: [seq[i: i+sizefor i in range(0, len(seq), size)] 
  2.  
  3. print(chunks(seq, 5)) 
  1. def chunks(seq,size):  
  2.     for i in range(0,len(seq), size):  
  3.         yield seq[i:i+size 
  4. prinT(chunks(seq,5))  
  5.     for  x  in chunks(req,5):  
  6.          print(x)  

数据量不大的情况下,选哪一种方法都可以。如果特别大,建议使用方法3.

  1. import requests  
  2. from pyquery import PyQuery as pq  
  3. import gevent  
  4. import time  
  5. import gevent.monkey  
  6. gevent.monkey.patch_all()
  7.  
  8. words = ['good','bad','cool' 
  9.          'hot','nice','better' 
  10.          'head','up','down' 
  11.          'right','left','east' 
  12.  
  13. def fetch_word_info(word):   
  14.     url = "http://dict.youdao.com/w/eng/{}/".format(word)  
  15.  
  16.     resp = requests.get(url,headers=headers)  
  17.     doc = pq(resp.text) 
  18.  
  19.  
  20.     pros = ''  
  21.     for pro in doc.items('.baav .pronounce'):  
  22.         pros+=pro.text()  
  23.  
  24.     description = ''  
  25.     for li in doc.items('#phrsListTab .trans-container ul li'):  
  26.         description +=li.text()  
  27.  
  28.     return {'word':word,'音标':pros,'注释':description}  
  29.  
  30.  
  31. def asynchronous(words):  
  32.     start = time.time()  
  33.     print('异步开始了'  
  34.  
  35.     chunks = lambda seq, size: [seq[i: i + sizefor i in range(0, len(seq), size)] 
  36.  
  37.  
  38.     for subwords in chunks(words,3):  
  39.         events = [gevent.spawn(fetch_word_info, word) for word in subwords]   
  40.         wordinfos = gevent.joinall(events)  
  41.         for wordinfo in wordinfos:  
  42.             # 获取到数据get方法  
  43.             print(wordinfo.get()) 
  44.  
  45.         time.sleep(1)   
  46.         end = time.time()  
  47.     print("异步运行时间: %s 秒" % str(end - start))  
  48.  
  49. asynchronous(words)  

责任编辑:庞桂玉 Python爱好者社区
点赞
收藏