query.py

from concurrent.futures import ThreadPoolExecutor, wait

from common.execptions import ExploreDataError
from common.log import logger
from crawler.services.basics import BasicService
from crawler.utils import (
    extract_host,
    extract_domain,
    is_url,
    err_details,
)

class DataQuery(BasicService):
    """Data query service."""

    def __init__(self, engine, **kwargs):
        self._workers = kwargs.pop('query_workers', None) or 1
        self._interval = kwargs.pop('query_interval', None) or 60
        super(DataQuery, self).__init__(**kwargs)
        self._init(engine)
        self.kwargs = kwargs

    def _init(self, engine):
        # Map each engine usage type to its query routine.
        _app_items = {
            self.keyword_groups: self._keywords,
            self.org_groups: self._organization,
        }
        self._engine = engine
        self._name = engine.__class__.__name__
        self._app = _app_items[engine.usage]
        self._app_name = f'dataQuery_{engine.usage}'

    def _keywords(self):
        logger.info(f'Starting thread - <{self.thread_name}>')
        _max_pages = self.kwargs.pop('max_pages', None) or 1
        while True:
            tasks = self.scheduler.get_query_task(self.keyword_groups)
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            logger.info(f"<{self.thread_name}> - {self._name} - {task['search']}")
            cur_page = 0
            while cur_page < _max_pages:
                cur_page += 1
                '''Collect result URLs from the page and build data-mining tasks'''
                lst = []
                urls = self._engine.search(task['search'], cur_page)
                for url in urls:
                    host = extract_host(url)
                    if not self.validator.data(host):
                        lst.append(self.make_task(
                            url=host,
                            origin=task['origin'],
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=task['weight'],
                        ))
                '''Push to the data-mining queue'''
                self.scheduler.add_excavate(lst, level=task['weight'])
                msg = "<{}> - {} - {} - page {}, {} items".format(
                    self.thread_name,
                    self._name,
                    task["search"],
                    cur_page,
                    len(lst)
                )
                logger.info(msg)
            # '''Query record'''
            # self.push_records(task)

    def _organization(self):
        logger.info(f'Starting thread - <{self.thread_name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.org_groups)
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            logger.info(f"<{self.thread_name}> - {self._name} - {word}")
            try:
                url = self._engine.search(word)
                task['url'] = url
                task['name'] = word
                task['domain'] = extract_domain(url)
                '''Save the query result'''
                self.push_query(task)
                if not is_url(url):
                    continue
                '''Use the collector to check whether the site is already recorded before pushing it to the data-mining queue'''
                if self.collector.data(task['domain']):
                    continue
                '''Mark the task as a data-mining task'''
                task['classify'] = self.visit_classify
                '''Push to the data-mining queue'''
                self.scheduler.add_excavate(task, level=task['weight'])
            except ExploreDataError as e:
                task['status_code'] = e.code
                task['err_reason'] = e.reason
                logger.exception(e)
                '''Put the task back into the query queue'''
                self.scheduler.add_query(self.org_groups, task, level=task['weight'])
            # '''Query record'''
            # self.push_records(task)

    def start(self):
        # Run the query routine selected in _init on a pool of worker threads.
        with ThreadPoolExecutor(self._workers, self._app_name) as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self._app)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
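

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): a minimal
# example of wiring DataQuery to a search engine and starting its worker
# threads. The engine class name, its `usage` value, and the keyword
# arguments shown here are assumptions inferred from how __init__, _init,
# and _keywords read their inputs; they are not a confirmed API.
#
#     engine = SomeSearchEngine()      # hypothetical engine; engine.usage
#                                      # selects _keywords or _organization
#     service = DataQuery(
#         engine,
#         query_workers=2,             # threads in the pool (default 1)
#         query_interval=60,           # sleep seconds when the queue is empty
#         max_pages=3,                 # pages per keyword (keyword routine only)
#     )
#     service.start()                  # blocks while the worker threads run
# ---------------------------------------------------------------------------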