# basics.py
  1. import threading
  2. import time
  3. from typing import List, Mapping
  4. from common.databases import insert_one, update_one_by_domain
  5. from common.log import logger
  6. from common.tools import delay_by
  7. from constants import (
  8. ORGANIZATION,
  9. KEYWORD,
  10. SEED_URL,
  11. COMPETING_GOODS,
  12. VISIT_CLASSIFY,
  13. QUERY_CLASSIFY
  14. )
  15. from crawler.Task import Task
  16. from crawler.analysis import Parser
  17. from crawler.download import Downloader
  18. from crawler.schedule import Scheduler
  19. from crawler.validate import Validator
  20. from settings import (
  21. MGO_URLS,
  22. MGO_ORGS,
  23. MGO_KEYWORDS,
  24. MGO_COMPETING_GOODS,
  25. MGO_DATA_GARBAGE,
  26. MGO_DOMAIN,
  27. MGO_QUERY,
  28. MGO_RECORDS
  29. )
  30. class BasicService:
  31. def __init__(
  32. self,
  33. scheduler=None,
  34. validator=None,
  35. downloader=None,
  36. collector=None,
  37. parser=None,
  38. **kwargs
  39. ):
  40. self.scheduler = (scheduler or Scheduler())
  41. self.validator = (validator or Validator(redis_key='RemovalDuplicate_'))
  42. self.collector = (collector or Validator(redis_key='CollectUrl_'))
  43. self.downloader = (downloader or Downloader())
  44. self.parser = (parser or Parser())
  45. # mongo查询
  46. self.query = {'enable_added': {'$exists': False}}
  47. self.projection = {'name': 1}
  48. self.sort = [('_id', -1)]
  49. # 权重
  50. self.org_weight = (kwargs.pop('org_weight', None) or 7)
  51. self.url_weight = (kwargs.pop('url_weight', None) or 8)
  52. self.keyword_weight = (kwargs.pop('keyword_weight', None) or 9)
  53. # 分类
  54. self.visit_classify = VISIT_CLASSIFY
  55. self.query_classify = QUERY_CLASSIFY
  56. # 归属组
  57. self.org_groups = ORGANIZATION
  58. self.keyword_groups = KEYWORD
  59. self.url_groups = SEED_URL
  60. self.competing_groups = COMPETING_GOODS
  61. @staticmethod
  62. def loops_interval(interval, enable_debug_log=False):
  63. t_name = threading.currentThread().getName()
  64. next_run_time = delay_by((interval or 300))
  65. if enable_debug_log:
  66. logger.debug(f'运行结束:<{t_name}>,下次运行时间:{next_run_time}')
  67. time.sleep(interval)
  68. @staticmethod
  69. def make_task(**kwargs):
  70. """生成Task对象"""
  71. return Task(**kwargs)
  72. @staticmethod
  73. def make_retrieve_item(task: Task):
  74. item = {
  75. 'name': task['name'],
  76. 'url': task['url'],
  77. 'domain': task['domain'],
  78. 'origin': task['origin'],
  79. 'groups': task['groups'],
  80. 'create_at': task['create_at'],
  81. 'update_at': task['update_at'],
  82. }
  83. return item
  84. @staticmethod
  85. def make_domain_item(task: Task):
  86. item = {
  87. 'name': task['name'],
  88. 'url': task['url'],
  89. 'domain': task['domain'],
  90. 'depth': task['depth'],
  91. 'origin': task['origin'],
  92. 'groups': task['groups'],
  93. 'create_at': task['create_at'],
  94. 'update_at': task['update_at'],
  95. }
  96. return item
  97. @staticmethod
  98. def make_duplicate_removal(task: Task):
  99. item = {
  100. 'domain': task['domain'],
  101. 'origin': task['origin'],
  102. 'create_at': task['update_at'],
  103. }
  104. return item
  105. def _push_data(self, purpose: str, task: Task, collection):
  106. t_name = threading.currentThread().getName()
  107. if purpose == 'query':
  108. item = self.make_retrieve_item(task)
  109. insert_one(collection, item)
  110. logger.info(f'<{t_name}> - 上传查询结果 - {item["_id"]}')
  111. elif purpose == 'domain':
  112. item = self.make_domain_item(task)
  113. insert_one(collection, item)
  114. logger.info(f'<{t_name}> - 上传挖掘结果 - {item["_id"]}')
  115. elif purpose == 'remove':
  116. item = self.make_duplicate_removal(task)
  117. update_one_by_domain(collection, item)
  118. logger.info(f'<{t_name}> - 上传去重特征 - {item["domain"]}')
  119. else:
  120. insert_one(collection, task)
  121. logger.info(f'<{t_name}> - 上传记录数据 - {task["_id"]}')
  122. def push_remove(self, task: Task):
  123. """数据去重表"""
  124. if not self.validator.data(task['url']):
  125. self._push_data('remove', task, MGO_DATA_GARBAGE)
  126. self.validator.add_data(task['url'])
  127. return True
  128. return False
  129. def push_domain(self, task: Task):
  130. """数据挖掘结果,推送保存"""
  131. if task['groups'] == self.url_groups:
  132. duplicate = str(task['origin']).count(task['domain']) > 0
  133. if duplicate:
  134. return False
  135. if not self.collector.data(task['domain']):
  136. self._push_data('domain', task, MGO_DOMAIN)
  137. self.collector.add_data(task['domain'])
  138. return True
  139. return False
  140. def push_query(self, task: Task):
  141. """搜索组织单位查询结果,推送保存"""
  142. self._push_data('query', task, MGO_QUERY)
  143. def push_records(self, task: Task):
  144. """挖掘数据的记录"""
  145. if len(task['name']) > 50:
  146. task['name'] = '{:.50s}'.format(task['name'])
  147. self._push_data('records', task, MGO_RECORDS)
  148. def orgs_table(self) -> List[Mapping]:
  149. """组织|单位表"""
  150. search_orgs = []
  151. cursor = MGO_ORGS.find(self.query, projection=self.projection)
  152. for item in cursor.sort(self.sort):
  153. search_orgs.append(item)
  154. return search_orgs
  155. def keywords_table(self):
  156. """关键词表"""
  157. search_keywords = []
  158. cursor = MGO_KEYWORDS.find(projection=self.projection)
  159. for item in cursor.sort(self.sort):
  160. search_keywords.append(item['name'])
  161. return search_keywords
  162. def seed_urls_table(self) -> List[Mapping]:
  163. """种子列表"""
  164. search_urls = []
  165. cursor = MGO_URLS.find(self.query, projection=self.projection)
  166. for item in cursor.sort(self.sort):
  167. search_urls.append(item)
  168. return search_urls
  169. def competing_goods_table(self):
  170. """竞品列表"""
  171. competing_goods = []
  172. cursor = MGO_COMPETING_GOODS.find(self.query, projection=self.projection)
  173. for item in cursor.sort(self.sort):
  174. competing_goods.append(item)
  175. return competing_goods