basics.py

import threading
import time
from typing import List, Mapping

from common.databases import insert_one
from common.log import logger
from common.tools import delay_by
from constants import (
    ORGANIZATION,
    KEYWORD,
    SEED_URL,
    COMPETING_GOODS,
    VISIT_CLASSIFY,
    QUERY_CLASSIFY,
)
from crawler.Task import Task
from crawler.analysis import Parser
from crawler.download import Downloader
from crawler.schedule import Scheduler
from crawler.validate import Validator
from settings import (
    MGO_URLS,
    MGO_ORGS,
    MGO_KEYWORDS,
    MGO_COMPETING_GOODS,
    MGO_REMOVAL_DUPLICATE,
    MGO_DOMAIN,
    MGO_QUERY,
    MGO_RECORDS,
)


class BasicSearch:

    def __init__(
            self,
            keyword_weight=9,
            url_weight=8,
            org_weight=7,
            scheduler=None,
            validator=None,
            downloader=None,
            collector=None,
            parser=None,
            **kwargs
    ):
        self.scheduler = scheduler or Scheduler()
        self.validator = validator or Validator(redis_key='RemovalDuplicate_')
        self.collector = collector or Validator(redis_key='CollectUrl_')
        self.downloader = downloader or Downloader()
        self.parser = parser or Parser()
        # Mongo query conditions
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # weights
        self.org_weight = org_weight
        self.url_weight = url_weight
        self.keyword_weight = keyword_weight
        self.retrieve_weight = 0
        # categories
        self.visit_classify = VISIT_CLASSIFY
        self.query_classify = QUERY_CLASSIFY
        # source groups
        self.org_groups = ORGANIZATION
        self.keyword_groups = KEYWORD
        self.url_groups = SEED_URL
        self.competing_groups = COMPETING_GOODS

    @staticmethod
    def loops_interval(interval):
        """Block the calling thread until the next scheduled run."""
        interval = interval or 300  # normalize once so time.sleep() never gets None
        t_name = threading.current_thread().name
        next_run_time = delay_by(interval)
        logger.debug(f'Run finished: <{t_name}>, next run at: <{next_run_time}>')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs):
        """Create a Task object."""
        return Task(**kwargs)

    @staticmethod
    def make_retrieve_item(task: Task):
        item = {
            'name': task['name'],
            'url': task['url'],
            'domain': task['domain'],
            'origin': task['origin'],
            'groups': task['groups'],
            'create_at': task['create_at'],
            'update_at': task['update_at'],
        }
        return item

    @staticmethod
    def make_duplicate_removal(task: Task):
        item = {
            'domain': task['domain'],
            'origin': task['origin'],
            'create_at': task['update_at'],
        }
        return item

    def _push_data(self, purpose: str, task: Task, collection):
        if purpose == 'save':
            insert_one(collection, self.make_retrieve_item(task))
        elif purpose == 'remove':
            insert_one(collection, self.make_duplicate_removal(task))
        else:
            insert_one(collection, task)
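    # Note: the 'save' and 'remove' purposes reshape the task via the make_*
    # helpers above; any other purpose (e.g. 'records' in push_records below)
    # inserts the raw task document unchanged.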

    def push_remove(self, task: Task):
        """Junk table used for data deduplication."""
        logger.info(f"[upload dedup fingerprint]【{task['name']} - {task['url']}】")
        if not self.validator.data(task['url']):
            self._push_data('remove', task, MGO_REMOVAL_DUPLICATE)
            self.validator.add_data(task['url'])

    def push_domain(self, task: Task):
        """Query results from website mining."""
        logger.info(f"[data mining - push]【{task['name']} - {task['domain']}】")
        if not self.collector.data(task['domain']):
            self._push_data('save', task, MGO_DOMAIN)
            self.collector.add_data(task['domain'])

    def push_query(self, task: Task):
        """Query results for organization/institution searches."""
        logger.info(f"[query result - push]【{task['name']} - {task['url']}】")
        self._push_data('save', task, MGO_QUERY)

    def push_records(self, task: Task):
        """Records of mined data."""
        if len(task['name']) > 20:
            # Truncate overly long names to 20 characters.
            task['name'] = '{:.20s}'.format(task['name'])
        logger.info(f"[data record - push]【{task['name']} - {task['url']}】")
        self._push_data('records', task, MGO_RECORDS)

    def orgs_table(self) -> List[Mapping]:
        """Organizations / institutions."""
        search_orgs = []
        cursor = MGO_ORGS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_orgs.append(item)
        return search_orgs

    def keywords_table(self) -> List[str]:
        """Keywords."""
        search_keywords = []
        cursor = MGO_KEYWORDS.find(projection=self.projection)
        for item in cursor.sort(self.sort):
            search_keywords.append(item['name'])
        return search_keywords

    def seed_urls_table(self) -> List[Mapping]:
        """Seed URLs."""
        search_urls = []
        cursor = MGO_URLS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_urls.append(item)
        return search_urls

    def competing_goods_table(self) -> List[Mapping]:
        """Competitor URLs."""
        competing_goods = []
        cursor = MGO_COMPETING_GOODS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            competing_goods.append(item)
        return competing_goods
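

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal example of how BasicSearch might be driven, assuming the MGO_*
# collections in settings are live and that Task accepts these fields as
# keyword arguments (make_task forwards **kwargs straight to Task).
if __name__ == '__main__':
    searcher = BasicSearch()
    now = int(time.time())
    for org in searcher.orgs_table():
        # Hypothetical field values: url/domain would normally be filled in
        # by the downloader/parser stages rather than by hand.
        task = searcher.make_task(
            name=org['name'],
            url='https://example.com',
            domain='example.com',
            origin='orgs_table',
            groups=searcher.org_groups,
            create_at=now,
            update_at=now,
        )
        searcher.push_records(task)
    # Block until the next scheduled pass (defaults to 300 seconds).
    BasicSearch.loops_interval(300)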