data_query.py

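"""Data query service.

Pulls query tasks from the scheduler, resolves them either through the
Qichacha organisation lookup or through the configured search engines,
and pushes the discovered URLs onto the data-excavation queue.
"""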
import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.execptions import HostsRetrieveError
from common.log import logger
from crawler.engines import JySearchEngine
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_domain,
    is_url,
    err_details,
)

class DataQuery(BasicSearch):

    def __init__(
            self,
            engines=None,
            max_query_page=1,
            loop_query_interval=60,
            **kwargs
    ):
        super(DataQuery, self).__init__(**kwargs)
        self._max_pages = max_query_page
        self._interval = loop_query_interval
        self._engines = []
        self.set_engines(engines)

    def _init(self):
        # Re-register the configured engines; iterate over a copy so that
        # _set_engine can append to self._engines without looping forever.
        engines, self._engines = self._engines, []
        self.set_engines(engines)

    def _set_engine(self, engine):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'[数据查询]添加引擎 - <{engine.__class__.__name__}>')
        return self

    def set_engines(self, engines):
        if isinstance(engines, list):
            for engine in engines:
                self._set_engine(engine)
        else:
            self._set_engine(engines)
        return self

    def search(self, engine):
        ename = engine.__class__.__name__
        threading.current_thread().name = ename
        logger.info(f'[数据查询]启动引擎 - <{ename}>')
        while True:
            tasks = self.scheduler.get_query_task()
            if len(tasks) == 0:
                # No pending task; wait for the configured interval before polling again
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            if task['groups'] == self.org_groups:
                # Look up the organisation's website through the Qichacha (企查查) service
                logger.info(f"<QccSearch> {task['groups']} >>> {word}")
                try:
                    url = engine.by_org_get_site(word)
                    task['url'] = url
                    task['name'] = word
                    task['domain'] = extract_domain(url)
                    # Save the query result
                    self.push_query(task)
                    if not is_url(url):
                        continue
                    if self.validator.data(task['domain']):
                        continue
                    # Register the domain with the de-duplication filter
                    self.validator.add_data(task['domain'])
                    # Push the task onto the data-excavation queue
                    task['classify'] = self.visit_classify
                    self.scheduler.add_excavate(task, level=task['weight'])
                except HostsRetrieveError as e:
                    task['status_code'] = e.code
                    task['err_reason'] = e.reason
                    logger.exception(e)
                    # Put the task back onto the query queue for a retry
                    self.scheduler.add_query(task, level=task['weight'])
            else:
                # Query the keyword through the search engine
                logger.info(f"<{ename}> {task['groups']} >>> {word}")
                cur_page = 0
                while cur_page < self._max_pages:
                    cur_page += 1
                    # Fetch one page of search results
                    lst = []
                    urls = engine.search(word, cur_page)
                    # Build data-excavation tasks from the result URLs
                    for url in urls:
                        domain = extract_domain(url)
                        if self.validator.data(domain):
                            continue
                        lst.append(self.make_task(
                            url=extract_base_url(url),
                            origin=task['origin'],
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=task['weight'],
                        ))
                    # Push the batch onto the data-excavation queue
                    self.scheduler.add_excavate(lst, level=task['weight'])
                    logger.info(f'<{ename}> {word}-第{cur_page}页-共{len(lst)}条')
            # Record the processed query task
            self.push_records(task)

    def start(self):
        if len(self._engines) > 0:
            logger.info('[数据查询]初始化加载')
            max_workers = len(self._engines)  # one worker thread per search engine
            with ThreadPoolExecutor(max_workers, 'DataQuery') as executor:
                futures = []
                for engine in self._engines:
                    f = executor.submit(self.search, engine)
                    f.add_done_callback(err_details)
                    futures.append(f)
                wait(futures)
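
# Usage sketch (not part of the original module).
# A minimal, hypothetical example of how DataQuery is meant to be driven,
# based only on the code above: engines must be JySearchEngine instances,
# and start() runs one polling thread per engine that loops over scheduler
# query tasks indefinitely. The engine class name and the keyword arguments
# shown below are illustrative assumptions, not names taken from this file.
#
#     from crawler.engines import SoGouEngine   # hypothetical JySearchEngine subclass
#
#     query = DataQuery(
#         engines=[SoGouEngine()],    # one worker thread per engine
#         max_query_page=3,           # result pages fetched per keyword
#         loop_query_interval=30,     # seconds to wait when no task is queued
#     )
#     query.start()                   # blocks; each engine polls the scheduler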