query.py

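"""Keyword and organization query services.

QueryKeyWord pulls keyword query tasks from the scheduler, pages through a
search engine (Bing by default) and turns the result URLs into excavation
(data-mining) tasks. QueryOrganization looks organizations up through a
search engine (QCC by default), saves the query result and pushes unindexed
sites to the excavation queue. Both run their worker loops in a
ThreadPoolExecutor.
"""
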
import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.execptions import HostsRetrieveError
from common.log import logger
from crawler.engines import BingSearchEngine, QccSearchEngine
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_domain,
    is_url,
    err_details,
)


class QueryKeyWord(BasicSearch):

    def __init__(
            self,
            engine=None,
            query_workers=1,
            max_query_page=1,
            loop_query_interval=60,
            **kwargs
    ):
        super(QueryKeyWord, self).__init__(**kwargs)
        self.engine = engine or BingSearchEngine()
        # Read the name from self.engine so the fallback engine is reported correctly.
        self._name = self.engine.__class__.__name__
        self._workers = query_workers
        self._max_pages = max_query_page
        self._interval = loop_query_interval

    def query_keyword(self):
        t_name = threading.current_thread().name
        logger.info(f'Thread started - <{t_name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.keyword_groups)
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            logger.info(f"<{t_name}> - {self._name} - {task['search']}")
            cur_page = 0
            while cur_page < self._max_pages:
                cur_page += 1
                # Build excavation (data-mining) tasks from the result-page URLs.
                lst = []
                urls = self.engine.search(task['search'], cur_page)
                for url in urls:
                    base_url = extract_base_url(url)
                    if self.validator.data(base_url):
                        continue
                    lst.append(self.make_task(
                        url=base_url,
                        origin=task['origin'],
                        groups=task['groups'],
                        classify=self.visit_classify,
                        weight=task['weight'],
                    ))
                # Push onto the excavation queue.
                self.scheduler.add_excavate(lst, level=task['weight'])
                logger.info(f'<{t_name}> - {self._name} - {task["search"]} - page {cur_page}, {len(lst)} items')
            # Query record
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'QueryKeyWord') as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self.query_keyword)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)


class QueryOrganization(BasicSearch):

    def __init__(
            self,
            engine=None,
            query_workers=1,
            loop_query_interval=60,
            **kwargs
    ):
        super(QueryOrganization, self).__init__(**kwargs)
        self.engine = engine or QccSearchEngine()
        # Read the name from self.engine so the fallback engine is reported correctly.
        self._name = self.engine.__class__.__name__
        self._workers = query_workers
        self._interval = loop_query_interval

    def query_org(self):
        t_name = threading.current_thread().name
        logger.info(f'Thread started - <{t_name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.org_groups)
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            logger.info(f"<{t_name}> - {self._name} - {word}")
            try:
                url = self.engine.search(word)
                task['url'] = url
                task['name'] = word
                task['domain'] = extract_domain(url)
                # Save the query result.
                self.push_query(task)
                if not is_url(url):
                    continue
                # The collector decides whether the site is already indexed
                # before the task is pushed to the excavation queue.
                if self.collector.data(task['domain']):
                    continue
                # Mark the task as an excavation (data-mining) task.
                task['classify'] = self.visit_classify
                # Push onto the excavation queue.
                self.scheduler.add_excavate(task, level=task['weight'])
            except HostsRetrieveError as e:
                task['status_code'] = e.code
                task['err_reason'] = e.reason
                logger.exception(e)
                # Put the task back onto the query queue.
                self.scheduler.add_query(self.org_groups, task, level=task['weight'])
            # Query record
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'QueryOrganization') as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self.query_org)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
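

# A minimal launch sketch, not part of the original module: it assumes that
# BasicSearch needs no further required constructor arguments and that the
# worker counts below are reasonable defaults. Each start() call blocks on its
# own pool, so the two services are driven from separate threads.
if __name__ == '__main__':
    keyword_service = QueryKeyWord(query_workers=2, max_query_page=3)
    org_service = QueryOrganization(query_workers=2)
    runners = [
        threading.Thread(target=keyword_service.start, name='QueryKeyWordMain'),
        threading.Thread(target=org_service.start, name='QueryOrganizationMain'),
    ]
    for t in runners:
        t.start()
    for t in runners:
        t.join()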