main_redis.py

import pickle
import zlib
from enum import Enum, unique
from hashlib import sha1
from random import random
from threading import Thread, current_thread
from time import sleep
from urllib.parse import urlparse

import pymongo
import redis
import requests
from bs4 import BeautifulSoup
from bson import Binary
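# Possible states of a spider worker.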
@unique
class SpiderStatus(Enum):
    IDLE = 0
    WORKING = 1
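# Try each candidate charset in turn and return the first successful decode
# (or None if none of them works).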
def decode_page(page_bytes, charsets=('utf-8',)):
    page_html = None
    for charset in charsets:
        try:
            page_html = page_bytes.decode(charset)
            break
        except UnicodeDecodeError:
            pass
    return page_html
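# Decorator that retries the wrapped callable up to retry_times times,
# sleeping a randomised interval between attempts and returning None
# if every attempt fails.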
class Retry(object):

    def __init__(self, *, retry_times=3,
                 wait_secs=5, errors=(Exception, )):
        self.retry_times = retry_times
        self.wait_secs = wait_secs
        self.errors = errors

    def __call__(self, fn):

        def wrapper(*args, **kwargs):
            for _ in range(self.retry_times):
                try:
                    return fn(*args, **kwargs)
                except self.errors as e:
                    print(e)
                    sleep((random() + 1) * self.wait_secs)
            return None

        return wrapper
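# A spider fetches a page, parses out same-domain links and pushes them
# onto the shared Redis task queue; extract and store are left as stubs.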
class Spider(object):

    def __init__(self):
        self.status = SpiderStatus.IDLE

    @Retry()
    def fetch(self, current_url, *, charsets=('utf-8', ),
              user_agent=None, proxies=None):
        thread_name = current_thread().name
        print(f'[{thread_name}]: {current_url}')
        headers = {'user-agent': user_agent} if user_agent else {}
        resp = requests.get(current_url,
                            headers=headers, proxies=proxies)
        # Only decode the body of successful responses.
        return decode_page(resp.content, charsets) \
            if resp.status_code == 200 else None

    def parse(self, html_page, *, domain='m.sohu.com'):
        soup = BeautifulSoup(html_page, 'lxml')
        for a_tag in soup.body.select('a[href]'):
            parser = urlparse(a_tag.attrs['href'])
            scheme = parser.scheme or 'http'
            netloc = parser.netloc or domain
            # Skip javascript: pseudo-links and links that leave the target domain.
            if scheme != 'javascript' and netloc == domain:
                path = parser.path
                query = '?' + parser.query if parser.query else ''
                full_url = f'{scheme}://{netloc}{path}{query}'
                # Enqueue the URL only if it has not been visited yet.
                if not redis_client.sismember('visited_urls', full_url):
                    redis_client.rpush('m_sohu_task', full_url)

    def extract(self, html_page):
        pass

    def store(self, data_dict):
        pass
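# Worker thread: pops URLs from the Redis task queue, fetches and archives
# pages in MongoDB, and feeds newly discovered links back into the queue.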
class SpiderThread(Thread):

    def __init__(self, name, spider):
        super().__init__(name=name, daemon=True)
        self.spider = spider

    def run(self):
        while True:
            current_url = redis_client.lpop('m_sohu_task')
            # Poll until a task shows up, yielding briefly to avoid hammering Redis.
            while not current_url:
                sleep(0.01)
                current_url = redis_client.lpop('m_sohu_task')
            self.spider.status = SpiderStatus.WORKING
            current_url = current_url.decode('utf-8')
            if not redis_client.sismember('visited_urls', current_url):
                redis_client.sadd('visited_urls', current_url)
                html_page = self.spider.fetch(current_url)
                if html_page not in [None, '']:
                    # Use the URL's SHA-1 digest as the document id to deduplicate pages.
                    hasher = hasher_proto.copy()
                    hasher.update(current_url.encode('utf-8'))
                    doc_id = hasher.hexdigest()
                    if not sohu_data_coll.find_one({'_id': doc_id}):
                        sohu_data_coll.insert_one({
                            '_id': doc_id,
                            'url': current_url,
                            'page': Binary(zlib.compress(pickle.dumps(html_page)))
                        })
                    self.spider.parse(html_page)
            self.spider.status = SpiderStatus.IDLE
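# True while at least one spider in the pool is still working.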
def is_any_alive(spider_threads):
    return any([spider_thread.spider.status == SpiderStatus.WORKING
                for spider_thread in spider_threads])
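# Shared Redis/MongoDB connections and the SHA-1 prototype used to derive
# document ids. The endpoints and password below come from the original
# example; point them at your own servers before running.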
redis_client = redis.Redis(host='1.2.3.4',
                           port=6379, password='1qaz2wsx')
mongo_client = pymongo.MongoClient(host='120.77.222.217', port=27017)
db = mongo_client.msohu
sohu_data_coll = db.webpages
hasher_proto = sha1()
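# Seed the task queue if necessary, start a pool of ten spider threads and
# wait until the queue is empty and every spider has gone idle.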
def main():
    if not redis_client.exists('m_sohu_task'):
        redis_client.rpush('m_sohu_task', 'http://m.sohu.com/')
    spider_threads = [SpiderThread('thread-%d' % i, Spider())
                      for i in range(10)]
    for spider_thread in spider_threads:
        spider_thread.start()
    # Sleep between checks instead of busy-spinning while the spiders drain the queue.
    while redis_client.exists('m_sohu_task') or is_any_alive(spider_threads):
        sleep(5)
    print('Over!')


if __name__ == '__main__':
    main()