Browse Source

更新了爬虫并发下载部分的内容

jackfrued 7 years ago
parent
commit
4b3cae5479

+ 226 - 23
Day66-75/04.并发下载.md

@@ -36,24 +36,208 @@ Python3.2带来了`concurrent.futures` 模块,这个模块包含了线程池
 1. Python 2.2:第一次提出了生成器(最初称之为迭代器)的概念(PEP 255)。
 2. Python 2.5:引入了将对象发送回暂停了的生成器这一特性即生成器的`send()`方法(PEP 342)。
 3. Python 3.3:添加了`yield from`特性,允许从迭代器中返回任何值(注意生成器本身也是迭代器),这样我们就可以串联生成器并且重构出更好的生成器。
-4. Python 3.4:引入`asyncio.coroutine` 装饰器用来标记作为协程的函数,协程函数和`asyncio`及其事件循环一起使用,来实现异步I/O操作。
+4. Python 3.4:引入`asyncio.coroutine`装饰器用来标记作为协程的函数,协程函数和`asyncio`及其事件循环一起使用,来实现异步I/O操作。
 5. Python 3.5:引入了`async`和`await`,可以使用`async def`来定义一个协程函数,这个函数中不能包含任何形式的`yield`语句,但是可以使用`return`或`await`从协程中返回值。
 
