# basics.py

import threading
import time
from typing import List, Mapping

from common.databases import insert_one, int2long
from common.log import logger
from common.tools import delay_by
from crawler.Task import Task
from crawler.analysis import Parser
from crawler.download import Downloader
from crawler.schedule import Scheduler
from crawler.utils import (
    extract_domain,
    is_url,
)
from crawler.validate import Validator
from settings import (
    MGO_LUA_SPIDERS,
    MGO_URLS,
    MGO_ORGS,
    MGO_KEYWORDS,
    MGO_COMPETING_GOODS,
    MGO_REMOVAL_DUPLICATE,
    MGO_DOMAIN,
    MGO_QUERY,
    MGO_RECORDS,
)


class BasicSearch:
    """Shared base for the search crawlers: wires together the scheduler,
    validator, downloader, and parser, and provides seed-loading and
    result-push helpers."""

    def __init__(
        self,
        keyword_weight=9,
        url_weight=8,
        org_weight=7,
        scheduler=None,
        validator=None,
        downloader=None,
        parser=None,
        **kwargs
    ):
        self.scheduler = scheduler or Scheduler()
        self.validator = validator or Validator(redis_key='RemovalDuplicate_')
        self.downloader = downloader or Downloader()
        self.parser = parser or Parser()
        # MongoDB query defaults
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # task classifications
        self.visit_classify = 'visit'
        self.query_classify = 'query'
        # weights
        self.org_weight = org_weight
        self.url_weight = url_weight
        self.keyword_weight = keyword_weight
        self.retrieve_weight = 0
        # owning groups
        self.org_groups = 'organization'
        self.keyword_groups = 'keyword'
        self.url_groups = 'seed_url'
        self.competing_groups = 'competing_goods'

    @staticmethod
    def loops_interval(interval):
        # default to a 300-second interval so the logged next-run time
        # matches the actual sleep duration
        interval = interval or 300
        t_name = threading.current_thread().name
        next_run_time = delay_by(interval)
        logger.debug(f'Thread run finished: <{t_name}>, next run at: {next_run_time}')
        time.sleep(interval)
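
    # Usage sketch (assumption, not part of this module): a worker loop is
    # expected to pace itself with loops_interval, e.g.
    #     while True:
    #         self.collect_once()      # hypothetical work method
    #         self.loops_interval(600)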

    @staticmethod
    def make_task(**kwargs):
        """Build a Task object."""
        return Task(**kwargs)

    @staticmethod
    def make_retrieve_item(task: Task):
        item = {
            'name': task['name'],
            'url': task['url'],
            'domain': task['domain'],
            'origin': task['origin'],
            'groups': task['groups'],
            'create_at': task['create_at'],
            'update_at': task['update_at'],
        }
        return item

    @staticmethod
    def make_duplicate_removal(task: Task):
        item = {
            'domain': task['domain'],
            'origin': task['origin'],
            'create_at': task['update_at'],
        }
        return item

    def _push_data(self, purpose: str, task: Task, collection):
        """Insert task into collection; 'save' and 'remove' reshape it
        first, any other purpose inserts the raw task."""
        if purpose == 'save':
            insert_one(collection, self.make_retrieve_item(task))
        elif purpose == 'remove':
            insert_one(collection, self.make_duplicate_removal(task))
        else:
            insert_one(collection, task)

    def push_remove(self, task: Task):
        """Scrap table for data deduplication."""
        logger.info(f"[upload dedup fingerprint] {task['name']} - {task['url']}")
        self._push_data('remove', task, MGO_REMOVAL_DUPLICATE)

    def push_domain(self, task: Task):
        """Query results of website mining."""
        logger.info(f"[push mining result] {task['name']} - {task['domain']}")
        self._push_data('save', task, MGO_DOMAIN)

    def push_query(self, task: Task):
        """Query results of organization searches."""
        logger.info(f"[push query result] {task['name']} - {task['url']}")
        self._push_data('save', task, MGO_QUERY)

    def push_records(self, task: Task):
        """Records of mined data."""
        logger.info(f"[push data record] {task['name']} - {task['url']}")
        # 'records' is not a reshaping purpose, so _push_data inserts the raw task
        self._push_data('records', task, MGO_RECORDS)

    def seed_orgs(self) -> List[Mapping]:
        """Organization/unit seeds."""
        search_orgs = []
        cursor = MGO_ORGS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_orgs.append(item)
        return search_orgs

    def seed_keywords(self):
        """Keyword seeds."""
        search_keywords = []
        cursor = MGO_KEYWORDS.find(projection=self.projection)
        for item in cursor.sort(self.sort):
            search_keywords.append(item['name'])
        return search_keywords

    def seed_urls(self) -> List[Mapping]:
        """Seed URLs."""
        search_urls = []
        cursor = MGO_URLS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_urls.append(item)
        return search_urls

    def seed_competing_goods(self):
        """Competitor URLs."""
        competing_goods = []
        cursor = MGO_COMPETING_GOODS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            competing_goods.append(item)
        return competing_goods

    def lua_common_domains(self):
        """Fetch site names and matching domains from the lua spider config
        table and sync them into the dedup store."""
        parm_commons = []
        projection = {'param_common': 1}
        cursor = MGO_LUA_SPIDERS.find(projection=projection)
        for item in cursor.sort(self.sort):
            # name = item['param_common'][1]
            try:
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
            except IndexError:
                continue
            if not self.validator.data(domain):
                parm_commons.append({
                    # 'name': name,
                    'domain': domain,
                    'origin': url,
                    'create_at': int2long(int(time.time())),
                })
                self.validator.add_data(domain)
        return parm_commons
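

# Minimal usage sketch (assumptions: the default components wired in
# __init__ are constructible as-is, and Task accepts the keys that
# make_retrieve_item reads back: name, url, domain, origin, groups,
# create_at, update_at; nothing below exists in this module):
#
#     search = BasicSearch()
#     for org in search.seed_orgs():
#         task = search.make_task(name=org['name'],
#                                 groups=search.org_groups,
#                                 ...)  # remaining Task fields
#         search.push_query(task)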