# sync_data.py

import threading

from common.log import logger
from crawler.services.basics import BasicSearch
from crawler.utils import is_url
from settings import (
    MGO_URLS,
    MGO_ORGS,
    MGO_COMPETING_GOODS,
    MGO_REMOVAL_DUPLICATE,
)
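
# Note: the MGO_* names are used below with pymongo-style collection methods
# (find / insert_one / update_one / delete_one), so they are assumed to be
# MongoDB collection handles configured in settings.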


class SyncData(BasicSearch):
    """Periodically push seed data (keywords, orgs, urls) onto the task queues."""

    def __init__(self, init_validator=False, loop_sync_interval=600, **kwargs):
        super(SyncData, self).__init__(**kwargs)
        self._init_validator = init_validator
        self._interval = loop_sync_interval
        self._init()

    def _init(self):
        # Run the sync loop in a background thread; sync_data() never returns.
        threading.Thread(target=self.sync_data, name='SyncData').start()

    def sync_data_keywords(self):
        """Sync keyword data."""
        logger.info('[sync data] loading keywords')
        words = self.seed_keywords()
        # Normalize keywords and push them onto the query queue
        words = [str(word).replace(' ', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            ))
        self.scheduler.add_query(lst, level=self.keyword_weight)
        logger.info(f'[sync data] updated {len(words)} keywords')

    def sync_data_orgs(self):
        """Sync organization data."""
        logger.info('[sync data] loading organization data')
        items = self.seed_orgs()
        # Normalize organization names and push them onto the query queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.warning(f'[invalid organization]{item}')
                continue
            word = str(name).replace(' ', '').strip()
            orgs.append(word)
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            ))
        self.scheduler.add_query(lst, level=self.org_weight)
        # Mark organizations that have been added so they are not queued again
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[sync data] updated {len(items)} organizations')

    def sync_data_urls(self):
        """Sync seed url data."""
        logger.info('[sync data] loading seed url list')
        items = self.seed_urls()
        # Collect accepted items in a separate list instead of removing
        # entries from `items` mid-iteration, which would skip elements.
        lst = []
        pushed = []
        for item in items:
            if not is_url(item['name']):
                continue
            if self.validator.data(item['name']):
                # url already known to the de-duplication validator
                continue
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
            pushed.append(item)
        self.scheduler.add_excavate(lst, level=self.url_weight)
        for item in pushed:
            MGO_URLS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[sync data] updated {len(pushed)} url records')

    def sync_data_competing_goods(self):
        """Sync competitor urls."""
        logger.info('[sync data] loading competitor url list')
        items = self.seed_competing_goods()
        # Normalize competitor urls and push them onto the task queue; as
        # above, track accepted items separately rather than mutating `items`.
        lst = []
        pushed = []
        for item in items:
            if not is_url(item['name']):
                continue
            if self.validator.data(item['name']):
                continue
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
            pushed.append(item)
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Update the status of the competitor urls that were pushed
        for item in pushed:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[sync data] updated {len(pushed)} competitor excavation urls')

    def sync_lua_commons(self):
        """Sync site urls and names used by the lua collection spiders."""
        logger.info('[sync data] loading lua_commons data')
        items = self.lua_common_domains()
        for item in items:
            MGO_REMOVAL_DUPLICATE.insert_one(item)
        logger.info(f'[sync data] updated {len(items)} site domain records')

    def sync_loading_validator(self):
        """Load the contents of the de-duplication table into the validator."""
        if self._init_validator:
            logger.info('[sync data] loading url de-duplication features into the validator')
            count = 0
            cursor = MGO_REMOVAL_DUPLICATE.find(projection={'domain': 1})
            for item in cursor.sort(self.sort):
                try:
                    domain = item['domain']
                except KeyError:
                    # skip documents missing the 'domain' field
                    continue
                if not isinstance(domain, str):
                    # drop malformed records from the de-duplication table
                    MGO_REMOVAL_DUPLICATE.delete_one({'_id': item['_id']})
                    continue
                if not self.validator.data(domain):
                    self.validator.add_data(domain)
                    count += 1
            logger.info(f'[sync data] updated {count} url de-duplication features')

    def sync_data(self):
        """Run sync rounds forever, sleeping between rounds."""
        logger.info('[sync data] initial load')
        while True:
            try:
                self.sync_loading_validator()
                self.sync_lua_commons()
                self.sync_data_keywords()
                self.sync_data_orgs()
                self.sync_data_competing_goods()
                self.sync_data_urls()
            except Exception as e:
                logger.exception(e)
            self.loops_interval(self._interval)
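

# Minimal usage sketch. Assumptions not confirmed by this file: BasicSearch
# (crawler.services.basics) wires up `scheduler`, `validator`, the group /
# classify / weight attributes, and `loops_interval`, and its __init__ needs
# no required positional arguments. Constructing SyncData is enough to start
# the background sync thread via _init().
if __name__ == '__main__':
    SyncData(init_validator=True, loop_sync_interval=600)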