Kaynağa Gözat

添加采集任务与状态管理

dongzhaorui 1 yıl önce
ebeveyn
işleme
93ce1064c9
1 değiştirilmiş dosya ile 130 ekleme ve 148 silme
  1. FworkSpider/feapder/core/parser_control.py (+130 −148)

FworkSpider/feapder/core/parser_control.py: +130 −148

@@ -17,7 +17,7 @@ import feapder.setting as setting
 import feapder.utils.tools as tools
 from feapder.buffer.item_buffer import ItemBuffer
 from feapder.db.memory_db import MemoryDB
-from feapder.network.item import Item
+from feapder.network.item import Item, BaseListItem
 from feapder.network.request import Request
 from feapder.utils import metrics
 from feapder.utils.log import log
@@ -35,118 +35,16 @@ class PaserControl(threading.Thread):
     _success_task_count = 0
     _failed_task_count = 0
 
-    def __init__(self, collector, redis_key, request_buffer, item_buffer):
+    def __init__(self, collector, redis_key, request_buffer, item_buffer, task_buffer):
         super(PaserControl, self).__init__()
         self._parsers = []
         self._collector = collector
         self._redis_key = redis_key
         self._request_buffer = request_buffer
         self._item_buffer = item_buffer
+        self._task_buffer = task_buffer
         self._thread_stop = False
 
-    def is_filter(self, item):
-        """item入库前是否会被过滤"""
-        if setting.ITEM_FILTER_ENABLE:
-            if self._item_buffer.__class__.dedup.get(item.fingerprint):
-                return True
-        return False
-
-    def sent_heartbeat(self, items, table=None):
-        """发送心跳数据"""
-        send_success = True
-        is_list = isinstance(items, list)
-        items = items if is_list else [items]
-        log.debug("发送心跳")
-        table = table or setting.RECORD_SPIDER_HEARTBEAT
-        if not self._item_buffer.export_to_db(table, items):
-            send_success = False
-            log.error("失败心跳:\n {}".format(tools.dumps_json(items)))
-        return send_success
-
-    @staticmethod
-    def get_spider_attribute(name, *args):
-        """获取对象属性"""
-        obj1, obj2 = args or (None, None)
-
-        val = None
-        if obj1 is not None:
-            if isinstance(obj1, dict):
-                val = obj1.get(name)
-                if not val and name == "spidercode":
-                    val = obj1.get("code")
-            else:
-                val = getattr(obj1, name, None)
-
-        if not val and obj2 is not None:
-            val = getattr(obj2, name, None)
-
-        return val if val is not None else ""
-
-    def spider_heartbeat(self, request, response, **kwargs):
-        """爬虫心跳"""
-        parser = kwargs["parser"]
-        now_page = kwargs["now_page"]
-        extract_count = kwargs["extract_count"]
-        task_count = kwargs["task_count"]
-        rel_count = kwargs["rel_count"]
-        filepath = kwargs["filepath"]
-        status_code = getattr(response, "status_code", -1)
-
-        item = getattr(request, "item", {})
-        site = self.get_spider_attribute("site", item, parser)
-        channel = self.get_spider_attribute("channel", item, parser)
-        code = self.get_spider_attribute("spidercode", item, parser)
-        business_type: str = parser.__business_type__  # 爬虫业务类型
-        run_time = tools.get_current_date(date_format="%Y-%m-%d")  # 运行时间,单位:天
-        spider_id = tools.get_md5(code + business_type + run_time)
-        heartbeat_content = dict(
-            node_ip=tools.os.environ.get("CRAWLAB_SERVER_REGISTER_IP"),  # crawlab节点名称
-            crawlab_taskid=tools.os.environ.get("CRAWLAB_TASK_ID"),  # crawlab平台爬虫的任务id
-            site=site,
-            channel=channel,
-            spidercode=code,
-            url=request.url,  # 访问地址
-            status_code=status_code,  # 响应状态码
-            runtime=run_time,
-            business_type=business_type,
-            spider_id=spider_id,
-            filepath=filepath,  # 文件路径
-            create_at=tools.ensure_int64(tools.get_current_timestamp()),  # 执行时间, 单位:秒
-        )
-
-        if hasattr(request, "error_msg") and status_code != 200:
-            error = getattr(request, "error_msg")
-            feature = dict(
-                err_type=str(error.split(": ")[0]),
-                err_msg=getattr(request, "error_msg"),
-            )
-            feature.setdefault("request_success", False)
-            if business_type.endswith("List"):
-                feature.update(dict(nowpage=now_page, ))
-            else:
-                feature.update(dict(count=task_count, ))
-        else:
-            if business_type.endswith("List"):
-                # 列表页
-                list_feature = dict(
-                    nowpage=now_page,  # 当前页码
-                    count=extract_count,  # 列表提取总数
-                    rel_count=rel_count,  # 实际入库总数
-                )
-                feature = list_feature
-            else:
-                # 详情页
-                detail_feature = dict(
-                    count=task_count,  # 发起请求的总数
-                    rel_count=rel_count,  # 实际入库总数
-                )
-                feature = detail_feature
-            feature.setdefault("request_success", True)
-
-        feature['expire_at'] = tools.get_utcnow()  # 设置utc时间,定期删除(5天)
-        heartbeat_content.update(feature)
-        return self.sent_heartbeat(heartbeat_content)
-
     def run(self):
         self._thread_stop = False
         while not self._thread_stop:
