basics.py

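"""Base helpers shared by the crawler's search flows.

``BasicSearch`` wires a ``Scheduler``, ``Downloader``, ``Parser`` and two
Redis-backed ``Validator`` instances together, exposes ``push_*`` methods
that write mining/query results into the Mongo collections configured in
``settings``, and ``seed_*`` methods that read organizations, keywords,
seed URLs and competitor URLs back out as search seeds.
"""
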
import threading
import time
from typing import List, Mapping

from common.databases import insert_one, int2long
from common.log import logger
from common.tools import delay_by
from constants import (
    ORGANIZATION,
    KEYWORD,
    SEED_URL,
    COMPETING_GOODS,
    VISIT_CLASSIFY,
    QUERY_CLASSIFY
)
from crawler.Task import Task
from crawler.analysis import Parser
from crawler.download import Downloader
from crawler.schedule import Scheduler
from crawler.utils import extract_domain, is_url
from crawler.validate import Validator
from settings import (
    MGO_LUA_SPIDERS,
    MGO_URLS,
    MGO_ORGS,
    MGO_KEYWORDS,
    MGO_COMPETING_GOODS,
    MGO_REMOVAL_DUPLICATE,
    MGO_DOMAIN,
    MGO_QUERY,
    MGO_RECORDS
)


class BasicSearch:

    def __init__(
            self,
            keyword_weight=9,
            url_weight=8,
            org_weight=7,
            scheduler=None,
            validator=None,
            downloader=None,
            collector=None,
            parser=None,
            **kwargs
    ):
        self.scheduler = scheduler or Scheduler()
        self.validator = validator or Validator(redis_key='RemovalDuplicate_')
        self.collector = collector or Validator(redis_key='CollectUrl_')
        self.downloader = downloader or Downloader()
        self.parser = parser or Parser()
        # Mongo query defaults
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # weights
        self.org_weight = org_weight
        self.url_weight = url_weight
        self.keyword_weight = keyword_weight
        self.retrieve_weight = 0
        # classification
        self.visit_classify = VISIT_CLASSIFY
        self.query_classify = QUERY_CLASSIFY
        # owning groups
        self.org_groups = ORGANIZATION
        self.keyword_groups = KEYWORD
        self.url_groups = SEED_URL
        self.competing_groups = COMPETING_GOODS

    @staticmethod
    def loops_interval(interval):
        interval = interval or 300  # normalize first so time.sleep() never receives None
        t_name = threading.current_thread().name
        next_run_time = delay_by(interval)
        logger.debug(f'Run finished: <{t_name}>, next run at: {next_run_time}')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs):
        """Create a Task object."""
        return Task(**kwargs)

    @staticmethod
    def make_retrieve_item(task: Task):
        """Build the document saved for a retrieved result."""
        item = {
            'name': task['name'],
            'url': task['url'],
            'domain': task['domain'],
            'origin': task['origin'],
            'groups': task['groups'],
            'create_at': task['create_at'],
            'update_at': task['update_at'],
        }
        return item

    @staticmethod
    def make_duplicate_removal(task: Task):
        """Build the dedup-fingerprint document for a task."""
        item = {
            'domain': task['domain'],
            'origin': task['origin'],
            'create_at': task['update_at'],
        }
        return item

    def _push_data(self, purpose: str, task: Task, collection):
        if purpose == 'save':
            insert_one(collection, self.make_retrieve_item(task))
        elif purpose == 'remove':
            insert_one(collection, self.make_duplicate_removal(task))
        else:
            insert_one(collection, task)

    def push_remove(self, task: Task):
        """Junk table used for data deduplication."""
        logger.info(f"[upload dedup fingerprint]【{task['name']} - {task['url']}】")
        if not self.validator.data(task['url']):
            self._push_data('remove', task, MGO_REMOVAL_DUPLICATE)
            self.validator.add_data(task['url'])

    def push_domain(self, task: Task):
        """Query results from website mining."""
        logger.info(f"[push mining result]【{task['name']} - {task['domain']}】")
        if not self.collector.data(task['domain']):
            self._push_data('save', task, MGO_DOMAIN)
            self.collector.add_data(task['domain'])

    def push_query(self, task: Task):
        """Query results from organization/institution searches."""
        logger.info(f"[push query result]【{task['name']} - {task['url']}】")
        self._push_data('save', task, MGO_QUERY)

    def push_records(self, task: Task):
        """Record of mined data."""
        if len(task['name']) > 20:  # truncate overly long names to 20 chars
            task['name'] = '{:.20s}'.format(task['name'])
        logger.info(f"[push data record]【{task['name']} - {task['url']}】")
        self._push_data('records', task, MGO_RECORDS)

    def seed_orgs(self) -> List[Mapping]:
        """Organizations | institutions."""
        search_orgs = []
        cursor = MGO_ORGS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_orgs.append(item)
        return search_orgs

    def seed_keywords(self):
        """Keywords."""
        search_keywords = []
        cursor = MGO_KEYWORDS.find(projection=self.projection)
        for item in cursor.sort(self.sort):
            search_keywords.append(item['name'])
        return search_keywords

    def seed_urls(self) -> List[Mapping]:
        """Seed URLs."""
        search_urls = []
        cursor = MGO_URLS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_urls.append(item)
        return search_urls

    def seed_competing_goods(self):
        """Competitor URLs."""
        competing_goods = []
        cursor = MGO_COMPETING_GOODS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            competing_goods.append(item)
        return competing_goods

    def lua_common_domains(self):
        """Fetch site domains from the lua spider config table and push
        them to the data-source collector; returns the number added."""
        _c = 0
        projection = {'param_common': 1}
        cursor = MGO_LUA_SPIDERS.find(projection=projection)
        for item in cursor.sort(self.sort):
            try:
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
            except IndexError:
                continue
            if not self.collector.data(domain):
                self.collector.add_data(domain)
                _c += 1
        return _c
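

if __name__ == '__main__':
    # Minimal usage sketch (an illustration, not part of the original module):
    # it assumes this project's common.*, crawler.* and settings packages are
    # importable, and that the configured Redis and MongoDB are reachable.
    search = BasicSearch()
    print('seed orgs:', len(search.seed_orgs()))
    print('seed keywords:', len(search.seed_keywords()))
    print('new domains collected from lua spiders:', search.lua_common_domains())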