sync_data.py

import threading

from common.log import logger
from crawler.services.basics import BasicService
from crawler.utils import is_url, extract_domain
from settings import (
    MGO_URLS,
    MGO_ORGS,
    MGO_COMPETING_GOODS,
    MGO_DATA_GARBAGE,
    MGO_LUA_SPIDERS
)
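
# NOTE: The MGO_* names imported above are used below like pymongo
# Collection handles (find / update_one / delete_one / sort). That is an
# assumption inferred from usage in this module, not confirmed here.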


class SyncData(BasicService):
    """Data synchronization service"""

    def __init__(
            self,
            init_validator=False,
            init_collector=False,
            **kwargs
    ):
        self._sync_validate_data = (kwargs.pop('validate_interval', None) or 7200)
        self._keywords_interval = (kwargs.pop('keywords_interval', None) or 3600)
        self._competing_goods_interval = (kwargs.pop('competing_goods_interval', None) or 3600)
        self._seed_urls_interval = (kwargs.pop('seed_urls_interval', None) or 3600)
        self._orgs_interval = (kwargs.pop('orgs_interval', None) or 3600)
        super(SyncData, self).__init__(**kwargs)
        self._init_validator = init_validator
        self._init_collector = init_collector
        self._allow_load_data = False
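
    # Design note (inferred from start() below): _allow_load_data stays
    # False until the validate thread has loaded the collector and
    # validator once, so the keyword/org/URL loops do not push tasks
    # before the de-duplication filters are populated.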

    def sync_keywords(self):
        """Synchronize search keywords"""
        logger.info('[data sync] start loading - keywords table')
        words = self.keywords_table()
        # Normalize the keywords and push them onto the task queue
        words = [str(word).replace(' ', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            ))
        self.scheduler.add_query(self.keyword_groups, lst, level=self.keyword_weight)
        logger.info(f'[data sync] pushed {len(words)} search keywords to the task queue')

    def sync_orgs(self):
        """Synchronize organization data"""
        logger.info('[data sync] start loading - organizations table')
        items = self.orgs_table()
        # Clean up the organization names and push them onto the task queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.error(f'[data sync - organizations] invalid organization name: {item}')
                continue
            word = str(name).replace(' ', '').strip()
            orgs.append(word)
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            ))
        self.scheduler.add_query(self.org_groups, lst, level=self.org_weight)
        # Mark organizations that have been added so they are not pushed
        # to the task queue again
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[data sync] pushed {len(items)} organizations to the task queue')

    def sync_seed_urls(self):
        """Synchronize seed URL data"""
        logger.info('[data sync] start loading - seed URLs table')
        items = self.seed_urls_table()
        # Removing entries from `items` while iterating over it skips the
        # element that follows each removal, so collect the valid entries
        # into a separate list instead.
        lst = []
        valid_items = []
        for item in items:
            if not is_url(item['name']):
                continue
            # exists_url = self.validator.data(item['name'])
            # if exists_url:
            #     continue
            valid_items.append(item)
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Mark the pushed seed URLs so they are not queued again
        # (the filter matches a single _id, so update_one is sufficient)
        for item in valid_items:
            MGO_URLS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[data sync] pushed {len(valid_items)} seed URLs to the task queue')

    def sync_competing_goods(self):
        """Synchronize competitor URLs"""
        logger.info('[data sync] start loading - competitor URLs table')
        items = self.competing_goods_table()
        # Filter the competitor URLs and push them onto the task queue.
        # As above, mutating `items` during iteration skips elements, so
        # the valid entries are collected into a separate list.
        lst = []
        valid_items = []
        for item in items:
            if not is_url(item['name']):
                continue
            # Skip URLs the validator has already seen
            exists_url = self.validator.data(item['name'])
            if exists_url:
                continue
            valid_items.append(item)
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Update the state of the pushed competitor URLs
        for item in valid_items:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[data sync] pushed {len(valid_items)} competitor URLs to the task queue')

    def sync_collector(self):
        """Synchronize URLs already indexed by the lua spiders into the URL collector"""
        if self._init_collector:
            logger.info('[data sync] start loading - collector')
            count = 0
            # param_common[11] is treated as the spider's target URL
            q = {'param_common.11': {'$exists': True}}
            projection = {'param_common': 1}
            cursor = MGO_LUA_SPIDERS.find(q, projection=projection)
            for item in cursor.sort(self.sort):
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
                if not self.collector.data(domain):
                    self.collector.add_data(domain)
                    count += 1
            logger.info(f'[data sync] collector loaded {count} URL domains')

    def sync_validator(self):
        """Load the contents of the garbage table into the filter"""
        if self._init_validator:
            logger.info('[data sync] start loading - validator')
            count = 0
            cursor = MGO_DATA_GARBAGE.find(projection={'domain': 1})
            for item in cursor.sort(self.sort):
                try:
                    domain = item['domain']
                    if not isinstance(domain, str):
                        MGO_DATA_GARBAGE.delete_one({'_id': item['_id']})
                        continue
                # A missing `domain` key raises KeyError, not IndexError
                except KeyError:
                    continue
                if not self.validator.data(domain):
                    self.validator.add_data(domain)
                    count += 1
            logger.info(f'[data sync] validator loaded {count} de-duplication fingerprints')
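
    # Note: collector and validator are used above through a check-then-add
    # interface (`.data(x)` for membership, `.add_data(x)` to record). The
    # exact backing structure (e.g. an in-memory set or a Bloom-style
    # filter) is an assumption; it is not defined in this module.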

    def start(self):
        """Run the data synchronization loops"""

        def _validate():
            """Validation module"""
            while True:
                try:
                    self.sync_collector()
                    self.sync_validator()
                    # Unblock the data-loading threads after the first pass
                    if not self._allow_load_data:
                        self._allow_load_data = True
                except Exception as e:
                    logger.exception(e)
                self.loops_interval(self._sync_validate_data)

        def _keywords():
            """Search keywords"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_keywords()
                        self.loops_interval(self._keywords_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        def _competing_goods():
            """Competitor URLs"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_competing_goods()
                        self.loops_interval(self._competing_goods_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        def _seed_urls():
            """Seed URLs"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_seed_urls()
                        self.loops_interval(self._seed_urls_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        def _orgs():
            """Organizations"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_orgs()
                        self.loops_interval(self._orgs_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        logger.info('[data sync] initial loading')
        threading.Thread(target=_validate, name='SyncValidateData').start()
        threading.Thread(target=_keywords, name='SyncKeywords').start()
        threading.Thread(target=_competing_goods, name='SyncCompetingGoods').start()
        threading.Thread(target=_seed_urls, name='SyncSeedUrls').start()
        threading.Thread(target=_orgs, name='SyncOrgs').start()
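

# A minimal usage sketch, assuming BasicService wires up the scheduler,
# validator, collector, and loops_interval used above (none of which are
# defined in this file), and that the *_interval kwargs are seconds:
#
#     service = SyncData(
#         init_validator=True,
#         init_collector=True,
#         keywords_interval=3600,
#     )
#     service.start()  # spawns the five synchronization threads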