query.py

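"""Keyword and organization query services.

``QueryKeyWord`` pulls keyword-search tasks from the scheduler, queries a web
search engine (Bing by default) page by page, and pushes base URLs not yet
known to the validator onto the data-excavation queue. ``QueryOrganization``
resolves organization names to sites (Qcc engine by default), saves the query
result, and pushes domains the collector has not indexed onto the same queue.
"""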
import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.execptions import HostsRetrieveError
from common.log import logger
from crawler.engines import BingSearchEngine, QccSearchEngine
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_domain,
    is_url,
    err_details,
)


class QueryKeyWord(BasicSearch):

    def __init__(self, engine=None, **kwargs):
        self._workers = kwargs.pop('query_workers', None) or 1
        self._max_pages = kwargs.pop('max_pages', None) or 1
        self._interval = kwargs.pop('query_interval', None) or 60
        super(QueryKeyWord, self).__init__(**kwargs)
        self.engine = engine or BingSearchEngine()
        # Read the name from the instance actually in use, so the default
        # engine is reported correctly when `engine` is None.
        self._name = self.engine.__class__.__name__

    def query_keyword(self):
        t_name = threading.current_thread().name
        logger.info(f'Thread started - <{t_name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.keyword_groups)
            if not tasks:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            logger.info(f"<{t_name}> - {self._name} - {task['search']}")
            cur_page = 0
            while cur_page < self._max_pages:
                cur_page += 1
                # Build data-excavation tasks from the search results of this page.
                lst = []
                urls = self.engine.search(task['search'], cur_page)
                for url in urls:
                    base_url = extract_base_url(url)
                    # Skip base URLs the validator already holds.
                    if self.validator.data(base_url):
                        continue
                    lst.append(self.make_task(
                        url=base_url,
                        origin=task['origin'],
                        groups=task['groups'],
                        classify=self.visit_classify,
                        weight=task['weight'],
                    ))
                # Push the batch to the data-excavation queue.
                self.scheduler.add_excavate(lst, level=task['weight'])
                logger.info(f'<{t_name}> - {self._name} - {task["search"]} - page {cur_page}, {len(lst)} items')
            # Query record
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'QueryKeyWord') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.query_keyword)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)


class QueryOrganization(BasicSearch):

    def __init__(self, engine=None, **kwargs):
        self._workers = kwargs.pop('query_workers', None) or 1
        self._interval = kwargs.pop('query_interval', None) or 60
        super(QueryOrganization, self).__init__(**kwargs)
        self.engine = engine or QccSearchEngine()
        # As above, read the name from the engine instance actually in use.
        self._name = self.engine.__class__.__name__

    def query_org(self):
        t_name = threading.current_thread().name
        logger.info(f'Thread started - <{t_name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.org_groups)
            if not tasks:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            logger.info(f"<{t_name}> - {self._name} - {word}")
            try:
                url = self.engine.search(word)
                task['url'] = url
                task['name'] = word
                task['domain'] = extract_domain(url)
                # Save the query result.
                self.push_query(task)
                if not is_url(url):
                    continue
                # The collector tells us whether the site is already indexed;
                # only unindexed domains go to the data-excavation queue.
                if self.collector.data(task['domain']):
                    continue
                # Mark the task as a data-excavation task.
                task['classify'] = self.visit_classify
                # Push it to the data-excavation queue.
                self.scheduler.add_excavate(task, level=task['weight'])
            except HostsRetrieveError as e:
                task['status_code'] = e.code
                task['err_reason'] = e.reason
                logger.exception(e)
                # Put the task back into the query queue.
                self.scheduler.add_query(self.org_groups, task, level=task['weight'])
            # Query record
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'QueryOrganization') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.query_org)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
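

# Usage sketch (not part of the original module): how the two services might
# be launched. Assumption: BasicSearch requires no arguments beyond the kwargs
# consumed above, and the parameter values below are illustrative only. Each
# start() call blocks while its worker threads loop forever, so the keyword
# service is started on a separate thread here.
if __name__ == '__main__':
    keyword_service = QueryKeyWord(query_workers=2, max_pages=3, query_interval=30)
    org_service = QueryOrganization(query_workers=2, query_interval=60)
    threading.Thread(target=keyword_service.start, name='QueryKeyWordMain').start()
    org_service.start()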