sync_data.py

import threading

from common.log import logger
from crawler.services.basics import BasicService
from crawler.utils import is_url, extract_domain
from settings import (
    MGO_URLS,
    MGO_ORGS,
    MGO_COMPETING_GOODS,
    MGO_DATA_GARBAGE,
    MGO_LUA_SPIDERS
)
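
# The MGO_* objects imported from settings are MongoDB collection handles
# (pymongo-style: find / update_one / delete_one are called on them below).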


class SyncData(BasicService):
    """Data synchronization service"""

    def __init__(self, init_validator=False, init_collector=False, **kwargs):
        # Sync intervals in seconds; callers may override them via kwargs.
        self._validate_interval = kwargs.pop('validate_interval', None) or 7200
        self._keywords_interval = kwargs.pop('keywords_interval', None) or 3600
        self._competing_goods_interval = kwargs.pop('competing_goods_interval', None) or 3600
        self._seed_urls_interval = kwargs.pop('seed_urls_interval', None) or 3600
        self._orgs_interval = kwargs.pop('orgs_interval', None) or 3600
        super().__init__(**kwargs)
        self._init_validator = init_validator
        self._init_collector = init_collector
        # Gate flag: sync threads wait until the collector/validator are primed
        self._allow_load_data = False
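
    # Each sync_* method below reads one MongoDB table, normalizes its rows
    # into scheduler tasks, and (where the table supports it) marks pushed
    # rows with `enable_added` so they are not queued twice.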

    def sync_keywords(self):
        """Sync search keyword data"""
        logger.info('[LoadData] keywords table')
        words = self.keywords_table()
        # Normalize keywords and push them onto the task queue
        words = [str(word).replace(' ', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            ))
        self.scheduler.add_query(self.keyword_groups, lst, level=self.keyword_weight)
        logger.info(f'[KeywordsTable] queued {len(words)} search keywords')

    def sync_orgs(self):
        """Sync organization data"""
        logger.info('[LoadData] organizations table')
        items = self.orgs_table()
        # Normalize organization names and push them onto the task queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ('', None):
                logger.error(f'[OrgsTable] invalid organization name: {item}')
                continue
            orgs.append(str(name).replace(' ', '').strip())
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            ))
        self.scheduler.add_query(self.org_groups, lst, level=self.org_weight)
        # Mark pushed organizations so they are not queued again
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[OrgsTable] queued {len(items)} organizations')

    def sync_seed_urls(self):
        """Sync seed URL data"""
        logger.info('[LoadData] seed URLs table')
        items = self.seed_urls_table()
        lst = []
        # Iterate over a copy so removing invalid entries does not skip items
        for item in list(items):
            if not is_url(item['name']):
                items.remove(item)
                continue
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Mark pushed seed URLs so they are not queued again
        for item in items:
            MGO_URLS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[SeedUrlsTable] queued {len(items)} seed URLs')
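
    # Competitor URLs get one extra check: anything already present in the
    # validator (the garbage pool loaded by data_validator) is skipped.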

    def sync_competing_goods(self):
        """Sync competitor URLs"""
        logger.info('[LoadData] competitor URLs table')
        items = self.competing_goods_table()
        # Filter competitor URLs and push them onto the task queue;
        # iterate over a copy so removing entries does not skip items
        lst = []
        for item in list(items):
            if not is_url(item['name']):
                items.remove(item)
                continue
            if self.validator.data(item['name']):
                items.remove(item)
                continue
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Mark pushed competitor URLs so they are not queued again
        for item in items:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[CompetingGoodsTable] queued {len(items)} URLs')
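
    # The collector and validator below appear to be in-memory de-dup
    # structures, seeded once from MongoDB at startup when the corresponding
    # init_* flag is set.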

    def data_collector(self):
        """Collector: stores newly discovered and already-known URL domains"""
        if self._init_collector:
            logger.info('[LoadData] collector')
            count = 0
            q = {'param_common.11': {'$exists': True}}
            projection = {'param_common': 1}
            cursor = MGO_LUA_SPIDERS.find(q, projection=projection)
            for item in cursor.sort(self.sort):
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
                logger.debug(f'[Collector] {domain}')
                if not self.collector.data(domain):
                    self.collector.add_data(domain)
                    count += 1
            logger.info(f'[Collector] read {count} URL domains')

    def data_validator(self):
        """Garbage pool: stores junk URLs and URLs that yielded no relevant
        information during source discovery"""
        if self._init_validator:
            logger.info('[LoadData] validator')
            count = 0
            cursor = MGO_DATA_GARBAGE.find(projection={'domain': 1})
            for item in cursor.sort(self.sort):
                try:
                    domain = item['domain']
                    if not isinstance(domain, str):
                        # Drop malformed records from the garbage pool
                        MGO_DATA_GARBAGE.delete_one({'_id': item['_id']})
                        continue
                except KeyError:  # dict access raises KeyError, not IndexError
                    continue
                logger.debug(f'[Validator] {domain}')
                if not self.validator.data(domain):
                    self.validator.add_data(domain)
                    count += 1
            logger.info(f'[Validator] loaded {count} dedup features')
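
    # Thread model: _validate runs first and flips _allow_load_data once the
    # collector/validator are primed; the other loops poll every 2 seconds
    # until that gate opens, then sync on their own intervals.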

    def start(self):
        """Program entry point"""

        def _validate():
            """Data filtering"""
            while True:
                try:
                    self.data_collector()
                    self.data_validator()
                    if not self._allow_load_data:
                        self._allow_load_data = True
                except Exception as e:
                    logger.exception(e)
                self.loops_interval(self._validate_interval)

        def _keywords():
            """Search keywords"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_keywords()
                        self.loops_interval(self._keywords_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        def _competing_goods():
            """Competitor URLs"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_competing_goods()
                        self.loops_interval(self._competing_goods_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        def _seed_urls():
            """Seed URLs"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_seed_urls()
                        self.loops_interval(self._seed_urls_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        def _orgs():
            """Organizations"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_orgs()
                        self.loops_interval(self._orgs_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        threading.Thread(target=_validate, name='SyncValidateData').start()
        threading.Thread(target=_keywords, name='SyncKeywords').start()
        threading.Thread(target=_competing_goods, name='SyncCompetingGoods').start()
        threading.Thread(target=_seed_urls, name='SyncSeedUrls').start()
        threading.Thread(target=_orgs, name='SyncOrgs').start()
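
# A minimal usage sketch (an assumption, not part of this file):
# `BasicService`, `loops_interval`, the scheduler, and the *_table helpers all
# come from the surrounding crawler package, so the keyword arguments shown
# here are only the ones this class itself pops in __init__.
#
#   service = SyncData(init_validator=True, init_collector=True,
#                      validate_interval=7200, keywords_interval=3600)
#   service.start()  # spawns the five non-daemon sync threads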