excavate.py

import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.log import logger
from crawler.Task import Task
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    parser_domain,
    err_details,
)

TLDS = ['com', 'cn']


class DataExcavate(BasicSearch):

    def __init__(self, workers=1, loop_interval=60, **kwargs):
        super(DataExcavate, self).__init__(**kwargs)
        self._interval = loop_interval
        self._workers = workers

    def is_rubbish(self, url: str):
        """Return True when the url, its domain, or any domain fragment is already filtered."""
        if self.validator.data(url):
            return True
        domain = extract_domain(url)
        if self.validator.data(domain):
            return True
        if domain.startswith('www.'):
            domain = domain.replace('www.', '')
        domain_lst = parser_domain(domain)
        domain_lst = [d for d in domain_lst if d not in TLDS]
        for val in domain_lst:
            if self.validator.data(val):
                return True
        return False

    def retrieve_site(self, task: Task):
        t_name = threading.current_thread().name
        logger.info(f'[{t_name}]requesting - {task["url"]}')
        response = self.downloader.get(task['url'])
        task['status_code'] = response.status_code
        if response.status_code != 200 or response.text in ['', None]:
            task['err_reason'] = response.reason
            logger.error(f'[{t_name}]bad url - {task["url"]} - {response.reason}')
            return False
        task['domain'] = extract_domain(task['url'])
        page_source = response.text
        task['name'] = extract_page_title(page_source)
        task['base_url'] = extract_base_url(task['url'])
        items = self.parser.site_items(page_source, task['base_url'])
        lst = []
        _c = 0  # counter of items whose title passes the word filter
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.words(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=task['weight']
                ))
                _c += 1
        if _c > 1:
            self.push_domain(task)
        else:
            self.push_remove(task)
        self.scheduler.add_excavate(lst, level=task['weight'])
        return True

    def excavate(self):
        t_name = threading.current_thread().name
        logger.info(f'[{t_name}]data excavation - started')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            if self.is_rubbish(task['url']):
                logger.info(f'[{t_name}]filtered url - {task["url"]}')
                continue
            # mine the site
            success = self.retrieve_site(task)
            if not success:
                # url - add to the filter
                self.validator.add_data(task['url'])
            # excavation record
            # self.push_records(task)

    def start(self):
        logger.info('[DataExcavate] initial loading')
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
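

# --- Usage sketch (not part of the original module) ---
# A minimal way to launch the excavator, assuming BasicSearch accepts these
# keyword arguments and wires up the scheduler, downloader, parser and
# validator internally; the worker count and loop interval are illustrative.
# if __name__ == '__main__':
#     DataExcavate(workers=4, loop_interval=30).start()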