query.py
import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.execptions import HostsRetrieveError
from common.log import logger
from crawler.engines import BingSearchEngine, QccSearchEngine
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_domain,
    is_url,
    err_details,
)
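
# Query workers: QueryKeyWord expands a keyword search page by page and turns the
# result URLs into data-mining ("excavate") tasks; QueryOrganization resolves an
# organization name to its site URL and domain before queueing it the same way.
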
class QueryKeyWord(BasicSearch):
    """Search a keyword page by page and queue the result sites for data mining."""

    def __init__(self, engine=None, **kwargs):
        self._workers = kwargs.pop('query_workers', None) or 1
        self._max_pages = kwargs.pop('max_pages', None) or 1
        self._interval = kwargs.pop('query_interval', None) or 60
        super(QueryKeyWord, self).__init__(**kwargs)
        self.engine = engine or BingSearchEngine()
        self._name = self.engine.__class__.__name__

    def query_keyword(self):
        t_name = threading.current_thread().name
        logger.info(f'Thread started - <{t_name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.keyword_groups)
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            logger.info(f"<{t_name}> - {self._name} - {task['search']}")
            cur_page = 0
            while cur_page < self._max_pages:
                cur_page += 1
                # Build data-mining tasks from the result URLs on this page
                lst = []
                urls = self.engine.search(task['search'], cur_page)
                for url in urls:
                    base_url = extract_base_url(url)
                    if not self.validator.data(base_url):
                        lst.append(self.make_task(
                            url=base_url,
                            origin=task['origin'],
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=task['weight'],
                        ))
                # Push to the data-mining (excavate) queue
                self.scheduler.add_excavate(lst, level=task['weight'])
                logger.info(f'<{t_name}> - {self._name} - {task["search"]} - page {cur_page}, {len(lst)} items')
                # Query record
                # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'QueryKeyWord') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.query_keyword)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)


class QueryOrganization(BasicSearch):
    """Resolve an organization name to its website and queue unindexed sites for mining."""

    def __init__(self, engine=None, **kwargs):
        self._workers = kwargs.pop('query_workers', None) or 1
        self._interval = kwargs.pop('query_interval', None) or 60
        super(QueryOrganization, self).__init__(**kwargs)
        self.engine = engine or QccSearchEngine()
        self._name = self.engine.__class__.__name__

    def query_org(self):
        t_name = threading.current_thread().name
        logger.info(f'Thread started - <{t_name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.org_groups)
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            logger.info(f"<{t_name}> - {self._name} - {word}")
            try:
                url = self.engine.search(word)
                task['url'] = url
                task['name'] = word
                task['domain'] = extract_domain(url)
                # Save the query result
                self.push_query(task)
                if not is_url(url):
                    continue
                # The collector decides whether the site is already indexed;
                # only unindexed sites are pushed to the data-mining queue
                if self.collector.data(task['domain']):
                    continue
                # Mark the task as a data-mining (visit) type
                task['classify'] = self.visit_classify
                # Push to the data-mining (excavate) queue
                self.scheduler.add_excavate(task, level=task['weight'])
            except HostsRetrieveError as e:
                task['status_code'] = e.code
                task['err_reason'] = e.reason
                logger.exception(e)
                # Put the task back on the query queue
                self.scheduler.add_query(self.org_groups, task, level=task['weight'])
            # Query record
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'QueryOrganization') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.query_org)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
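

# A minimal usage sketch, not part of the original module: it assumes BasicSearch
# needs no keyword arguments beyond the ones consumed above, and that both workers
# are meant to run until the process is interrupted.
if __name__ == '__main__':
    org_worker = QueryOrganization(query_workers=2, query_interval=30)
    keyword_worker = QueryKeyWord(query_workers=2, max_pages=3, query_interval=30)
    # Run the organization worker in the background; the keyword worker blocks here.
    threading.Thread(target=org_worker.start, daemon=True).start()
    keyword_worker.start()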