Commit a1ee22fa5a by dongzhaorui (3 years ago, has parent commit):
1 file changed, 85 insertions(+), 26 deletions(-)
    find_source/crawler/services/sync_data.py

+ 85 - 26
find_source/crawler/services/sync_data.py

@@ -18,21 +18,25 @@ class SyncData(BasicSearch):
             self,
             init_validator=False,
             init_collector=False,
-            loop_interval=600,
             **kwargs
     ):
+        self._sync_validate_data = (kwargs.pop('validate_interval', None) or 7200)
+        self._keywords_interval = (kwargs.pop('keywords_interval', None) or 3600)
+        self._competing_goods_interval = (kwargs.pop('competing_goods_interval', None) or 3600)
+        self._seed_urls_interval = (kwargs.pop('seed_urls_interval', None) or 3600)
+        self._orgs_interval = (kwargs.pop('orgs_interval', None) or 3600)
         super(SyncData, self).__init__(**kwargs)
         self._init_validator = init_validator
         self._init_collector = init_collector
-        self._interval = loop_interval
+        self._allow_load_data = False
         self._init()
 
     def _init(self):
         threading.Thread(target=self.sync_data, name='SyncData').start()
 
     def sync_keywords(self):
-        """同步关键词数据"""
-        logger.info(f'[数据同步 - 关键词]开始加载')
+        """同步搜索词数据"""
+        logger.info(f'[数据同步]开始加载 - 搜索词表')
         words = self.keywords_table()
         # 处理关键词格式并推送到任务队列
         words = [str(word).replace(' ', '').strip() for word in words]
@@ -46,11 +50,11 @@ class SyncData(BasicSearch):
                 weight=self.keyword_weight
             ))
         self.scheduler.add_query(self.keyword_groups, lst, level=self.keyword_weight)
-        logger.info(f'[数据同步 - 关键词]{len(words)}条')
+        logger.info(f'[数据同步]任务队列读取{len(words)}条搜索词')
 
     def sync_orgs(self):
         """同步组织单位数据"""
-        logger.info(f'[数据同步 - 组织]开始加载')
+        logger.info(f'[数据同步]开始加载 - 单位组织表')
         items = self.orgs_table()
         # 处理单位组织名称并推送到任务队列
         orgs = []
@@ -77,11 +81,11 @@ class SyncData(BasicSearch):
                 {'_id': item['_id']},
                 {'$set': {'enable_added': True}}
             )
-        logger.info(f'[数据同步 - 组织列表]{len(items)}个')
+        logger.info(f'[数据同步]任务队列读取{len(items)}家单位组织')
 
     def sync_seed_urls(self):
         """同步网址数据"""
-        logger.info(f'[数据同步 - 种子列表]开始加载')
+        logger.info(f'[数据同步]开始加载 - 种子网址表')
         items = self.seed_urls_table()
         lst = []
         for item in items:
@@ -105,11 +109,11 @@ class SyncData(BasicSearch):
                 {'_id': item['_id']},
                 {'$set': {'enable_added': True}}
             )
-        logger.info(f'[数据同步 - 种子列表]{len(items)}条')
+        logger.info(f'[数据同步]任务队列读取{len(items)}条种子网址')
 
     def sync_competing_goods(self):
         """同步竞品urls"""
-        logger.info(f'[数据同步 - 竞品列表]开始加载')
+        logger.info(f'[数据同步]开始加载 - 竞品网址表')
         items = self.competing_goods_table()
         # 处理竞品urls并推送到任务队列
         lst = []
@@ -135,12 +139,12 @@ class SyncData(BasicSearch):
                 {'_id': item['_id']},
                 {'$set': {'enable_added': True}}
             )
-        logger.info(f'[数据同步 - 竞品列表]{len(items)}条')
+        logger.info(f'[数据同步]任务队列读取{len(items)}条竞品网址')
 
     def sync_collector(self):
         """同步lua已收录网址,推送url收录器"""
         if self._init_collector:
-            logger.info(f'[数据同步 - 收录器]初始化加载')
+            logger.info(f'[数据同步]开始加载 - 收录器')
             count = 0
             projection = {'param_common': 1}
             cursor = MGO_LUA_SPIDERS.find(projection=projection)
@@ -155,12 +159,12 @@ class SyncData(BasicSearch):
                 if not self.collector.data(domain):
                     self.collector.add_data(domain)
                     count += 1
-            logger.info(f'[数据同步 - 收录器]加载{count}个网站域名')
+            logger.info(f'[数据同步]收录器读取{count}个网址域名')
 
     def sync_validator(self):
         """将垃圾表内容加载到过滤器"""
         if self._init_validator:
-            logger.info(f'[数据同步 - 过滤器]初始化加载')
+            logger.info(f'[数据同步]开始加载 - 过滤器')
             count = 0
             cursor = MGO_REMOVAL_DUPLICATE.find(projection={'domain': 1})
             for item in cursor.sort(self.sort):
@@ -174,19 +178,74 @@ class SyncData(BasicSearch):
                 if not self.validator.data(domain):
                     self.validator.add_data(domain)
                     count += 1
-            logger.info(f'[数据同步- 过滤器]加载{count}条去重特征')
+            logger.info(f'[数据同步]过滤器读取{count}条去重特征')
 
     def sync_data(self):
         """数据同步"""
+
+        def _validate():
+            """验证模块"""
+            while True:
+                try:
+                    self.sync_collector()
+                    self.sync_validator()
+                    if not self._allow_load_data:
+                        self._allow_load_data = True
+                except Exception as e:
+                    logger.exception(e)
+                self.loops_interval(self._sync_validate_data)
+
+        def _keywords():
+            """搜索词"""
+            while True:
+                if self._allow_load_data:
+                    try:
+                        self.sync_keywords()
+                        self.loops_interval(self._keywords_interval)
+                    except Exception as e:
+                        logger.exception(e)
+                else:
+                    self.loops_interval(2)
+
+        def _competing_goods():
+            """竞品列表"""
+            while True:
+                if self._allow_load_data:
+                    try:
+                        self.sync_competing_goods()
+                        self.loops_interval(self._competing_goods_interval)
+                    except Exception as e:
+                        logger.exception(e)
+                else:
+                    self.loops_interval(2)
+
+        def _seed_urls():
+            """种子url"""
+            while True:
+                if self._allow_load_data:
+                    try:
+                        self.sync_seed_urls()
+                        self.loops_interval(self._seed_urls_interval)
+                    except Exception as e:
+                        logger.exception(e)
+                else:
+                    self.loops_interval(2)
+
+        def _orgs():
+            """单位组织"""
+            while True:
+                if self._allow_load_data:
+                    try:
+                        self.sync_orgs()
+                        self.loops_interval(self._orgs_interval)
+                    except Exception as e:
+                        logger.exception(e)
+                else:
+                    self.loops_interval(2)
+
         logger.info(f'[数据同步]初始化加载')
-        while True:
-            try:
-                self.sync_collector()
-                self.sync_validator()
-                self.sync_competing_goods()
-                self.sync_keywords()
-                self.sync_seed_urls()
-                self.sync_orgs()
-            except Exception as e:
-                logger.exception(e)
-            self.loops_interval(self._interval)
+        threading.Thread(target=_validate, name='SyncValidateData').start()
+        threading.Thread(target=_keywords, name='SyncKeywords').start()
+        threading.Thread(target=_competing_goods, name='SyncCompetingGoods').start()
+        threading.Thread(target=_seed_urls, name='SyncSeedUrls').start()
+        threading.Thread(target=_orgs, name='SyncOrgs').start()