query.py

import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.execptions import ExploreDataError
from common.log import logger
from crawler.services.basics import BasicService
from crawler.utils import (
    extract_base_url,
    extract_domain,
    is_url,
    err_details,
)


class DataQuery(BasicService):
    """Data query service."""

    def __init__(self, engine, **kwargs):
        self._workers = kwargs.pop('query_workers', None) or 1
        self._interval = kwargs.pop('query_interval', None) or 60
        super(DataQuery, self).__init__(**kwargs)
        self._init(engine)
        self.kwargs = kwargs

    def _init(self, engine):
        # Dispatch table: the engine's usage group selects which worker loop to run.
        _app_items = {
            self.keyword_groups: self._keywords,
            self.org_groups: self._organization
        }
        self._engine = engine
        self._name = engine.__class__.__name__
        self._app = _app_items[engine.usage]
        self._app_name = f'DataQuery_{engine.usage}'
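
    # Note on the engine contract (inferred from the calls in this class, not a
    # documented API): the engine object is expected to expose a `usage` attribute
    # equal to either `self.keyword_groups` or `self.org_groups`, and a `search()`
    # method taking (word, page) in keyword mode or a single word in organization mode.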

    def _keywords(self):
        t_name = threading.current_thread().name
        logger.info(f'Thread started - <{t_name}>')
        _max_pages = self.kwargs.pop('max_pages', None) or 1
        while True:
            tasks = self.scheduler.get_query_task(self.keyword_groups)
            if len(tasks) == 0:
                # No pending task for this group; wait one polling interval.
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            logger.info(f"<{t_name}> - {self._name} - {task['search']}")
            cur_page = 0
            while cur_page < _max_pages:
                cur_page += 1
                '''Search the result pages and generate data-mining tasks'''
                lst = []
                urls = self._engine.search(task['search'], cur_page)
                for url in urls:
                    base_url = extract_base_url(url)
                    if not self.validator.data(base_url):
                        lst.append(self.make_task(
                            url=base_url,
                            origin=task['origin'],
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=task['weight'],
                        ))
                '''Push to the data-mining queue'''
                self.scheduler.add_excavate(lst, level=task['weight'])
                logger.info(f'<{t_name}> - {self._name} - {task["search"]} - page {cur_page}, {len(lst)} item(s)')
            # '''Query record'''
            # self.push_records(task)

    def _organization(self):
        t_name = threading.current_thread().name
        logger.info(f'Thread started - <{t_name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.org_groups)
            if len(tasks) == 0:
                # No pending task for this group; wait one polling interval.
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            logger.info(f"<{t_name}> - {self._name} - {word}")
            try:
                url = self._engine.search(word)
                task['url'] = url
                task['name'] = word
                task['domain'] = extract_domain(url)
                '''Save the query result'''
                self.push_query(task)
                if not is_url(url):
                    continue
                '''Check with the collector whether the site is already indexed before pushing it to the data-mining queue'''
                if self.collector.data(task['domain']):
                    continue
                '''Mark the task as a data-mining task'''
                task['classify'] = self.visit_classify
                '''Push to the data-mining queue'''
                self.scheduler.add_excavate(task, level=task['weight'])
            except ExploreDataError as e:
                task['status_code'] = e.code
                task['err_reason'] = e.reason
                logger.exception(e)
                '''Put the task back into the query queue'''
                self.scheduler.add_query(self.org_groups, task, level=task['weight'])
            # '''Query record'''
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, self._app_name) as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self._app)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
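

# -- Usage sketch (illustrative, not part of the original module) ----------------------
# The snippet below only shows how DataQuery is wired together; the engine class name,
# its `usage` value, and the exact keyword arguments accepted by BasicService are
# assumptions based on the code above, not the project's documented API.
#
#     engine = SomeSearchEngine()      # hypothetical adapter exposing `usage`/`search()`
#     service = DataQuery(engine, query_workers=2, query_interval=30, max_pages=3)
#     service.start()                  # blocks until all worker threads finish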