sync_data.py

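"""Data synchronization service.

Loads search keywords, organizations, seed URLs and competitor URLs from
MongoDB and pushes them to the task scheduler as crawl tasks; also preloads
the collector (known URL domains) and the validator (garbage domains used
for dedup) before the loader threads start.
"""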
import threading

from common.log import logger
from crawler.services.basics import BasicService
from crawler.utils import is_url, extract_domain
from settings import (
    MGO_URLS,
    MGO_ORGS,
    MGO_COMPETING_GOODS,
    MGO_DATA_GARBAGE,
    MGO_LUA_SPIDERS
)


class SyncData(BasicService):
    """Data synchronization service"""

    def __init__(self, init_validator=False, init_collector=False, **kwargs):
        # Sync intervals in seconds; each can be overridden via kwargs.
        self._sync_validate_data = (kwargs.pop('validate_interval', None) or 7200)
        self._keywords_interval = (kwargs.pop('keywords_interval', None) or 3600)
        self._competing_goods_interval = (kwargs.pop('competing_goods_interval', None) or 3600)
        self._seed_urls_interval = (kwargs.pop('seed_urls_interval', None) or 3600)
        self._orgs_interval = (kwargs.pop('orgs_interval', None) or 3600)
        super(SyncData, self).__init__(**kwargs)
        self._init_validator = init_validator
        self._init_collector = init_collector
        # Gate flag: the loader threads wait until the collector/validator
        # have finished their first sync pass.
        self._allow_load_data = False

    def sync_keywords(self):
        """Sync search keywords"""
        logger.info('[DataSync] start loading - keywords table')
        words = self.keywords_table()
        # Normalize keywords and push them to the task queue
        words = [str(word).replace(' ', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            ))
        self.scheduler.add_query(self.keyword_groups, lst, level=self.keyword_weight)
        logger.info(f'[DataSync] pushed {len(words)} keywords to the task queue')

    def sync_orgs(self):
        """Sync organization data"""
        logger.info('[DataSync] start loading - organizations table')
        items = self.orgs_table()
        # Normalize organization names and push them to the task queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.error(f'[DataSync - organizations] invalid organization name: {item}')
                continue
            word = str(name).replace(' ', '').strip()
            orgs.append(word)
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            ))
        self.scheduler.add_query(self.org_groups, lst, level=self.org_weight)
        # Mark organizations that have been added so they are not pushed
        # to the task queue again
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[DataSync] pushed {len(items)} organizations to the task queue')

    def sync_seed_urls(self):
        """Sync seed URLs"""
        logger.info('[DataSync] start loading - seed URLs table')
        items = self.seed_urls_table()
        lst = []
        # Collect valid entries instead of calling items.remove() inside the
        # loop, which mutates the list mid-iteration and skips elements.
        valid_items = []
        for item in items:
            if not is_url(item['name']):
                continue
            valid_items.append(item)
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Mark pushed seed URLs (update_one: `_id` is unique)
        for item in valid_items:
            MGO_URLS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[DataSync] pushed {len(valid_items)} seed URLs to the task queue')

    def sync_competing_goods(self):
        """Sync competitor URLs"""
        logger.info('[DataSync] start loading - competitor URLs table')
        items = self.competing_goods_table()
        # Filter competitor URLs and push them to the task queue; collect
        # valid entries instead of mutating `items` during iteration.
        lst = []
        valid_items = []
        for item in items:
            if not is_url(item['name']):
                continue
            # Skip URLs already known to the validator
            if self.validator.data(item['name']):
                continue
            valid_items.append(item)
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Mark competitor URLs that have been pushed
        for item in valid_items:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[DataSync] pushed {len(valid_items)} competitor URLs to the task queue')

    def sync_collector(self):
        """Collector: stores newly discovered and already known URL domains"""
        if self._init_collector:
            logger.info('[DataSync] start loading - collector')
            count = 0
            q = {'param_common.11': {'$exists': True}}
            projection = {'param_common': 1}
            cursor = MGO_LUA_SPIDERS.find(q, projection=projection)
            for item in cursor.sort(self.sort):
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
                if not self.collector.data(domain):
                    self.collector.add_data(domain)
                    count += 1
            logger.info(f'[DataSync] collector loaded {count} domains')

    def sync_validator(self):
        """Garbage pool: stores garbage URLs and URLs that yielded no relevant content during sourcing"""
        if self._init_validator:
            logger.info('[DataSync] start loading - validator')
            count = 0
            cursor = MGO_DATA_GARBAGE.find(projection={'domain': 1})
            for item in cursor.sort(self.sort):
                try:
                    domain = item['domain']
                    if not isinstance(domain, str):
                        MGO_DATA_GARBAGE.delete_one({'_id': item['_id']})
                        continue
                except KeyError:  # dict access raises KeyError, not IndexError
                    continue
                if not self.validator.data(domain):
                    self.validator.add_data(domain)
                    count += 1
            logger.info(f'[DataSync] validator loaded {count} dedup features')

    def start(self):
        """Start data synchronization"""

        def _validate():
            """Validator/collector sync loop"""
            while True:
                try:
                    self.sync_collector()
                    self.sync_validator()
                    if not self._allow_load_data:
                        # First pass done: release the loader threads
                        self._allow_load_data = True
                except Exception as e:
                    logger.exception(e)
                self.loops_interval(self._sync_validate_data)

        def _keywords():
            """Keywords sync loop"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_keywords()
                    except Exception as e:
                        logger.exception(e)
                    # Sleep even after a failure to avoid a hot retry loop
                    self.loops_interval(self._keywords_interval)
                else:
                    self.loops_interval(2)

        def _competing_goods():
            """Competitor URLs sync loop"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_competing_goods()
                    except Exception as e:
                        logger.exception(e)
                    self.loops_interval(self._competing_goods_interval)
                else:
                    self.loops_interval(2)

        def _seed_urls():
            """Seed URLs sync loop"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_seed_urls()
                    except Exception as e:
                        logger.exception(e)
                    self.loops_interval(self._seed_urls_interval)
                else:
                    self.loops_interval(2)

        def _orgs():
            """Organizations sync loop"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_orgs()
                    except Exception as e:
                        logger.exception(e)
                    self.loops_interval(self._orgs_interval)
                else:
                    self.loops_interval(2)

        threading.Thread(target=_validate, name='SyncValidateData').start()
        threading.Thread(target=_keywords, name='SyncKeywords').start()
        threading.Thread(target=_competing_goods, name='SyncCompetingGoods').start()
        threading.Thread(target=_seed_urls, name='SyncSeedUrls').start()
        threading.Thread(target=_orgs, name='SyncOrgs').start()
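

# Usage sketch (not part of the original module; the wiring below is an
# assumption). BasicService is assumed to supply the scheduler, validator,
# collector and loops_interval(); the keyword arguments mirror exactly those
# consumed in SyncData.__init__ above.
if __name__ == '__main__':
    service = SyncData(
        init_validator=True,      # preload garbage domains into the validator
        init_collector=True,      # preload known domains into the collector
        validate_interval=7200,   # seconds between validator/collector passes
        keywords_interval=3600,   # seconds between keyword sync passes
    )
    service.start()  # spawns one worker thread per data source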