basics.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. import threading
  2. import time
  3. from typing import List, Mapping
  4. from common.databases import insert_one, update_one_by_domain
  5. from common.log import logger
  6. from constants import (
  7. ORGANIZATION,
  8. KEYWORD,
  9. SEED_URL,
  10. COMPETING_GOODS,
  11. VISIT_CLASSIFY,
  12. QUERY_CLASSIFY
  13. )
  14. from crawler.Task import Task
  15. from crawler.analysis import Parser
  16. from crawler.download import Downloader
  17. from crawler.schedule import Scheduler
  18. from crawler.validate import Validator
  19. from settings import (
  20. MGO_URLS,
  21. MGO_ORGS,
  22. MGO_KEYWORDS,
  23. MGO_COMPETING_GOODS,
  24. MGO_GARBAGE,
  25. MGO_DOMAIN,
  26. MGO_QUERY,
  27. MGO_RECORDS
  28. )
  29. class BasicService:
  30. def __init__(
  31. self,
  32. scheduler=None,
  33. validator=None,
  34. downloader=None,
  35. collector=None,
  36. parser=None,
  37. **kwargs
  38. ):
  39. self.scheduler = (scheduler or Scheduler())
  40. self.validator = (validator or Validator(redis_key='RemovalDuplicate_'))
  41. self.collector = (collector or Validator(redis_key='CollectUrl_'))
  42. self.downloader = (downloader or Downloader())
  43. self.parser = (parser or Parser())
  44. # mongo查询
  45. self.query = {'enable_added': {'$exists': False}}
  46. self.projection = {'name': 1}
  47. self.sort = [('_id', -1)]
  48. # 权重
  49. self.org_weight = (kwargs.pop('org_weight', None) or 7)
  50. self.url_weight = (kwargs.pop('url_weight', None) or 8)
  51. self.keyword_weight = (kwargs.pop('keyword_weight', None) or 9)
  52. # 分类
  53. self.visit_classify = VISIT_CLASSIFY
  54. self.query_classify = QUERY_CLASSIFY
  55. # 归属组
  56. self.org_groups = ORGANIZATION
  57. self.keyword_groups = KEYWORD
  58. self.url_groups = SEED_URL
  59. self.competing_groups = COMPETING_GOODS
  60. @property
  61. def thread_name(self):
  62. return threading.currentThread().getName()
  63. def loops_interval(self, interval, enable_debug_log=False):
  64. if enable_debug_log:
  65. logger.debug(f'Thread-<{self.thread_name}> is closed.')
  66. time.sleep(interval)
  67. @staticmethod
  68. def make_task(**kwargs):
  69. """生成Task对象"""
  70. return Task(**kwargs)
  71. def _push_data(self, purpose: str, task: Task, collection):
  72. if purpose == 'query':
  73. item = task.retrieve_task_to_dict
  74. insert_one(collection, item)
  75. logger.info(f'<{self.thread_name}> - 查询结果 - {item["_id"]}')
  76. elif purpose == 'domain':
  77. item = task.excavate_task_to_dict
  78. insert_one(collection, item)
  79. logger.info(f'<{self.thread_name}> - 挖掘结果 - {task["domain"]}')
  80. elif purpose == 'remove':
  81. item = task.validate_task_to_dict
  82. item['source'] = 'system'
  83. update_one_by_domain(collection, item)
  84. logger.info(f'<{self.thread_name}> - 添加过滤 - {task["url"]}')
  85. else:
  86. insert_one(collection, task)
  87. logger.info(f'<{self.thread_name}> - 记录数据 - {task["_id"]}')
  88. def push_remove(self, task: Task):
  89. """数据去重表"""
  90. if not self.validator.data(task['url']):
  91. self._push_data('remove', task, MGO_GARBAGE)
  92. self.validator.add_data(task['url'])
  93. return True
  94. return False
  95. def push_domain(self, task: Task):
  96. """数据挖掘结果,推送保存"""
  97. if not self.collector.data(task['domain']):
  98. self._push_data('domain', task, MGO_DOMAIN)
  99. self.collector.add_data(task['domain'])
  100. return True
  101. return False
  102. def push_query(self, task: Task):
  103. """搜索组织单位查询结果,推送保存"""
  104. self._push_data('query', task, MGO_QUERY)
  105. def push_records(self, task: Task):
  106. """挖掘数据的记录"""
  107. if len(task['name']) > 50:
  108. task['name'] = '{:.50s}'.format(task['name'])
  109. self._push_data('records', task, MGO_RECORDS)
  110. def orgs_table(self) -> List[Mapping]:
  111. """组织|单位表"""
  112. search_orgs = []
  113. cursor = MGO_ORGS.find(self.query, projection=self.projection)
  114. for item in cursor.sort(self.sort):
  115. search_orgs.append(item)
  116. return search_orgs
  117. def keywords_table(self):
  118. """关键词表"""
  119. search_keywords = []
  120. cursor = MGO_KEYWORDS.find(projection=self.projection)
  121. for item in cursor.sort(self.sort):
  122. search_keywords.append(item['name'])
  123. return search_keywords
  124. def seed_urls_table(self) -> List[Mapping]:
  125. """种子列表"""
  126. search_urls = []
  127. cursor = MGO_URLS.find(self.query, projection=self.projection)
  128. for item in cursor.sort(self.sort):
  129. search_urls.append(item)
  130. return search_urls
  131. def competing_goods_table(self):
  132. """竞品列表"""
  133. competing_goods = []
  134. cursor = MGO_COMPETING_GOODS.find(self.query, projection=self.projection)
  135. for item in cursor.sort(self.sort):
  136. competing_goods.append(item)
  137. return competing_goods