sync_data.py

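"""Data synchronization service.

Loads search keywords, organization names, seed URLs and competing-product
URLs from MongoDB into the crawler task queues, and maintains the
collector/validator deduplication features.
"""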
import threading

from common.log import logger
from crawler.services.basics import BasicService
from crawler.utils import is_url, extract_domain
from settings import (
    MGO_URLS,
    MGO_ORGS,
    MGO_COMPETING_GOODS,
    MGO_GARBAGE,
    MGO_LUA_SPIDERS
)


class SyncData(BasicService):
    """Data synchronization service"""

    def __init__(self, init_validator=False, init_collector=False, **kwargs):
        # Sync intervals in seconds; fall back to the defaults when not given
        self._sync_validate_data = (kwargs.pop('validate_interval', None) or 7200)
        self._keywords_interval = (kwargs.pop('keywords_interval', None) or 3600)
        self._competing_goods_interval = (kwargs.pop('competing_goods_interval', None) or 3600)
        self._seed_urls_interval = (kwargs.pop('seed_urls_interval', None) or 3600)
        self._orgs_interval = (kwargs.pop('orgs_interval', None) or 3600)
        super(SyncData, self).__init__(**kwargs)
        self._init_validator = init_validator
        self._init_collector = init_collector
        # Loader threads wait for this flag, which is set after the first
        # collector/validator pass completes
        self._allow_load_data = False
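
    # NOTE: helpers referenced below -- keywords_table(), orgs_table(),
    # seed_urls_table(), competing_goods_table(), make_task(), scheduler,
    # validator, collector, loops_interval(), self.sort and the
    # *_groups/*_weight/*_classify attributes -- are not defined in this
    # file; presumably they are provided by BasicService.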

    def sync_keywords(self):
        """Synchronize search keyword data"""
        words = self.keywords_table()
        # Normalize the keywords and push them to the task queue
        words = [str(word).replace(' ', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            ))
        self.scheduler.add_query(self.keyword_groups, lst, level=self.keyword_weight)
        logger.info(f'[keywords table] loaded {len(words)} search keywords into the task queue')

    def sync_orgs(self):
        """Synchronize organization data"""
        items = self.orgs_table()
        # Normalize the organization names and push them to the task queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.error(f'[orgs table] invalid organization name: {item}')
                continue
            word = str(name).replace(' ', '').strip()
            orgs.append(word)
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            ))
        self.scheduler.add_query(self.org_groups, lst, level=self.org_weight)
        # Mark the processed organizations so they are not pushed to the
        # task queue again
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[org URL lookup] loaded {len(orgs)} organizations into the task queue')

    def sync_seed_urls(self):
        """Synchronize seed URL data"""
        items = self.seed_urls_table()
        # Removing elements from `items` while iterating over it would skip
        # entries, so collect the valid items into a separate list instead
        valid_items = []
        lst = []
        for item in items:
            if not is_url(item['name']):
                continue
            valid_items.append(item)
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Mark the pushed seed URLs so they are not queued again
        for item in valid_items:
            MGO_URLS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[seed sourcing] loaded {len(valid_items)} seed URLs into the task queue')

    def sync_competing_goods(self):
        """Synchronize competing-product URLs"""
        items = self.competing_goods_table()
        # Validate the competing-product URLs and push them to the task
        # queue; as above, avoid mutating `items` during iteration
        valid_items = []
        lst = []
        for item in items:
            if not is_url(item['name']):
                continue
            # Skip URLs the validator has already recorded
            if self.validator.data(item['name']):
                continue
            valid_items.append(item)
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Mark the pushed competing-product URLs so they are not queued again
        for item in valid_items:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[competing-goods sourcing] loaded {len(valid_items)} URLs into the task queue')

    def data_collector(self):
        """Collector - stores the domains of newly discovered and already-known URLs"""
        if self._init_collector:
            domains = []
            q = {"param_common.11": {'$exists': True}}
            projection = {'param_common': 1}
            cursor = MGO_LUA_SPIDERS.find(q, projection=projection)
            for item in cursor.sort(self.sort):
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
                logger.debug(f'[collector] fetched recorded domain feature: {domain}')
                if domain not in domains:
                    domains.append(domain)
            count = 0
            for domain in domains:
                logger.debug(f'[collector] updating recorded domain feature: {domain}')
                if not self.collector.data(domain):
                    logger.debug(f'[collector] adding recorded domain feature: {domain}')
                    self.collector.add_data(domain)
                    count += 1
            logger.info(f'[collector] loaded {count} recorded URL features')

    def data_validator(self):
        """Filter - stores junk URLs found during sourcing and sites without bidding-related information"""
        if self._init_validator:
            count = 0
            q = {
                "source": {"$exists": False},  # origin of the record
                "domain": {"$type": "string"}
            }
            cursor = MGO_GARBAGE.find(q, projection={'domain': 1})
            for item in cursor.sort(self.sort):
                domain = item['domain']
                logger.debug(f'[filter] fetched filtered URL feature: {domain}')
                if not self.validator.data(domain):
                    logger.debug(f'[filter] updating filtered URL feature: {domain}')
                    self.validator.add_data(domain)
                    MGO_GARBAGE.update_one(
                        {'_id': item['_id']},
                        {'$set': {"source": "other"}}
                    )
                    count += 1
            logger.info(f'[filter] loaded {count} deduplication features')

    def start(self):
        """Program entry point"""
        def _validate():
            """Data filtering"""
            logger.info('[auto sourcing] loading the task filter module')
            while True:
                try:
                    self.data_collector()
                    self.data_validator()
                    # Release the loader threads once the first
                    # collector/validator pass has completed
                    if not self._allow_load_data:
                        self._allow_load_data = True
                except Exception as e:
                    logger.exception(e)
                self.loops_interval(self._sync_validate_data)

        def _keywords():
            """Search keywords"""
            logger.info('[auto sourcing] loading the search keyword module')
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_keywords()
                        self.loops_interval(self._keywords_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        def _competing_goods():
            """Competing-product list"""
            logger.info('[auto sourcing] loading the competing-goods sourcing module')
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_competing_goods()
                        self.loops_interval(self._competing_goods_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        def _seed_urls():
            """Seed URLs"""
            logger.info('[auto sourcing] loading the seed sourcing module')
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_seed_urls()
                        self.loops_interval(self._seed_urls_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        def _orgs():
            """Organizations"""
            logger.info('[auto sourcing] loading the org URL lookup module')
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_orgs()
                        self.loops_interval(self._orgs_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        threading.Thread(target=_validate, name='SyncValidateData').start()
        threading.Thread(target=_keywords, name='SyncKeywords').start()
        threading.Thread(target=_competing_goods, name='SyncCompetingGoods').start()
        threading.Thread(target=_seed_urls, name='SyncSeedUrls').start()
        threading.Thread(target=_orgs, name='SyncOrgs').start()
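

# Minimal usage sketch. The interval kwargs are the ones popped in
# __init__ above; anything else is forwarded to BasicService, whose
# signature is not shown here, so the exact wiring is an assumption.
if __name__ == '__main__':
    service = SyncData(
        init_validator=True,    # rebuild the filter features on startup
        init_collector=True,    # rebuild the collector features on startup
        validate_interval=7200,
        keywords_interval=3600,
    )
    service.start()  # spawns the five sync threads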