@@ -160,7 +58,6 @@ class PaserControl(threading.Thread):
 
                 self.is_show_tip = False
                 self.deal_request(request)
-
             except (Exception, BaseException) as e:
                 log.exception(e)
 
@@ -176,14 +73,14 @@ class PaserControl(threading.Thread):
         request_redis = request["request_redis"]
         request = request["request_obj"]
 
-        is_sent_heartbeat = False  # 发送心跳的条件
+        is_sent_heartbeat = False  # 发送心跳的标识
         heartbeat_lst = []  # 待推送的心跳信息列表
         for parser in self._parsers:
             now_page = getattr(request, "page", -1)  # 当前访问页码
             extract_count = 0  # 列表抽取总数量
             task_count = 0  # 详情任务总数量
             rel_count = 0  # 实际入库量
-            is_mixed = False  # 混合采集标识 False 列表页与详情页单独采集 True 列表页即详情页
+            is_mixed = False  # 混合采集标识,False=列表页与详情页单独采集;True=列表页即详情页
 
             if parser.name == request.parser_name:
                 used_download_midware_enable = False
@@ -255,8 +152,7 @@ class PaserControl(threading.Thread):
                                 )
                             )
 
-                        # 详情任务数量计数
-                        task_count += 1
+                        task_count += 1  # 详情任务数量计数
 
                         if response == None:
                             raise Exception(
@@ -308,28 +204,27 @@ class PaserControl(threading.Thread):
                                 self._request_buffer.put_request(result)
 
                         elif isinstance(result, Item):
-
-                            # 判断是否是 混合采集
-                            try:
-                                if "List" in parser.__business_type__ and result.contenthtml:
-                                    is_mixed = True
-                                result.is_mixed = is_mixed
-                            except:
-                                if "PlanToBuildList" in parser.__business_type__:
-                                    result.is_mixed = is_mixed
-                                else:
-                                    result.item['is_mixed'] = is_mixed
-
                             result_type = 2
 
+                            # 判断爬虫是否混合采集
+                            if "List" in parser.__business_type__ and hasattr(result, 'contenthtml'):
+                                is_mixed = True
+                            result.is_mixed = is_mixed
+
                             # 实际入库数量计数
-                            if not self.is_filter(result):
+                            if not self.is_duplicate(result):
                                 rel_count += 1
-                            # 实际入库数量(无限翻页控制条件)
-                            request.rel_count = rel_count
+
+                            request.rel_count = rel_count  # 实际入库数量
+
+                            # 添加属性 - 待采集任务队列名称(仅对详情采集添加队列名称,以便任务发布)
+                            if isinstance(result, BaseListItem):
+                                result.queue_name = self._task_buffer._tab_items
 
                             # 将item入库(异步)
                             self._item_buffer.put_item(result)
+                            # 推送任务到待采集队列(异步)
+                            self._task_buffer.put_task(result)
 
                         elif callable(result):  # result为可执行的无参函数
                             if result_type == 2:  # item 的 callback,buffer里的item均入库后再执行
@@ -411,9 +306,7 @@ class PaserControl(threading.Thread):
 
                     request.rel_count = 0  # 重置实际入库数
 
-                    requests = parser.exception_request(request, response) or [
-                        request
-                    ]
+                    requests = parser.exception_request(request, response) or [request]
                     if not isinstance(requests, Iterable):
                         raise Exception(
                             "%s.%s返回值必须可迭代" % (parser.name, "exception_request")
@@ -426,16 +319,11 @@ class PaserControl(threading.Thread):
                         if not isinstance(request, Request):
                             raise Exception("exception_request 需 yield request")
 
-                        if (
-                            request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES
-                            or request.is_abandoned
-                        ):
+                        if request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES or request.is_abandoned:
                             self.__class__._failed_task_count += 1  # 记录失败任务数
 
                             # 处理failed_request的返回值 request 或 func
-                            results = parser.failed_request(request, response) or [
-                                request
-                            ]
+                            results = parser.failed_request(request, response) or [request]
                             if not isinstance(results, Iterable):
                                 raise Exception(
                                     "%s.%s返回值必须可迭代"
@@ -454,20 +342,11 @@ class PaserControl(threading.Thread):
                                                 if request_redis
                                                 else result
                                             )
-                                            original_request.error_msg = (
-                                                request.error_msg
-                                            )
-                                            original_request.response = (
-                                                request.response
-                                            )
-
-                                            self._request_buffer.put_failed_request(
-                                                original_request
-                                            )
+                                            original_request.error_msg = request.error_msg
+                                            original_request.response = request.response
+                                            self._request_buffer.put_failed_request(original_request)
                                         else:
-                                            self._request_buffer.put_failed_request(
-                                                result
-                                            )
+                                            self._request_buffer.put_failed_request(result)
 
                                 elif callable(result):
                                     self._request_buffer.put_request(result)
@@ -577,6 +456,109 @@ class PaserControl(threading.Thread):
     def add_parser(self, parser):
         self._parsers.append(parser)
 
+    def is_duplicate(self, item):
+        """item入库前是否会被过滤"""
+        if setting.ITEM_FILTER_ENABLE:
+            if self._item_buffer.__class__.dedup.get(item.fingerprint):
+                return True
+        return False
+
+    def sent_heartbeat(self, items, table=None):
+        """发送汇总采集详情"""
+        send_success = True
+        items = items if isinstance(items, list) else [items]
+        log.debug("发送心跳")
+        table = table or setting.SPIDER_HEARTBEAT_RECORD
+        if not self._item_buffer.export_to_db(table, items):
+            send_success = False
+            log.error("失败心跳:\n {}".format(tools.dumps_json(items)))
+        return send_success
+
+    @staticmethod
+    def get_spider_attribute(name, *args):
+        """获取对象属性"""
+        arg1, arg2 = args or (None, None)
+
+        val = None
+        if arg1 is not None:
+            if isinstance(arg1, dict):
+                val = arg1.get(name)
+                if not val and name == "spidercode":
+                    val = arg1.get("code")
+            else:
+                val = getattr(arg1, name, None)
+
+        if not val and arg2 is not None:
+            val = getattr(arg2, name, None)
+
+        return val if val is not None else ""
+
+    def spider_heartbeat(self, request, response, **kwargs):
+        """爬虫心跳"""
+        parser = kwargs["parser"]
+        now_page = kwargs["now_page"]
+        extract_count = kwargs["extract_count"]
+        task_count = kwargs["task_count"]
+        rel_count = kwargs["rel_count"]
+        filepath = kwargs["filepath"]
+        status_code = getattr(response, "status_code", -1)
+
+        item = getattr(request, "list_info", {})
+        site = self.get_spider_attribute("site", item, parser)
+        channel = self.get_spider_attribute("channel", item, parser)
+        code = self.get_spider_attribute("spidercode", item, parser)
+
+        business_type: str = parser.__business_type__  # 爬虫业务类型
+        run_time = tools.get_current_date(date_format="%Y-%m-%d")  # 运行时间,单位:天
+        spider_id = tools.get_md5(code + business_type + run_time)
+        heartbeat_content = dict(
+            node_ip=tools.os.environ.get("CRAWLAB_SERVER_REGISTER_IP"),  # crawlab节点名称
+            crawlab_taskid=tools.os.environ.get("CRAWLAB_TASK_ID"),  # crawlab平台爬虫的任务id
+            site=site,
+            channel=channel,
+            spidercode=code,
+            url=request.url,  # 访问地址
+            status_code=status_code,  # 响应状态码
+            runtime=run_time,
+            business_type=business_type,
+            spider_id=spider_id,
+            filepath=filepath,  # 文件路径
+            create_at=tools.ensure_int64(tools.get_current_timestamp()),  # 执行时间, 单位:秒
+        )
+
+        if hasattr(request, "error_msg") and status_code != 200:
+            error = getattr(request, "error_msg")
+            feature = dict(
+                err_type=str(error.split(": ")[0]),
+                err_msg=getattr(request, "error_msg"),
+            )
+            feature.setdefault("request_success", False)
+            if business_type.endswith("List"):
+                feature.update(dict(nowpage=now_page, ))
+            else:
+                feature.update(dict(count=task_count, ))
+        else:
+            if business_type.endswith("List"):
+                # 列表页
+                list_feature = dict(
+                    nowpage=now_page,  # 当前页码
+                    count=extract_count,  # 列表提取总数
+                    rel_count=rel_count,  # 实际入库总数
+                )
+                feature = list_feature
+            else:
+                # 详情页
+                detail_feature = dict(
+                    count=task_count,  # 发起请求的总数
+                    rel_count=rel_count,  # 实际入库总数
+                )
+                feature = detail_feature
+            feature.setdefault("request_success", True)
+
+        feature['expire_at'] = tools.get_utcnow()  # 设置utc时间,定期删除(5天)
+        heartbeat_content.update(feature)
+        return self.sent_heartbeat(heartbeat_content)
+
 
 class AirSpiderParserControl(PaserControl):
     is_show_tip = False