-
+#### 示例代码
+
+1. 生成器 - 数据的生产者。
+
+   ```Python
+   
+   from time import sleep
+   
+   
+   # 倒计数生成器
+   def countdown(n):
+       while n > 0:
+           yield n
+           n -= 1
+   
+   
+   def main():
+       for num in countdown(5):
+           print(f'Countdown: {num}')
+           sleep(1)
+       print('Countdown Over!')
+   
+   
+   if __name__ == '__main__':
+       main()
+   
+   ```
+
+   生成器还可以叠加来组成生成器管道,代码如下所示。
+
+   ```Python
+   
+   # Fibonacci数生成器
+   def fib():
+       a, b = 0, 1
+       while True:
+           a, b = b, a + b
+           yield a
+   
+   
+   # 偶数生成器
+   def even(gen):
+       for val in gen:
+           if val % 2 == 0:
+               yield val
+   
+   
+   def main():
+       gen = even(fib())
+       for _ in range(10):
+           print(next(gen))
+   
+   
+   if __name__ == '__main__':
+       main()
+   
+   ```
+
+2. 协程 - 数据的消费者。
+
+   ```Python
+   
+   from time import sleep
+   
+   
+   # 生成器 - 数据生产者
+   def countdown_gen(n, consumer):
+       consumer.send(None)
+       while n > 0:
+           consumer.send(n)
+           n -= 1
+       consumer.send(None)
+   
+   
+   # 协程 - 数据消费者
+   def countdown_con():
+       while True:
+           n = yield
+           if n:
+               print(f'Countdown {n}')
+               sleep(1)
+           else:
+               print('Countdown Over!')
+   
+   
+   def main():
+       countdown_gen(5, countdown_con())
+   
+   
+   if __name__ == '__main__':
+       main()
+   
+   ```
+
+   > 说明:上面代码中countdown_gen函数中的第1行consumer.send(None)是为了激活生成器,通俗的说就是让生成器执行到有yield关键字的地方挂起,当然也可以通过next(consumer)来达到同样的效果。如果不愿意每次都用这样的代码来“预激”生成器,可以写一个包装器来完成该操作,代码如下所示。
+
+   ```Python
+   
+   from functools import wraps
+   
+   
+   def coroutine(fn):
+   
+       @wraps(fn)
+       def wrapper(*args, **kwargs):
+           gen = fn(*args, **kwargs)
+           next(gen)
+           return gen
+   
+       return wrapper
+   ```
+
+   这样就可以使用`@coroutine`装饰器对协程进行预激操作,不需要再写重复代码来激活协程。
+
+3. 异步I/O - 非阻塞式I/O操作。
+
+   ```Python
+   
+   import asyncio
+   
+   
+   @asyncio.coroutine
+   def countdown(name, n):
+       while n > 0:
+           print(f'Countdown[{name}]: {n}')
+           yield from asyncio.sleep(1)
+           n -= 1
+   
+   
+   def main():
+       loop = asyncio.get_event_loop()
+       tasks = [
+           countdown("A", 10), countdown("B", 5),
+       ]
+       loop.run_until_complete(asyncio.wait(tasks))
+       loop.close()
+   
+   
+   if __name__ == '__main__':
+       main()
+   
+   ```
+
+4.  `async`和`await`。
+
+   ```Python
+   
+   import asyncio
+   import aiohttp
+   
+   
+   async def download(url):
+       print('Fetch:', url)
+       async with aiohttp.ClientSession() as session:
+           async with session.get(url) as resp:
+               print(url, '--->', resp.status)
+               print(url, '--->', resp.cookies)
+               print('\n\n', await resp.text())
+   
+   
+   def main():
+       loop = asyncio.get_event_loop()
+       urls = [
+           'https://www.baidu.com',
+           'http://www.sohu.com/',
+           'http://www.sina.com.cn/',
+           'https://www.taobao.com/',
+           'https://www.jd.com/'
+       ]
+       tasks = [download(url) for url in urls]
+       loop.run_until_complete(asyncio.wait(tasks))
+       loop.close()
+   
+   
+   if __name__ == '__main__':
+       main()
+   
+   ```
+
+   上面的代码使用了[AIOHTTP](https://github.com/aio-libs/aiohttp)这个非常著名的第三方库,它实现了HTTP客户端和HTTP服务器的功能,对异步操作提供了非常好的支持,有兴趣可以阅读它的[官方文档](https://aiohttp.readthedocs.io/en/stable/)。
 
 ### 实例 - 多线程爬取“手机搜狐网”所有页面。
 
 ```Python
 
+import pickle
+import zlib
 from enum import Enum, unique
-from queue import Queue
+from hashlib import sha1
 from random import random
-from threading import Thread, current_thread
+from threading import Thread, current_thread, local
 from time import sleep
 from urllib.parse import urlparse
 
+import pymongo
+import redis
 import requests
 from bs4 import BeautifulSoup
+from bson import Binary
 
 
 @unique
@@ -113,7 +297,6 @@ class Spider(object):
 
     def parse(self, html_page, *, domain='m.sohu.com'):
         soup = BeautifulSoup(html_page, 'lxml')
-        url_links = []
         for a_tag in soup.body.select('a[href]'):
             parser = urlparse(a_tag.attrs['href'])
             scheme = parser.scheme or 'http'
@@ -122,34 +305,51 @@ class Spider(object):
                 path = parser.path
                 query = '?' + parser.query if parser.query else ''
                 full_url = f'{scheme}://{netloc}{path}{query}'
-                if full_url not in visited_urls:
-                    url_links.append(full_url)
-        return url_links
+                redis_client = thread_local.redis_client
+                if not redis_client.sismember('visited_urls', full_url):
+                    redis_client.rpush('m_sohu_task', full_url)
 
     def extract(self, html_page):
         pass
 
     def store(self, data_dict):
+        # redis_client = thread_local.redis_client
+        # mongo_db = thread_local.mongo_db
         pass
 
 
 class SpiderThread(Thread):
 
-    def __init__(self, name, spider, tasks_queue):
+    def __init__(self, name, spider):
         super().__init__(name=name, daemon=True)
         self.spider = spider
-        self.tasks_queue = tasks_queue
 
     def run(self):
+        redis_client = redis.Redis(host='1.2.3.4', port=6379, password='1qaz2wsx')
+        mongo_client = pymongo.MongoClient(host='1.2.3.4', port=27017)
+        thread_local.redis_client = redis_client
+        thread_local.mongo_db = mongo_client.msohu 
         while True:
-            current_url = self.tasks_queue.get()
-            visited_urls.add(current_url)
+            current_url = redis_client.lpop('m_sohu_task')
+            while not current_url:
+                current_url = redis_client.lpop('m_sohu_task')
             self.spider.status = SpiderStatus.WORKING
-            html_page = self.spider.fetch(current_url)
-            if html_page not in [None, '']:
-                url_links = self.spider.parse(html_page)
-                for url_link in url_links:
-                    self.tasks_queue.put(url_link)
+            current_url = current_url.decode('utf-8')
+            if not redis_client.sismember('visited_urls', current_url):
+                redis_client.sadd('visited_urls', current_url)
+                html_page = self.spider.fetch(current_url)
+                if html_page not in [None, '']:
+                    hasher = hasher_proto.copy()
+                    hasher.update(current_url.encode('utf-8'))
+                    doc_id = hasher.hexdigest()
+                    sohu_data_coll = mongo_client.msohu.webpages
+                    if not sohu_data_coll.find_one({'_id': doc_id}):
+                        sohu_data_coll.insert_one({
+                            '_id': doc_id,
+                            'url': current_url,
+                            'page': Binary(zlib.compress(pickle.dumps(html_page)))
+                        })
+                    self.spider.parse(html_page)
             self.spider.status = SpiderStatus.IDLE
 
 
@@ -158,19 +358,22 @@ def is_any_alive(spider_threads):
                 for spider_thread in spider_threads])
 
 
-visited_urls = set()
+thread_local = local()
+hasher_proto = sha1()
 
 
 def main():
-    task_queue = Queue()
-    task_queue.put('http://m.sohu.com/')
-    spider_threads = [SpiderThread('thread-%d' % i, Spider(), task_queue)
+    redis_client = redis.Redis(host='1.2.3.4', port=6379, password='1qaz2wsx')
+    if not redis_client.exists('m_sohu_task'):
+        redis_client.rpush('m_sohu_task', 'http://m.sohu.com/')
+
+    spider_threads = [SpiderThread('thread-%d' % i, Spider())
                       for i in range(10)]
     for spider_thread in spider_threads:
         spider_thread.start()
 
-    while not task_queue.empty() or is_any_alive(spider_threads):
-    	sleep(5)
+    while redis_client.exists('m_sohu_task') or is_any_alive(spider_threads):
+        pass
 
     print('Over!')
 

+ 22 - 0
Day66-75/code/asyncio01.py

@@ -0,0 +1,22 @@
+import asyncio
+
+
+@asyncio.coroutine
+def countdown(name, num):
+    while num > 0:
+        print(f'Countdown[{name}]: {num}')
+        yield from asyncio.sleep(1)
+        num -= 1
+
+
+def main():
+    loop = asyncio.get_event_loop()
+    tasks = [
+        countdown("A", 10), countdown("B", 5),
+    ]
+    loop.run_until_complete(asyncio.wait(tasks))
+    loop.close()
+
+
+if __name__ == '__main__':
+    main()

+ 29 - 0
Day66-75/code/asyncio02.py

@@ -0,0 +1,29 @@
+import asyncio
+import aiohttp
+
+
+async def download(url):
+    print('Fetch:', url)
+    async with aiohttp.ClientSession() as session:
+        async with session.get(url) as resp:
+            print(url, '--->', resp.status)
+            print(url, '--->', resp.cookies)
+            print('\n\n', await resp.text())
+
+
+def main():
+    loop = asyncio.get_event_loop()
+    urls = [
+        'https://www.baidu.com',
+        'http://www.sohu.com/',
+        'http://www.sina.com.cn/',
+        'https://www.taobao.com/',
+        'https://www.jd.com/'
+    ]
+    tasks = [download(url) for url in urls]
+    loop.run_until_complete(asyncio.wait(tasks))
+    loop.close()
+
+
+if __name__ == '__main__':
+    main()

+ 27 - 0
Day66-75/code/coroutine01.py

@@ -0,0 +1,27 @@
+from time import sleep
+
+
+def countdown_gen(n, consumer):
+    consumer.send(None)
+    while n > 0:
+        consumer.send(n)
+        n -= 1
+    consumer.send(None)
+
+
+def countdown_con():
+    while True:
+        n = yield
+        if n:
+            print(f'Countdown {n}')
+            sleep(1)
+        else:
+            print('Countdown Over!')
+
+
+def main():
+    countdown_gen(5, countdown_con())
+
+
+if __name__ == '__main__':
+    main()

+ 42 - 0
Day66-75/code/coroutine02.py

@@ -0,0 +1,42 @@
+from time import sleep
+
+from myutils import coroutine
+
+
+@coroutine
+def create_delivery_man(name, capacity=1):
+    buffer = []
+    while True:
+        size = 0
+        while size < capacity:
+            pkg_name = yield
+            if pkg_name:
+                size += 1
+                buffer.append(pkg_name)
+                print('%s正在接受%s' % (name, pkg_name))
+            else:
+                break
+        print('=====%s正在派送%d件包裹=====' % (name, len(buffer)))
+        sleep(3)
+        buffer.clear()
+
+
+def create_package_center(consumer, max_packages):
+    num = 0
+    while num <= max_packages:
+        print('快递中心准备派送%d号包裹' % num)
+        consumer.send('包裹-%d' % num)
+        num += 1
+        if num % 10 == 0:
+            sleep(5)
+    consumer.send(None)
+
+
+def main():
+    print(create_delivery_man.__name__)
+    dm = create_delivery_man('王大锤', 7)
+    create_package_center(dm, 25)
+
+
+if __name__ == '__main__':
+    main()

+ 21 - 0
Day66-75/code/generator01.py

@@ -0,0 +1,21 @@
+def fib():
+    a, b = 0, 1
+    while True:
+        a, b = b, a + b
+        yield a
+
+
+def even(gen):
+    for val in gen:
+        if val % 2 == 0:
+            yield val
+
+
+def main():
+    gen = even(fib())
+    for _ in range(10):
+        print(next(gen))
+
+
+if __name__ == '__main__':
+    main()

+ 18 - 0
Day66-75/code/generator02.py

@@ -0,0 +1,18 @@
+from time import sleep
+
+
+def countdown(n):
+    while n > 0:
+        yield n
+        n -= 1
+
+
+def main():
+    for num in countdown(5):
+        print(f'Countdown: {num}')
+        sleep(1)
+    print('Countdown Over!')
+
+
+if __name__ == '__main__':
+    main()

+ 12 - 0
Day66-75/code/myutils.py

@@ -0,0 +1,12 @@
+from functools import wraps
+
+
+def coroutine(fn):
+
+    @wraps(fn)
+    def wrapper(*args, **kwargs):
+        gen = fn(*args, **kwargs)
+        next(gen)
+        return gen
+
+    return wrapper