data_query.py
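"""Search-query services for the crawler.

``QueryKeyWord`` pulls keyword query tasks from the scheduler, runs each result
page through a search engine (Bing by default), skips base URLs the validator
already knows, and pushes the rest onto the data-mining queue.

``QueryOrganization`` looks up an organization name (QCC by default), saves the
query result, and pushes the task onto the data-mining queue when the lookup
returns a valid URL whose domain the collector has not yet indexed.
"""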

import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.execptions import HostsRetrieveError
from common.log import logger
from crawler.engines import BingSearchEngine, QccSearchEngine
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_domain,
    is_url,
    err_details,
)


class QueryKeyWord(BasicSearch):

    def __init__(
            self,
            engine=None,
            query_workers=1,
            max_query_page=1,
            loop_query_interval=60,
            **kwargs
    ):
        super(QueryKeyWord, self).__init__(**kwargs)
        self.engine = (engine or BingSearchEngine())
        # Use the resolved engine so the name is correct when `engine` is None
        self._name = self.engine.__class__.__name__
        self._workers = query_workers
        self._max_pages = max_query_page
        self._interval = loop_query_interval

    def query_keyword(self):
        t_name = threading.current_thread().name
        logger.info(f'[keyword query] started - <{t_name} - {self._name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.keyword_groups)
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            logger.info(f"<{t_name} - {self._name}> {task['groups']} >> {task['search']}")
            cur_page = 0
            while cur_page < self._max_pages:
                cur_page += 1
                # Build data-mining tasks from the search result page
                lst = []
                urls = self.engine.search(task['search'], cur_page)
                for url in urls:
                    base_url = extract_base_url(url)
                    if self.validator.data(base_url):
                        continue
                    lst.append(self.make_task(
                        url=base_url,
                        origin=task['origin'],
                        groups=task['groups'],
                        classify=self.visit_classify,
                        weight=task['weight'],
                    ))
                # Push onto the data-mining queue
                self.scheduler.add_excavate(lst, level=task['weight'])
                logger.info(f'<{self._name}> {task["search"]} - page {cur_page} - {len(lst)} items')
            # # Query record
            # self.push_records(task)

    def start(self):
        logger.info('[keyword query] initializing')
        with ThreadPoolExecutor(self._workers, 'QueryKeyWord') as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self.query_keyword)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)


class QueryOrganization(BasicSearch):

    def __init__(
            self,
            engine=None,
            query_workers=1,
            loop_query_interval=60,
            **kwargs
    ):
        super(QueryOrganization, self).__init__(**kwargs)
        self.engine = (engine or QccSearchEngine())
        # Use the resolved engine so the name is correct when `engine` is None
        self._name = self.engine.__class__.__name__
        self._workers = query_workers
        self._interval = loop_query_interval

    def query_org(self):
        t_name = threading.current_thread().name
        logger.info(f'[organization query] started - <{t_name} - {self._name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.org_groups)
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            logger.info(f"<{t_name} - {self._name}> {task['groups']} >> {word}")
            try:
                url = self.engine.search(word)
                task['url'] = url
                task['name'] = word
                task['domain'] = extract_domain(url)
                # Save the query result
                self.push_query(task)
                if not is_url(url):
                    continue
                # Check with the collector whether the site is already indexed
                # before deciding to push it onto the data-mining queue
                if self.collector.data(task['domain']):
                    continue
                # Mark the task as a data-mining (visit) task
                task['classify'] = self.visit_classify
                # Push onto the data-mining queue
                self.scheduler.add_excavate(task, level=task['weight'])
            except HostsRetrieveError as e:
                task['status_code'] = e.code
                task['err_reason'] = e.reason
                logger.exception(e)
                # Put the task back on the query queue
                self.scheduler.add_query(self.org_groups, task, level=task['weight'])
            # # Query record
            # self.push_records(task)

    def start(self):
        logger.info('[organization query] initializing')
        with ThreadPoolExecutor(self._workers, 'QueryOrganization') as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self.query_org)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
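

# A minimal usage sketch, assuming BasicSearch can be constructed with its
# defaults (i.e. `scheduler`, `validator`, `collector` and the group/classify
# attributes are wired up internally); the worker counts and page limits below
# are illustrative values, not the project's actual configuration.
if __name__ == '__main__':
    keyword_service = QueryKeyWord(query_workers=2, max_query_page=3)  # Bing engine by default
    org_service = QueryOrganization(query_workers=1)                   # QCC engine by default
    # start() blocks on its thread pool, so run one service in a background thread.
    threading.Thread(target=keyword_service.start, daemon=True).start()
    org_service.start()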