data_excavate.py 3.0 KB

import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.log import logger
from crawler.Task import Task
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    err_details,
)


class DataExcavate(BasicSearch):
    """Site excavator: pulls tasks from the scheduler, fetches each URL,
    extracts candidate links and feeds them back as new excavate tasks."""

    def __init__(self, workers=1, loop_interval=60, **kwargs):
        super(DataExcavate, self).__init__(**kwargs)
        self._interval = loop_interval
        self._workers = workers
    def retrieve_site(self, task: Task):
        t_name = threading.current_thread().name
        logger.info(f'[{t_name}] start request - {task["url"]}')
        response = self.downloader.get(task['url'])
        task['status_code'] = response.status_code
        if response.status_code != 200 or response.text in ['', None]:
            task['err_reason'] = response.reason
            logger.error(f'[{t_name}] abnormal url - {task["url"]} - {response.reason}')
            return False

        task['domain'] = extract_domain(task['url'])
        page_source = response.text
        task['name'] = extract_page_title(page_source)
        task['base_url'] = extract_base_url(task['url'])

        items = self.parser.site_items(page_source, task['base_url'])
        lst = []
        _c = 0  # counter of items that pass the word filter
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.words(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=task['weight']
                ))
                _c += 1

        if _c > 1:
            self.push_domain(task)
        else:
            self.push_remove(task)
        self.scheduler.add_excavate(lst, level=task['weight'])
        return True
    def excavate(self):
        t_name = threading.current_thread().name
        logger.info(f'[{t_name}] data excavation - started')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue

            task_key, task = tasks
            if self.validator.data(task['url']):
                continue

            # excavate the site
            success = self.retrieve_site(task)
            if not success:
                # url - add to the filter
                self.validator.add_data(task['url'])
            # excavation record
            # self.push_records(task)
    def start(self):
        logger.info('[data excavation] initializing')
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
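

# Usage sketch, assuming BasicSearch needs no constructor arguments beyond
# those shown here; real deployments would pass the scheduler/downloader
# configuration through **kwargs.
if __name__ == '__main__':
    excavator = DataExcavate(workers=4, loop_interval=30)
    excavator.start()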