浏览代码

任务划分group

dongzhaorui 3 年之前
父节点
当前提交
1004e0eb84
共有 1 个文件被更改,包括 26 次插入23 次删除
  1. 26 23
      find_source/crawler/schedule.py

+ 26 - 23
find_source/crawler/schedule.py

@@ -1,5 +1,6 @@
+from constants import ORGANIZATION, KEYWORD
 from crawler.q import RedisQueue
-from settings import REDIS_QUERY, REDIS_EXCAVATE
+from settings import REDIS_QUERY_KEYWORD, REDIS_EXCAVATE, REDIS_QUERY_ORGS
 
 
 class Scheduler:
@@ -7,43 +8,45 @@ class Scheduler:
     def __init__(self, queue=None):
         self.mrq = (queue or RedisQueue())
 
-    def _add_tasks(self, classify: str, tasks, level=1, **kwargs):
+    def _add_tasks(self, classify: str, tasks, level=1, group='', **kwargs):
         allow_output_log = kwargs.get('allow_output_log', True)
         if isinstance(tasks, list):
             _tasks = tasks
         else:
             _tasks = [tasks]
 
-        if classify.lower() == 'query':
-            self.mrq.push_task(
-                REDIS_QUERY,
-                _tasks,
-                level=level,
-                allow_output_log=allow_output_log
-            )
+        if classify.lower() == 'query' and group.lower() == KEYWORD:
+            redis_key = REDIS_QUERY_KEYWORD
+        elif classify.lower() == 'query' and group.lower() == ORGANIZATION:
+            redis_key = REDIS_QUERY_ORGS
         elif classify.lower() == 'excavate':
-            self.mrq.push_task(
-                REDIS_EXCAVATE,
-                _tasks,
-                level=level,
-                allow_output_log=allow_output_log
-            )
+            redis_key = REDIS_EXCAVATE
+        else:
+            return
+
+        self.mrq.push_task(
+            redis_key,
+            _tasks,
+            level=level,
+            allow_output_log=allow_output_log
+        )
 
-    def add_query(self, tasks, level=1, **kwargs):
-        self._add_tasks('query', tasks, level, **kwargs)
+    def add_query(self, group, tasks, level=1, **kwargs):
+        self._add_tasks('query', tasks, level, group, **kwargs)
 
     def add_excavate(self, tasks, level=1, **kwargs):
         self._add_tasks('excavate', tasks, level, **kwargs)
 
-    def _get_task(self, classify: str):
-        if classify.lower() == 'query':
-            return self.mrq.pop_task([REDIS_QUERY], priority=True)
-
+    def _get_task(self, classify: str, group=''):
+        if classify.lower() == 'query' and group.lower() == KEYWORD:
+            return self.mrq.pop_task([REDIS_QUERY_KEYWORD], priority=True)
+        elif classify.lower() == 'query' and group.lower() == ORGANIZATION:
+            return self.mrq.pop_task([REDIS_QUERY_ORGS], priority=True)
         elif classify.lower() == 'excavate':
             return self.mrq.pop_task([REDIS_EXCAVATE], priority=True)
 
     def get_excavate_task(self):
         return self._get_task('excavate')
 
-    def get_query_task(self):
-        return self._get_task('query')
+    def get_query_task(self, group):
+        return self._get_task('query', group=group)