# sync_data.py

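"""Synchronize crawler seed data (search keywords, organizations, seed URLs
and competing-goods URLs) from MongoDB into the task queues, and keep the
URL collector and dedup validator loaded, each on its own background thread."""
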
import threading

from common.log import logger
from crawler.services.basics import BasicSearch
from crawler.utils import is_url, extract_domain
from settings import (
    MGO_URLS,
    MGO_ORGS,
    MGO_COMPETING_GOODS,
    MGO_REMOVAL_DUPLICATE,
    MGO_LUA_SPIDERS
)


class SyncData(BasicSearch):

    def __init__(
            self,
            init_validator=False,
            init_collector=False,
            **kwargs
    ):
        # Per-loop reload intervals in seconds; fall back to the defaults
        # below when the corresponding kwarg is absent or falsy.
        self._sync_validate_interval = (kwargs.pop('validate_interval', None) or 7200)
        self._keywords_interval = (kwargs.pop('keywords_interval', None) or 3600)
        self._competing_goods_interval = (kwargs.pop('competing_goods_interval', None) or 3600)
        self._seed_urls_interval = (kwargs.pop('seed_urls_interval', None) or 3600)
        self._orgs_interval = (kwargs.pop('orgs_interval', None) or 3600)
        super(SyncData, self).__init__(**kwargs)
        self._init_validator = init_validator
        self._init_collector = init_collector
        self._allow_load_data = False
        self._init()
    def _init(self):
        # Kick off the whole pipeline in a background (non-daemon) thread;
        # the process stays alive for as long as the sync loops run.
        threading.Thread(target=self.sync_data, name='SyncData').start()
    def sync_keywords(self):
        """Synchronize search keywords."""
        logger.info('[DataSync] loading keywords table')
        words = self.keywords_table()
        # Normalize the keywords and push them to the task queue.
        words = [str(word).replace(' ', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            ))
        self.scheduler.add_query(self.keyword_groups, lst, level=self.keyword_weight)
        logger.info(f'[DataSync] {len(words)} keywords read into the task queue')
    def sync_orgs(self):
        """Synchronize organization data."""
        logger.info('[DataSync] loading organizations table')
        items = self.orgs_table()
        # Normalize organization names and push them to the task queue.
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.error(f'[DataSync - organizations] invalid organization name: {item}')
                continue
            word = str(name).replace(' ', '').strip()
            orgs.append(word)
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            ))
        self.scheduler.add_query(self.org_groups, lst, level=self.org_weight)
        # Flag organizations that have been added so they are not pushed
        # to the task queue again.
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[DataSync] {len(items)} organizations read into the task queue')
    def sync_seed_urls(self):
        """Synchronize seed URLs."""
        logger.info('[DataSync] loading seed URLs table')
        items = self.seed_urls_table()
        # Filter into a new list rather than calling items.remove() while
        # iterating, which silently skips the element after each removal.
        items = [
            item for item in items
            if is_url(item['name']) and not self.validator.data(item['name'])
        ]
        lst = []
        for item in items:
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Flag pushed URLs so they are not queued again; _id is unique,
        # so update_one is sufficient here.
        for item in items:
            MGO_URLS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[DataSync] {len(items)} seed URLs read into the task queue')
    def sync_competing_goods(self):
        """Synchronize competing-goods URLs."""
        logger.info('[DataSync] loading competing-goods URLs table')
        items = self.competing_goods_table()
        # Drop invalid or already-seen URLs (filtered into a new list, as in
        # sync_seed_urls), then push the rest to the task queue.
        items = [
            item for item in items
            if is_url(item['name']) and not self.validator.data(item['name'])
        ]
        lst = []
        for item in items:
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Flag pushed competing-goods URLs so they are not queued again.
        for item in items:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[DataSync] {len(items)} competing-goods URLs read into the task queue')
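    # The sync_orgs/sync_seed_urls/sync_competing_goods loaders share an
    # idempotency pattern: rows are pushed to the task queue once and then
    # flagged with enable_added=True, so the *_table() reads (presumably
    # filtered on that flag inside BasicSearch) skip them on later passes.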
    def sync_collector(self):
        """Load URLs already indexed by the lua spiders into the URL collector."""
        if self._init_collector:
            logger.info('[DataSync] loading collector')
            count = 0
            projection = {'param_common': 1}
            cursor = MGO_LUA_SPIDERS.find(projection=projection)
            for item in cursor.sort(self.sort):
                try:
                    url = item['param_common'][11]
                    if not is_url(url):
                        continue
                    domain = extract_domain(url)
                except (KeyError, IndexError, TypeError):
                    # Skip documents whose param_common is missing, too
                    # short, or not a sequence.
                    continue
                if not self.collector.data(domain):
                    self.collector.add_data(domain)
                    count += 1
            logger.info(f'[DataSync] {count} domains read into the collector')
    def sync_validator(self):
        """Load the removal/duplicate table into the validator (dedup filter)."""
        if self._init_validator:
            logger.info('[DataSync] loading validator')
            count = 0
            cursor = MGO_REMOVAL_DUPLICATE.find(projection={'domain': 1})
            for item in cursor.sort(self.sort):
                try:
                    domain = item['domain']
                    if not isinstance(domain, str):
                        MGO_REMOVAL_DUPLICATE.delete_one({'_id': item['_id']})
                        continue
                except KeyError:
                    # Dict access raises KeyError, not IndexError, when the
                    # 'domain' field is absent.
                    continue
                if not self.validator.data(domain):
                    self.validator.add_data(domain)
                    count += 1
            logger.info(f'[DataSync] {count} dedup features read into the validator')
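    # Thread layout: _init() starts sync_data() on a background thread, and
    # sync_data() fans out into the five loops below. _validate must finish
    # one pass of sync_collector/sync_validator before it sets
    # _allow_load_data, which holds the four data loaders back until the
    # dedup filter is populated; until then each loader polls every 2s.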
    def sync_data(self):
        """Run all data-sync loops."""
        def _validate():
            """Collector/validator loop."""
            while True:
                try:
                    self.sync_collector()
                    self.sync_validator()
                    if not self._allow_load_data:
                        self._allow_load_data = True
                except Exception as e:
                    logger.exception(e)
                self.loops_interval(self._sync_validate_interval)

        def _keywords():
            """Search-keywords loop."""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_keywords()
                    except Exception as e:
                        logger.exception(e)
                    finally:
                        # Always sleep, even on errors, to avoid a busy retry loop.
                        self.loops_interval(self._keywords_interval)
                else:
                    self.loops_interval(2)

        def _competing_goods():
            """Competing-goods loop."""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_competing_goods()
                    except Exception as e:
                        logger.exception(e)
                    finally:
                        self.loops_interval(self._competing_goods_interval)
                else:
                    self.loops_interval(2)

        def _seed_urls():
            """Seed-URLs loop."""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_seed_urls()
                    except Exception as e:
                        logger.exception(e)
                    finally:
                        self.loops_interval(self._seed_urls_interval)
                else:
                    self.loops_interval(2)

        def _orgs():
            """Organizations loop."""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_orgs()
                    except Exception as e:
                        logger.exception(e)
                    finally:
                        self.loops_interval(self._orgs_interval)
                else:
                    self.loops_interval(2)

        logger.info('[DataSync] initial load')
        threading.Thread(target=_validate, name='SyncValidateData').start()
        threading.Thread(target=_keywords, name='SyncKeywords').start()
        threading.Thread(target=_competing_goods, name='SyncCompetingGoods').start()
        threading.Thread(target=_seed_urls, name='SyncSeedUrls').start()
        threading.Thread(target=_orgs, name='SyncOrgs').start()
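

# Minimal usage sketch, assuming BasicSearch accepts the remaining kwargs:
# constructing SyncData is enough to start synchronization, since __init__
# spawns the background threads itself. The keyword arguments shown are the
# ones popped in __init__ above.
if __name__ == '__main__':
    SyncData(
        init_validator=True,     # preload the dedup validator at startup
        init_collector=True,     # preload the URL collector at startup
        validate_interval=7200,  # seconds between collector/validator reloads
        keywords_interval=3600,  # seconds between keyword pushes
    )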