Browse source

Add a global control flag for the task-center login token

dongzhaorui, 7 months ago
parent
commit
90b0e4ae46
2 changed files with 50 additions and 23 deletions
  1. FworkSpider/feapder/core/spiders/spider.py  (+45 -21)
  2. FworkSpider/setting.py  (+5 -2)

+ 45 - 21
FworkSpider/feapder/core/spiders/spider.py

@@ -89,22 +89,26 @@ class Spider(BaseParser, Thread):
 
         return True
 
-    def get_task_api_token(self):
-        # fetch the token
-        if self.task_api_auth_token is None:
-            token_url = f"{setting.JY_TASK_URL}/tasks/token"
-            data = {"username": "spider@py", "password": "123@qweA!"}
-            auth_params = dict(url=token_url, timeout=10, data=data, proxies=False)
-            response = Request(method="GET", **auth_params).get_response()
-            token = response.json["token"]
-            self.task_api_auth_token = token
-        log.debug(f"Apply Task api Token:{self.task_api_auth_token}")
+    def register_task_api_token(self):
+        if setting.REGISTER_TASK_TOKEN:
+            # flag allowing the task center to release collection tasks
+            self._item_buffer.release_task_enable = True
 
-    def run(self):  # start of the scheduling control flow
-        self.start_callback()
+            if self.task_api_auth_token is None:
+                token_url = f"{setting.JY_TASK_URL}/tasks/token"
+                data = {"username": "spider@py", "password": "123@qweA!"}
+                auth_params = dict(url=token_url, timeout=10, data=data, proxies=False)
+                response = Request(method="GET", **auth_params).get_response(show_log=False)
+                token = response.json["token"]
+                self.task_api_auth_token = token
 
-        self._heartbeat_buffer.start()  # start heartbeat_buffer
+            log.debug(f"register api token:{self.task_api_auth_token}")
 
+    def run(self):  # start of the scheduling control flow
+        self.start_callback()
+        # start the heartbeat_buffer
+        self._heartbeat_buffer.start()
+        # start the thread pool
         for i in range(self._thread_count):
             parser_control = JySpiderParserControl(
                 memory_db=self._memory_db,
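
For reference, the token request issued by the new register_task_api_token reduces to the standalone sketch below. It uses the requests library rather than feapder's Request wrapper; the endpoint, credentials, and timeout are taken from the diff, the rest is illustrative:

    import requests

    JY_TASK_URL = "http://pytask.spdata.jianyu360.com"  # value from setting.py

    def fetch_task_api_token() -> str:
        # A GET carrying a form body mirrors Request(method="GET", data=...)
        # above; unusual for GET, but kept to match the committed code.
        resp = requests.get(
            f"{JY_TASK_URL}/tasks/token",
            data={"username": "spider@py", "password": "123@qweA!"},
            timeout=10,
        )
        resp.raise_for_status()
        return resp.json()["token"]
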
@@ -114,16 +118,16 @@ class Spider(BaseParser, Thread):
             parser_control.add_parser(self)
             parser_control.start()
             self._parser_controls.append(parser_control)
-
+        # start the task cache (item buffer) module
         self._item_buffer.start()
-        if self.__class__.__business_type__.endswith("Detail"):
-            self._item_buffer.release_task_enable = True  # enable the spider to release collection tasks
-            self.get_task_api_token()  # request a token
-
+        # register the task-center token
+        self.register_task_api_token()
         # distribute tasks
         self.distribute_task()
         # add the distributed tasks to the item_buffer cache container
-        self._item_buffer.tasks_dict.update(self.tasks_dict)
+        if self._item_buffer.release_task_enable:
+            self._item_buffer.tasks_dict.update(self.tasks_dict)
+
         while True:
             try:
                 if self.all_thread_is_done():
@@ -146,7 +150,10 @@ class Spider(BaseParser, Thread):
 
             tools.delay_time(1)  # check the spider state once per second
 
-        self._item_buffer.release_tasks(self.tasks_dict, finished=False)  # release the remaining unfinished tasks
+        if self._item_buffer.release_task_enable:
+            # release the remaining unfinished tasks
+            self._item_buffer.release_tasks(self.tasks_dict, finished=False)
+
         self.end_callback()
         self._started.clear()  # so the thread can be started again
 
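
Both the tasks_dict update after distribute_task() and the final cleanup now sit behind the same release_task_enable flag, so task-center traffic is fully opt-in. A condensed sketch of that gating, using a hypothetical stand-in for feapder's item buffer (only the attribute and method names come from the diff):

    class ItemBufferStub:
        """Hypothetical stand-in for feapder's item buffer."""

        def __init__(self, release_task_enable: bool):
            self.release_task_enable = release_task_enable
            self.tasks_dict = {}

        def release_tasks(self, tasks, finished=False):
            print(f"releasing {len(tasks)} task(s), finished={finished}")

    def run_once(buffer: ItemBufferStub, tasks: dict):
        if buffer.release_task_enable:   # cache tasks only when enabled
            buffer.tasks_dict.update(tasks)
        # ... crawl loop would run here ...
        if buffer.release_task_enable:   # the same flag guards the cleanup
            buffer.release_tasks(tasks, finished=False)

    run_once(ItemBufferStub(False), {"t1": {}})  # no task-center interaction
    run_once(ItemBufferStub(True), {"t1": {}})   # caches, then releases
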
@@ -225,7 +232,24 @@ class BaseBusinessDetailSpider(Spider):
         return ret
 
     get_tasks_by_rabbitmq = get_tasks
-    get_tasks_by_mongodb = get_tasks
+
+    def get_tasks_by_mongodb(self, table=None, query=None, limit=None):
+        pipeline_path = "feapder.pipelines.mongo_pipeline.TaskPipeline"
+        pipeline = tools.import_cls(pipeline_path)()
+        table = table or setting.TASK_REQUEST_PRODUCE
+        queue_name = setting.TAB_ITEMS.format(
+            redis_key=self._redis_key.replace('_detailc', '')
+        )
+        conditions = query or {
+            'state': {'$in': [1, 3]},
+            'queue_name': queue_name,
+            'update_at': {'$lt': tools.get_current_timestamp()}
+        }
+        limit = limit or setting.COLLECTOR_TASK_COUNT
+        results = pipeline.find_items(table, conditions, limit)
+        ignore = {'_id', 'state', 'update_at', 'queue_name'}
+        task_lst = [{k: v for k, v in items.items() if k not in ignore} for items in results]
+        return task_lst
 
 
 class BiddingListSpider(BaseBusinessListSpider):

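The new get_tasks_by_mongodb is essentially a filtered find with the bookkeeping fields stripped in Python. A rough pymongo equivalent of the query it builds (the URI, database, collection, and queue names are placeholders; the filter and ignore set come from the diff):

    import time
    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017")   # placeholder URI
    coll = client["py_spider"]["task_request_produce"]  # placeholder names

    conditions = {
        "state": {"$in": [1, 3]},                  # pending or retryable tasks
        "queue_name": "pyspider:t_items:example",  # TAB_ITEMS key, '_detailc' stripped
        "update_at": {"$lt": int(time.time())},    # last updated before now
    }

    ignore = {"_id", "state", "update_at", "queue_name"}
    task_lst = [
        {k: v for k, v in doc.items() if k not in ignore}
        for doc in coll.find(conditions).limit(100)  # COLLECTOR_TASK_COUNT
    ]
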
+ 5 - 2
FworkSpider/setting.py

@@ -77,10 +77,13 @@ SPLASH_API = "http://splash.spdata.jianyu360.com/render.json"
 REQUEST_TIMEOUT = 60
 
 # proxy settings; the proxy extraction API returns proxies delimited by \r\n
-PROXY_EXTRACT_API = "http://proxy.spdata.jianyu360.com/proxy/getallip"
 PROXY_ENABLE = True
-JY_PROXY_URL = "http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch"
 JY_PROXY_AUTHOR = "Basic amlhbnl1MDAxOjEyM3F3ZSFB"
+PROXY_EXTRACT_API = "http://proxy.spdata.jianyu360.com/proxy/getallip"
+JY_PROXY_URL = "http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch"
+
+# task center
+REGISTER_TASK_TOKEN = True  # register the task-interface token
 JY_TASK_URL = "http://pytask.spdata.jianyu360.com"
 
 # item deduplication
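
The practical effect of the new REGISTER_TASK_TOKEN switch: token registration moves from a per-class naming convention to one global setting. A minimal sketch of the changed decision (hypothetical helper functions, not feapder API):

    # before: gated by the spider's business type suffix
    def should_register_before(business_type: str) -> bool:
        return business_type.endswith("Detail")

    # after: gated by one global setting, regardless of the spider class
    def should_register_after(register_task_token: bool) -> bool:
        return register_task_token

    assert should_register_before("BiddingDetail") is True
    assert should_register_before("BiddingList") is False
    # with REGISTER_TASK_TOKEN = True, list spiders now register a token too
    assert should_register_after(True) is True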