# sync_data.py

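"""Periodic data synchronisation service.

Loads keywords, organisations, seed URLs and competitor URLs from MongoDB,
wraps them into crawl tasks, pushes them onto the scheduler queues, and
repeats on a fixed interval in a background thread.
"""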
import threading

from common.log import logger
from crawler.services.basics import BasicSearch
from crawler.utils import is_url, extract_domain
from settings import (
    MGO_URLS,
    MGO_ORGS,
    MGO_COMPETING_GOODS,
    MGO_REMOVAL_DUPLICATE,
    MGO_LUA_SPIDERS,
)
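
# NOTE: the MGO_* names imported above are assumed to be pymongo-style
# collection handles (they are used with find/update_one/delete_one below);
# their construction lives in the project's settings module, not shown here.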

class SyncData(BasicSearch):

    def __init__(
            self,
            init_validator=False,
            init_collector=False,
            loop_interval=600,
            **kwargs
    ):
        super(SyncData, self).__init__(**kwargs)
        self._init_validator = init_validator
        self._init_collector = init_collector
        self._interval = loop_interval
        self._init()

    def _init(self):
        # run the sync loop in a background thread so construction returns immediately
        threading.Thread(target=self.sync_data, name='SyncData').start()

    def sync_data_keywords(self):
        """Sync keyword data."""
        logger.info('[data sync - keywords] loading')
        words = self.keywords_table()
        # normalise the keywords, then push them onto the query queue
        words = [str(word).replace(' ', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            ))
        self.scheduler.add_query(self.keyword_groups, lst, level=self.keyword_weight)
        logger.info(f'[data sync - keywords] {len(words)} keywords')

    def sync_data_orgs(self):
        """Sync organisation data."""
        logger.info('[data sync - organisations] loading')
        items = self.orgs_table()
        # normalise organisation names, then push them onto the query queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.error(f'[data sync - organisations] invalid name: {item}')
                continue
            word = str(name).replace(' ', '').strip()
            orgs.append(word)
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            ))
        self.scheduler.add_query(self.org_groups, lst, level=self.org_weight)
        # mark pushed organisations so they are not queued again on the next cycle
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[data sync - organisations] {len(items)} organisations')

    def sync_data_urls(self):
        """Sync seed URL data."""
        logger.info('[data sync - seed URLs] loading')
        items = self.seed_urls_table()
        # keep only valid, not-yet-seen URLs; filtering into a new list avoids
        # the original bug of removing items from the list while iterating it
        items = [
            item for item in items
            if is_url(item['name']) and not self.validator.data(item['name'])
        ]
        lst = []
        for item in items:
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # mark pushed seed URLs so they are not queued again
        for item in items:
            MGO_URLS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[data sync - seed URLs] {len(items)} URLs')

    def sync_data_competing_goods(self):
        """Sync competitor URLs."""
        logger.info('[data sync - competitors] loading')
        items = self.competing_goods_table()
        # keep only valid, not-yet-seen URLs, then push them onto the excavate queue
        items = [
            item for item in items
            if is_url(item['name']) and not self.validator.data(item['name'])
        ]
        lst = []
        for item in items:
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # mark pushed competitor URLs so they are not queued again
        for item in items:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[data sync - competitors] {len(items)} URLs')

    def sync_collector(self):
        """Load domains already collected by the lua spiders into the collector."""
        if self._init_collector:
            logger.info('[data sync - collector] initial load')
            count = 0
            projection = {'param_common': 1}
            cursor = MGO_LUA_SPIDERS.find(projection=projection)
            for item in cursor.sort(self.sort):
                try:
                    # param_common[11] is expected to hold the spider's target URL
                    url = item['param_common'][11]
                    if not is_url(url):
                        continue
                    domain = extract_domain(url)
                except (IndexError, KeyError):
                    # some records lack param_common or carry a short list
                    continue
                if not self.collector.data(domain):
                    self.collector.add_data(domain)
                    count += 1
            logger.info(f'[data sync - collector] loaded {count} domains')

    def sync_validator(self):
        """Load the junk-domain table into the duplicate filter."""
        if self._init_validator:
            logger.info('[data sync - validator] initial load')
            count = 0
            cursor = MGO_REMOVAL_DUPLICATE.find(projection={'domain': 1})
            for item in cursor.sort(self.sort):
                # the original caught IndexError here, which dict access never
                # raises; .get covers the missing-field case instead
                domain = item.get('domain')
                if not isinstance(domain, str):
                    # drop malformed records so they are not re-read every cycle
                    MGO_REMOVAL_DUPLICATE.delete_one({'_id': item['_id']})
                    continue
                if not self.validator.data(domain):
                    self.validator.add_data(domain)
                    count += 1
            logger.info(f'[data sync - validator] loaded {count} dedup entries')

    def sync_data(self):
        """Run all sync jobs in a loop."""
        logger.info('[data sync] initial load')
        while True:
            try:
                self.sync_collector()
                self.sync_validator()
                self.sync_data_competing_goods()
                self.sync_data_keywords()
                self.sync_data_urls()
                self.sync_data_orgs()
            except Exception as e:
                logger.exception(e)
            self.loops_interval(self._interval)
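
# A minimal usage sketch, assuming BasicSearch wires up the scheduler,
# validator and collector itself (via **kwargs or its own defaults); only
# the flags defined in this module are shown.
if __name__ == '__main__':
    SyncData(
        init_validator=True,   # preload the dedup filter from MGO_REMOVAL_DUPLICATE
        init_collector=True,   # preload collected domains from MGO_LUA_SPIDERS
        loop_interval=600,     # resync every 10 minutes
    )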