
Switch RabbitMQ task dispatch to HTTP API calls; switch the distributed queue to an in-memory queue

dongzhaorui 11 months ago
parent
commit
05b5c73368
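
In practice, a detail spider now pulls its tasks from the task center over HTTP and feeds requests through an in-memory queue (MemoryDB) instead of consuming a RabbitMQ queue. Below is a minimal sketch against the classes added in this commit; the spider name, business type, redis_key and the task's URL field are illustrative assumptions, and the import path simply follows the file location in this diff:

    import feapder.setting as setting
    from feapder.core.spiders.spider import BaseBusinessDetailSpider
    from feapder.network.request import Request


    class DemoDetailSpider(BaseBusinessDetailSpider):  # hypothetical example spider
        __business_type__ = "DemoDetail"  # ends with "Detail", so task release is enabled

        def start_requests(self):
            # get_tasks() now calls {JY_TASK_URL}/tasks/fd over HTTP; the old
            # get_tasks_by_rabbitmq / get_tasks_by_mongodb names remain as aliases.
            for task in self.get_tasks(limit=setting.COLLECTOR_TASK_COUNT):
                yield Request(url=task["parse_url"], item=task)  # "parse_url" is an assumed field

        def parse(self, request, response):
            yield from ()  # items would be yielded here


    DemoDetailSpider(redis_key="demo_detail").start()  # requests flow through MemoryDB, not Redis/RabbitMQ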

+ 2 - 2
FworkSpider/feapder/buffer/__init__.py

@@ -1,9 +1,9 @@
 # -*- coding: utf-8 -*-
-'''
+"""
 Created on 2020/4/23 12:09 AM
 ---------
 @summary:
 ---------
 @author: Boris
 @email: boris_liu@foxmail.com
-'''
+"""

+ 10 - 0
FworkSpider/feapder/buffer/item_buffer/__init__.py

@@ -0,0 +1,10 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2024-08-29 
+---------
+@summary:  
+---------
+@author: Dzr
+"""
+from feapder.buffer.item_buffer.item_buffer import ItemBuffer
+from feapder.buffer.item_buffer.jy_item_buffer import JyItemBuffer

+ 3 - 0
FworkSpider/feapder/buffer/item_buffer.py → FworkSpider/feapder/buffer/item_buffer/item_buffer.py

@@ -304,6 +304,9 @@ class ItemBuffer(threading.Thread):
                     )
                     return False
 
+                # TODO: report the finished collection results to the task center
+                self.report_result(datas)
+
         self.metric_datas(table=table, datas=datas)
         return True
 

+ 452 - 0
FworkSpider/feapder/buffer/item_buffer/jy_item_buffer.py

@@ -0,0 +1,452 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2018-06-19 17:17
+---------
+@summary: Jianyu data item buffer
+---------
+@author:
+@email:
+"""
+
+import threading
+from queue import Queue
+
+import feapder.setting as setting
+import feapder.utils.tools as tools
+from feapder.dedup import Dedup
+from feapder.network.item import (
+    Item,
+    UpdateItem,
+    BaseListItem,
+    BaseDetailItem,
+    FailedTaskItem,
+)
+from feapder.network.request import Request
+from feapder.pipelines import BasePipeline
+from feapder.utils.log import log
+
+MAX_ITEM_COUNT = 5000  # maximum number of items held in the buffer
+UPLOAD_BATCH_MAX_SIZE = 1000
+
+
+class JyItemBuffer(threading.Thread):
+    dedup = None
+
+    def __init__(self, redis_key, rabbitmq=None):
+        if not hasattr(self, "_items_queue"):
+            super(JyItemBuffer, self).__init__()
+
+            self._thread_stop = False
+            self._is_adding_to_db = False
+            self._redis_key = redis_key
+
+            self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)
+
+            # collection task queue (RabbitMQ queue name)
+            self._tab_items = setting.TAB_ITEMS.format(
+                redis_key=redis_key.replace("_detailc", "")
+            )
+
+            self._item_tables = {
+                # 'item_name': 'table_name'  # cached mapping of item names to table names
+            }
+
+            self._item_update_keys = {
+                # 'table_name': ['id', 'name'...]  # cached mapping of table_name to __update_key__
+            }
+
+            self._pipelines = self.load_pipelines()
+            if setting.ITEM_FILTER_ENABLE and not self.__class__.dedup:
+                self.__class__.dedup = Dedup(
+                    to_md5=False, **setting.ITEM_FILTER_SETTING
+                )
+
+            # number of export retries
+            self.export_retry_times = 0
+            # number of failed exports
+            self.export_failed_times = 0
+            # cached task queue
+            self.tasks_dict = {}
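+            # filled by the spider before crawling (see spider.py below), shaped as
+            # {"token": <task-api token>, "data": {pyuuid: {"tid": ..., "queue": ...}}}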
+            self.release_task_enable = False
+
+    def load_pipelines(self):
+        pipelines = []
+        for pipeline_path in setting.ITEM_PIPELINES:
+            pipeline = tools.import_cls(pipeline_path)()
+            if not isinstance(pipeline, BasePipeline):
+                raise ValueError(f"{pipeline_path} 需继承 feapder.pipelines.BasePipeline")
+            pipelines.append(pipeline)
+
+        return pipelines
+
+    def run(self):
+        self._thread_stop = False
+        while not self._thread_stop:
+            self.flush()
+            tools.delay_time(1)
+
+        self.close()
+
+    def stop(self):
+        self._thread_stop = True
+        self._started.clear()
+
+    def put_item(self, item):
+        if isinstance(item, Item):
+            # callback before writing to the database
+            item.pre_to_db()
+
+        self._items_queue.put(item)
+
+    def flush(self):
+        try:
+            items = []
+            update_items = []
+            failed_task_items = []
+            requests = []
+            callbacks = []
+            items_fingerprints = []
+            data_count = 0
+
+            while not self._items_queue.empty():
+                data = self._items_queue.get_nowait()
+                data_count += 1
+                update_at = tools.ensure_int64(tools.get_current_timestamp())
+
+                # classify the data
+                if callable(data):
+                    callbacks.append(data)
+
+                elif isinstance(data, UpdateItem):
+                    update_items.append(data)
+
+                elif isinstance(data, FailedTaskItem):
+                    data.queue_name = self._tab_items  # collection task queue name
+                    if data.failed_retries >= setting.SPIDER_MAX_RETRY_TIMES:
+                        state = 4  # pending-task state [4 = stop collecting]
+
+                        '''Update the state of the failed collection task'''
+                        update_item = UpdateItem(
+                            state=state,
+                            pyuuid=data.pyuuid,
+                            update_at=update_at,
+                            failed_retries=data.failed_retries,
+                        )
+                        update_key = ['state', 'update_at', 'failed_retries']
+
+                        '''Record the details of the failed collection task'''
+                        data.state = state
+                        data.create_at = update_at
+                        failed_task_items.append(data)
+                    else:
+                        '''Update the state of the failed collection task'''
+                        update_item = UpdateItem(
+                            state=3,  # pending-task state [3 = collection failed]
+                            pyuuid=data.pyuuid,
+                            failed_retries=data.failed_retries,
+                        )
+                        update_key = ['state', 'failed_retries']
+
+                    update_item.update_key = update_key
+                    update_item.table_name = setting.TASK_REQUEST_PRODUCE
+                    update_items.append(update_item)
+
+                elif isinstance(data, Item):
+                    if isinstance(data, BaseListItem):
+                        data.queue_name = self._tab_items
+                        data.update_at = update_at
+                        if hasattr(data, 'is_delay') and data.is_delay:
+                            data.state = 5  # pending-task state [5 = delayed collection]
+                        else:
+                            data.state = 1  # pending-task state [1 = waiting for collection]
+
+                    elif isinstance(data, BaseDetailItem):
+                        if not getattr(data, 'is_mixed', False):
+                            update_item = UpdateItem(
+                                state=2,  # pending-task state [2 = collection finished]
+                                pyuuid=data.pyuuid,
+                                update_at=update_at,
+                            )
+                            update_item.update_key = ['state', 'update_at']
+                            update_item.table_name = setting.TASK_REQUEST_PRODUCE
+                            update_items.append(update_item)
+
+                    if data.dont_save:
+                        # do not write this data to the crawler production table (data_bak)
+                        continue
+
+                    items.append(data)
+                    if setting.ITEM_FILTER_ENABLE:
+                        items_fingerprints.append(data.fingerprint)
+
+                else:  # request-redis
+                    requests.append(data)
+
+                if data_count >= UPLOAD_BATCH_MAX_SIZE:
+                    self.__add_item_to_db(
+                        items, update_items, failed_task_items, requests, callbacks, items_fingerprints
+                    )
+
+                    items = []
+                    update_items = []
+                    failed_task_items = []
+                    requests = []
+                    callbacks = []
+                    items_fingerprints = []
+                    data_count = 0
+
+            if data_count:
+                self.__add_item_to_db(
+                    items, update_items, failed_task_items, requests, callbacks, items_fingerprints
+                )
+
+        except Exception as e:
+            log.exception(e)
+
+    def get_items_count(self):
+        return self._items_queue.qsize()
+
+    def is_adding_to_db(self):
+        return self._is_adding_to_db
+
+    def __dedup_items(self, items, items_fingerprints):
+        """
+        Deduplicate
+        @param items:
+        @param items_fingerprints:
+        @return: the deduplicated items and items_fingerprints
+        """
+        if not items:
+            return items, items_fingerprints
+
+        is_exists = self.__class__.dedup.get(items_fingerprints)
+        is_exists = is_exists if isinstance(is_exists, list) else [is_exists]
+
+        dedup_items = []
+        dedup_items_fingerprints = []
+        items_count = dedup_items_count = dup_items_count = 0
+
+        while is_exists:
+            item = items.pop(0)
+            items_fingerprint = items_fingerprints.pop(0)
+            is_exist = is_exists.pop(0)
+
+            items_count += 1
+
+            if not is_exist:
+                dedup_items.append(item)
+                dedup_items_fingerprints.append(items_fingerprint)
+                dedup_items_count += 1
+            else:
+                dup_items_count += 1
+
+        log.info(
+            "待入库数据 {} 条, 重复 {} 条,实际待入库数据 {} 条".format(
+                items_count, dup_items_count, dedup_items_count
+            )
+        )
+
+        return dedup_items, dedup_items_fingerprints
+
+    def __pick_items(self, items, is_update_item=False):
+        """
+        Split the data by destination table; the original items list is emptied afterwards
+        @param items:
+        @param is_update_item:
+        @return:
+        """
+        datas_dict = {
+            # 'table_name': [{}, {}]
+        }
+
+        while items:
+            item = items.pop(0)
+            # get the item's snake_case name
+            # look the name up in the dict first; if missing, compute it and cache it to speed up later lookups
+            item_name = item.item_name
+            table_name = self._item_tables.get(item_name)
+            if not table_name:
+                table_name = item.table_name
+                self._item_tables[item_name] = table_name
+
+            if table_name not in datas_dict:
+                datas_dict[table_name] = []
+
+            datas_dict[table_name].append(item.to_dict)
+
+            if is_update_item and table_name not in self._item_update_keys:
+                self._item_update_keys[table_name] = item.update_key
+
+        return datas_dict
+
+    def release_tasks(self, datas, finished=True):
+        if not self.release_task_enable:
+            return
+
+        release_tasks = []
+        if not finished:
+            token = datas["token"]
+            # release every task still cached; iterate over a copy of the keys
+            for pyuuid in list(self.tasks_dict["data"]):
+                data = datas["data"].pop(pyuuid)
+                release_tasks.append(data)
+        else:
+            token = self.tasks_dict["token"]
+            target_pyuuid_lst = [data["pyuuid"] for data in datas if "pyuuid" in data]
+            for pyuuid in target_pyuuid_lst:
+                task = self.tasks_dict["data"].pop(pyuuid, None)
+                if task is not None:
+                    release_tasks.append(task)
+
+        r = None
+        url = f"{setting.JY_TASK_URL}/tasks/batch-release"
+        headers = {"Authorization": token}
+        params = dict(headers=headers, timeout=10, json=release_tasks, proxies=False)
+        try:
+            r = Request(method="DELETE", url=url, **params).get_response()
+            log.debug(f"任务回传成功,~{len(release_tasks)}")
+        except Exception as e:
+            log.error(f"任务回传失败, 原因:{e}")
+        return True if r and r.status_code == 200 else False
+
+    def __export_to_db(self, table, datas, is_update=False, update_keys=()):
+        for pipeline in self._pipelines:
+            if is_update:
+                if not pipeline.update_items(table, datas, update_keys=update_keys):
+                    log.error(
+                        f"{pipeline.__class__.__name__} 更新数据失败. table: {table}  items: {datas}"
+                    )
+                    return False
+            else:
+                if not pipeline.save_items(table, datas):
+                    log.error(
+                        f"{pipeline.__class__.__name__} 保存数据失败. table: {table}  items: {datas}"
+                    )
+                    return False
+
+        self.release_tasks(datas=datas)
+        return True
+
+    def __add_item_to_db(
+        self, items, update_items, failed_task_items, requests, callbacks, items_fingerprints
+    ):
+        export_success = True
+        self._is_adding_to_db = True
+
+        if setting.ITEM_FILTER_ENABLE:
+            items, items_fingerprints = self.__dedup_items(items, items_fingerprints)
+
+        # sort items by destination table
+        items_dict = self.__pick_items(items)
+        update_items_dict = self.__pick_items(update_items, is_update_item=True)
+        failed_task_items_dict = self.__pick_items(failed_task_items)
+
+        # batch-insert items
+        failed_items = {"add": [], "update": [], "requests": []}
+        while items_dict:
+            table, datas = items_dict.popitem()
+
+            log.debug(
+                """
+                -------------- item 批量入库 --------------
+                表名: %s
+                datas: %s
+                    """
+                % (table, tools.dumps_json(datas, indent=16))
+            )
+
+            if not self.__export_to_db(table, datas):
+                export_success = False
+                failed_items["add"].append({"table": table, "datas": datas})
+
+        # perform the batch updates
+        while update_items_dict:
+            table, datas = update_items_dict.popitem()
+
+            log.debug(
+                """
+                -------------- item 批量更新 --------------
+                表名: %s
+                datas: %s
+                    """
+                % (table, tools.dumps_json(datas, indent=16))
+            )
+
+            update_keys = self._item_update_keys.get(table)
+            if not self.__export_to_db(
+                table, datas, is_update=True, update_keys=update_keys
+            ):
+                export_success = False
+                failed_items["update"].append({"table": table, "datas": datas})
+
+        # batch-insert failed-collection items
+        while failed_task_items_dict:
+            table, datas = failed_task_items_dict.popitem()
+
+            log.debug(
+                """
+                -------------- crawl failed item 批量入库 --------------
+                表名: %s
+                datas: %s
+                    """
+                % (table, tools.dumps_json(datas, indent=16))
+            )
+
+            if not self.__export_to_db(table, datas):
+                export_success = False
+                failed_items["add"].append({"table": table, "datas": datas})
+
+        if export_success:
+            # run callbacks
+            while callbacks:
+                try:
+                    callback = callbacks.pop(0)
+                    callback()
+                except Exception as e:
+                    log.exception(e)
+
+            # remove processed requests
+            if requests:
+                pass
+
+            # record fingerprints in the dedup store
+            if setting.ITEM_FILTER_ENABLE:
+                if items_fingerprints:
+                    self.__class__.dedup.add(items_fingerprints, skip_check=True)
+        else:
+            failed_items["requests"] = requests
+
+            tip = ["入库不成功"]
+            if callbacks:
+                tip.append("不执行回调")
+            if requests:
+                tip.append("删除任务")
+
+            if setting.ITEM_FILTER_ENABLE:
+                tip.append("数据不入去重库")
+
+            tip.append("失败items:\n {}".format(tools.dumps_json(failed_items)))
+            log.error(",".join(tip))
+
+            self.export_failed_times += 1
+
+            if self.export_failed_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
+                # send an alert
+                msg = "《{}》爬虫导出数据失败,失败次数:{},请检查爬虫是否正常".format(
+                    self._redis_key, self.export_failed_times
+                )
+                log.error(msg)
+                tools.send_msg(
+                    msg=msg,
+                    level="error",
+                    message_prefix="《%s》爬虫导出数据失败" % (self._redis_key),
+                )
+
+        self._is_adding_to_db = False
+
+    def close(self):
+        # call each pipeline's close method
+        for pipeline in self._pipelines:
+            try:
+                pipeline.close()
+            except:
+                pass

+ 13 - 0
FworkSpider/feapder/core/parser_control/__init__.py

@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2024-08-29 
+---------
+@summary:  
+---------
+@author: Dzr
+"""
+from feapder.core.parser_control.parser_control import (
+    PaserControl,
+    AirSpiderParserControl
+)
+from feapder.core.parser_control.jy_parser_control import JySpiderParserControl

+ 310 - 0
FworkSpider/feapder/core/parser_control/jy_parser_control.py

@@ -0,0 +1,310 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2017-01-03 16:06
+---------
+@summary: parser control class
+---------
+@author: Boris
+@email: boris_liu@foxmail.com
+"""
+import random
+import time
+from collections.abc import Iterable
+
+import feapder.setting as setting
+import feapder.utils.tools as tools
+from feapder.buffer.item_buffer import JyItemBuffer
+from feapder.core.parser_control import PaserControl
+from feapder.db.memory_db import MemoryDB
+from feapder.network.item import Item
+from feapder.network.request import Request
+from feapder.utils.log import log
+
+
+class JySpiderParserControl(PaserControl):
+    is_show_tip = False
+
+    _success_task_count = 0
+    _failed_task_count = 0
+
+    def __init__(self, memory_db: MemoryDB, item_buffer: JyItemBuffer, heartbeat_buffer):
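+        # Initialise threading.Thread directly; PaserControl.__init__ (which expects the
+        # collector/request_buffer dependencies of the distributed setup) is bypassed here.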
+        super(PaserControl, self).__init__()
+        self._parsers = []
+        self._thread_stop = False
+
+        self._memory_db = memory_db
+        self._item_buffer = item_buffer
+        self._heartbeat_buffer = heartbeat_buffer
+
+    def run(self):
+        while not self._thread_stop:
+            try:
+                request = self._memory_db.get()
+                if not request:
+                    if not self.is_show_tip:
+                        log.debug("等待任务...")
+                        self.is_show_tip = True
+                    continue
+
+                self.is_show_tip = False
+                self.deal_request(request)
+
+            except (Exception, BaseException) as e:
+                log.exception(e)
+
+    def deal_request(self, request):
+        response = None
+        now_page = request.page or -1
+
+        for parser in self._parsers:
+            counter = {
+                "now_page": now_page,
+                "extract_count": 0,  # number of entries extracted from the list page
+                "rel_count": 0,  # number actually stored after deduplication
+            }
+            if parser.name == request.parser_name:
+                try:
+                    # handle the request
+                    if request.auto_request:
+                        request_temp = None
+                        response = None
+
+                        # download middleware
+                        if request.download_midware:
+                            if isinstance(request.download_midware, (list, tuple)):
+                                request_temp = request
+                                for download_midware in request.download_midware:
+                                    download_midware = (
+                                        download_midware
+                                        if callable(download_midware)
+                                        else tools.get_method(
+                                            parser, download_midware
+                                        )
+                                    )
+                                    request_temp = download_midware(request_temp)
+                            else:
+                                download_midware = (
+                                    request.download_midware
+                                    if callable(request.download_midware)
+                                    else tools.get_method(
+                                        parser, request.download_midware
+                                    )
+                                )
+                                request_temp = download_midware(request)
+
+                        elif request.download_midware != False:
+                            request_temp = parser.download_midware(request)
+
+                        # send the request
+                        if request_temp:
+                            if (
+                                isinstance(request_temp, (tuple, list))
+                                and len(request_temp) == 2
+                            ):
+                                request_temp, response = request_temp
+
+                            if not isinstance(request_temp, Request):
+                                raise Exception(
+                                    "download_midware need return a request,"
+                                    "but received type: {}".format(
+                                        type(request_temp)
+                                    )
+                                )
+                            request = request_temp
+
+                        if not response:
+                            response = (
+                                request.get_response()
+                                if not setting.RESPONSE_CACHED_USED
+                                else request.get_response_from_cached(save_cached=False)
+                            )
+
+                        # validate the response
+                        if parser.validate(request, response) == False:
+                            break
+
+                    else:
+                        response = None
+
+                    if request.callback:  # if the request carries a callback, use it
+                        callback_parser = (
+                            request.callback
+                            if callable(request.callback)
+                            else tools.get_method(parser, request.callback)
+                        )
+                        results = callback_parser(request, response)
+                    else:  # otherwise fall back to the parser's default parse
+                        results = parser.parse(request, response)
+
+                    if results and not isinstance(results, Iterable):
+                        raise Exception(
+                            "%s.%s返回值必须可迭代"
+                            % (parser.name, request.callback or "parse")
+                        )
+
+                    # decide whether each result is a request or an item
+                    for result in results or []:
+                        if isinstance(result, Request):
+                            # assign parser_name to the request
+                            result.parser_name = result.parser_name or parser.name
+
+                            # synchronous or asynchronous callback?
+                            if result.request_sync:  # synchronous
+                                self.deal_request(result)
+                            else:  # asynchronous
+                                # enqueue the next request
+                                self._memory_db.add(result)
+
+                        elif isinstance(result, Item):
+                            # collection mode [True = mixed (list + detail page); False = separate list/detail collection]
+                            result.is_mixed = False
+                            if "List" in parser.__business_type__ and hasattr(result, "contenthtml"):
+                                result.is_mixed = True
+
+                            counter["extract_count"] += 1  # count extracted list entries
+                            if not self.is_duplicate(result):
+                                counter["rel_count"] += 1  # count entries kept after deduplication
+
+                            self._item_buffer.put_item(result)
+
+                        elif result is not None:
+                            function_name = "{}.{}".format(
+                                parser.name,
+                                (
+                                    request.callback
+                                    and callable(request.callback)
+                                    and getattr(request.callback, "__name__")
+                                    or request.callback
+                                )
+                                or "parse",
+                            )
+                            raise TypeError(
+                                f"{function_name} result expected Request or Item, but got type: {type(result)}"
+                            )
+
+                except (Exception, BaseException) as e:
+                    exception_type = (
+                        str(type(e)).replace("<class '", "").replace("'>", "")
+                    )
+
+                    if setting.LOG_LEVEL == "DEBUG":  # only log the full traceback in debug mode; timeout tracebacks are too verbose
+                        log.exception(e)
+
+                    log.error(
+                        """
+                            -------------- %s.%s error -------------
+                            error          %s
+                            response       %s
+                            deal request   %s
+                            """
+                        % (
+                            parser.name,
+                            (
+                                request.callback
+                                and callable(request.callback)
+                                and getattr(request.callback, "__name__")
+                                or request.callback
+                            )
+                            or "parse",
+                            str(e),
+                            response,
+                            tools.dumps_json(request.to_dict, indent=28)
+                            if setting.LOG_LEVEL == "DEBUG"
+                            else request,
+                        )
+                    )
+
+                    request.error_msg = "%s: %s" % (exception_type, e)
+                    request.response = str(response)
+
+                    if "Invalid URL" in str(e):
+                        request.is_abandoned = True
+
+                    requests = parser.exception_request(request, response) or [
+                        request
+                    ]
+                    if not isinstance(requests, Iterable):
+                        raise Exception(
+                            "%s.%s返回值必须可迭代" % (parser.name, "exception_request")
+                        )
+                    for request in requests:
+                        if not isinstance(request, Request):
+                            raise Exception("exception_request 需 yield request")
+
+                        if (
+                            request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES
+                            or request.is_abandoned
+                        ):
+                            self.__class__._failed_task_count += 1  # count failed tasks
+
+                            # handle failed_request's return value (request or func)
+                            results = parser.failed_request(request, response) or [
+                                request
+                            ]
+                            if not isinstance(results, Iterable):
+                                raise Exception(
+                                    "%s.%s返回值必须可迭代"
+                                    % (parser.name, "failed_request")
+                                )
+
+                            log.info(
+                                """
+                                任务超过最大重试次数,丢弃
+                                url     %s
+                                重试次数 %s
+                                最大允许重试次数 %s"""
+                                % (
+                                    request.url,
+                                    request.retry_times,
+                                    setting.SPIDER_MAX_RETRY_TIMES,
+                                )
+                            )
+
+                        else:
+                            # re-enqueue the request for another attempt
+                            request.retry_times += 1
+                            request.filter_repeat = False
+                            log.info(
+                                """
+                                    入库 等待重试
+                                    url     %s
+                                    重试次数 %s
+                                    最大允许重试次数 %s"""
+                                % (
+                                    request.url,
+                                    request.retry_times,
+                                    setting.SPIDER_MAX_RETRY_TIMES,
+                                )
+                            )
+                            self._memory_db.add(request)
+
+                else:
+                    # count successful tasks
+                    self.__class__._success_task_count += 1
+
+                    # cache the successfully downloaded response
+                    if setting.RESPONSE_CACHED_ENABLE:
+                        request.save_cached(
+                            response=response,
+                            expire_time=setting.RESPONSE_CACHED_EXPIRE_TIME,
+                        )
+
+                finally:
+                    # release the browser back to the pool
+                    if response and getattr(response, "browser", None):
+                        request._webdriver_pool.put(response.browser)
+
+                    self.publish_heartbeat(parser, request, response, **counter)
+                break
+
+        if setting.SPIDER_SLEEP_TIME:
+            if (
+                isinstance(setting.SPIDER_SLEEP_TIME, (tuple, list))
+                and len(setting.SPIDER_SLEEP_TIME) == 2
+            ):
+                sleep_time = random.randint(
+                    int(setting.SPIDER_SLEEP_TIME[0]), int(setting.SPIDER_SLEEP_TIME[1])
+                )
+                time.sleep(sleep_time)
+            else:
+                time.sleep(setting.SPIDER_SLEEP_TIME)

+ 0 - 0
FworkSpider/feapder/core/parser_control.py → FworkSpider/feapder/core/parser_control/parser_control.py


+ 150 - 284
FworkSpider/feapder/core/spiders/spider.py

@@ -1,252 +1,182 @@
 # -*- coding: utf-8 -*-
 """
-Created on 2020/4/22 12:05 AM
+Created on 2024-08-19 
 ---------
-@summary:
+@summary:  
 ---------
-@author: Boris
-@email: boris_liu@foxmail.com
+@author: Dzr
 """
 
-import warnings
-from collections import Iterable
-
-import amqpstorm
+from threading import Thread
 
 import feapder.setting as setting
 import feapder.utils.tools as tools
+from feapder.buffer.heartbeat_buffer import HeartBeatBuffer
+from feapder.buffer.item_buffer import JyItemBuffer
 from feapder.core.base_parser import BaseParser
-from feapder.core.scheduler import Scheduler
-from feapder.network.item import Item, FailedTaskItem
+from feapder.core.parser_control import JySpiderParserControl
+from feapder.db.memory_db import MemoryDB
+from feapder.network.item import FailedTaskItem
 from feapder.network.request import Request
 from feapder.utils.log import log
 
-CONSOLE_PIPELINE_PATH = "feapder.pipelines.console_pipeline.ConsolePipeline"
-
-
-class Spider(
-    BaseParser, Scheduler
-):  # threading 中有name函数, 必须先继承BaseParser 否则其内部的name会被Schedule的基类覆盖threading.Thread的name
-    """
-    @summary: 为了简化搭建爬虫
-    ---------
-    """
-
-    def __init__(
-        self,
-        redis_key=None,
-        user=None,
-        check_task_interval=5,
-        thread_count=None,
-        begin_callback=None,
-        end_callback=None,
-        keep_alive=None,
-        auto_start_requests=None,
-        **kwargs
-    ):
-        """
-        @summary: 爬虫
-        ---------
-        @param redis_key: 任务等数据存放在redis中的key前缀
-        @param user: 指定mq特定的程序消费用户标识,在多个生产者对应单一消费者时生效
-        @param check_task_interval: 检查是否还有任务的时间间隔;默认5秒
-        @param thread_count: 线程数,默认为配置文件中的线程数
-        @param begin_callback: 爬虫开始回调函数
-        @param end_callback: 爬虫结束回调函数
-        @param keep_alive: 爬虫是否常驻
-        @param auto_start_requests: 爬虫是否自动添加任务
-        ---------
-        @result:
-        """
-        super(Spider, self).__init__(
-            redis_key=redis_key,
-            user=user,
-            thread_count=thread_count,
-            begin_callback=begin_callback,
-            end_callback=end_callback,
-            keep_alive=keep_alive,
-            auto_start_requests=auto_start_requests,
-            **kwargs
-        )
-
-        self._check_task_interval = check_task_interval
-        self._is_distributed_task = False
-        self._is_show_not_task = False
-
-    def run(self):  # 调度控制流程起始
-        if not self._parsers:
-            self._parsers.append(self)
-
-        self._start()
-
-        while True:
-            try:
-                if self.all_thread_is_done():
-                    if not self._is_notify_end:
-                        self.spider_end()  # 跑完一轮
-                        self._is_notify_end = True
-
-                    if not self._keep_alive:
-                        self._stop_all_thread()
-                        break
-                else:
-                    self._is_notify_end = False
-
-                self.check_task_status()
-            except (Exception, BaseException) as e:
-                log.exception(e)
 
-            tools.delay_time(1)  # 1秒钟检查一次爬虫状态
+class Spider(BaseParser, Thread):
+    __custom_setting__ = {}
 
-    @classmethod
-    def to_DebugSpider(cls, *args, **kwargs):
-        # DebugSpider 继承 cls
-        DebugSpider.__bases__ = (cls,)
-        DebugSpider.__name__ = cls.__name__
-        return DebugSpider(*args, **kwargs)
-
-
-class DebugSpider(Spider):
-    """
-    Debug爬虫
-    """
-
-    __debug_custom_setting__ = dict(
-        COLLECTOR_SLEEP_TIME=1,
-        COLLECTOR_TASK_COUNT=1,
-        SPIDER_THREAD_COUNT=1,  # SPIDER
-        SPIDER_SLEEP_TIME=0,
-        SPIDER_TASK_COUNT=1,
-        SPIDER_MAX_RETRY_TIMES=10,
-        REQUEST_LOST_TIMEOUT=600,  # 10分钟
-        PROXY_ENABLE=False,
-        RETRY_FAILED_REQUESTS=False,
-        SAVE_FAILED_REQUEST=False,  # 保存失败的request
-        ITEM_FILTER_ENABLE=False,  # 过滤
-        REQUEST_FILTER_ENABLE=False,
-        OSS_UPLOAD_TABLES=(),
-        DELETE_KEYS=True,
-        ITEM_PIPELINES=[CONSOLE_PIPELINE_PATH],
-    )
+    __business_type__ = ""
 
-    def __init__(self, request=None, request_dict=None, *args, **kwargs):
+    def __init__(self, redis_key, thread_count=None, **kwargs):
         """
-        @param request: request 类对象
-        @param request_dict: request 字典。 request 与 request_dict 二者选一即可
-        @param kwargs:
+
+        ---------
+        @param redis_key:
+        @param thread_count: number of threads; defaults to the value in the settings file
+        ---------
         """
-        warnings.warn(
-            "您正处于debug模式下,该模式下不会更新任务状态及数据入库,仅用于调试。正式发布前请更改为正常模式", category=Warning
-        )
+        super(Spider, self).__init__()
 
-        if not request and not request_dict:
-            raise Exception("request 与 request_dict 不能同时为null")
+        for key, value in self.__class__.__custom_setting__.items():
+            setattr(setting, key, value)
 
-        kwargs["redis_key"] = kwargs["redis_key"] + "_debug"
-        self.__class__.__custom_setting__.update(
-            self.__class__.__debug_custom_setting__
+        self._thread_count = (
+            setting.SPIDER_THREAD_COUNT if not thread_count else thread_count
         )
 
-        super(DebugSpider, self).__init__(*args, **kwargs)
-
-        self._request = request or Request.from_dict(request_dict)
-
-    def __start_requests(self):
-        yield self._request
-
-    def _start(self):
-        # 启动parser 的 start_requests
-        self.spider_begin()  # 不自动结束的爬虫此处只能执行一遍
+        self._heartbeat_buffer = HeartBeatBuffer()
+        self._item_buffer = JyItemBuffer(redis_key=redis_key)
+
+        self._memory_db = MemoryDB()
+        self._parser_controls = []  # parser control instances
+
+        self.tasks_dict = {}
+        self.task_api_auth_token = None
+
+    def distribute_task(self):
+        for request in self.start_requests():
+            if not isinstance(request, Request):
+                raise ValueError("仅支持 yield Request")
+
+            request.parser_name = request.parser_name or self.name
+            self._memory_db.add(request)
+
+    def all_thread_is_done(self):
+        for i in range(3):
+            # check heartbeat_buffer state
+            if (
+                self._heartbeat_buffer.get_items_count() > 0
+                or self._heartbeat_buffer.is_adding_to_db()
+            ):
+                return False
+
+            # check parser_control state
+            for parser_control in self._parser_controls:
+                if not parser_control.is_not_task():
+                    return False
+
+            # check the task queue state
+            if not self._memory_db.empty():
+                return False
+
+            # check item_buffer state
+            if (
+                self._item_buffer.get_items_count() > 0
+                or self._item_buffer.is_adding_to_db()
+            ):
+                return False
+
+            tools.delay_time(1)
+
+        return True
+
+    def get_task_api_token(self):
+        # obtain the token
+        if self.task_api_auth_token is None:
+            token_url = f"{setting.JY_TASK_URL}/tasks/token"
+            data = {"username": "spider@py", "password": "123@qweA!"}
+            auth_params = dict(url=token_url, timeout=10, data=data, proxies=False)
+            response = Request(method="GET", **auth_params).get_response()
+            token = response.json["token"]
+            self.task_api_auth_token = token
+        log.debug(f"Task API token: {self.task_api_auth_token}")
 
-        for parser in self._parsers:
-            results = parser.__start_requests()
-            # 添加request到请求队列,由请求队列统一入库
-            if results and not isinstance(results, Iterable):
-                raise Exception("%s.%s返回值必须可迭代" % (parser.name, "start_requests"))
-
-            result_type = 1
-            for result in results or []:
-                if isinstance(result, Request):
-                    result.parser_name = result.parser_name or parser.name
-                    self._request_buffer.put_request(result)
-                    result_type = 1
-
-                elif isinstance(result, Item):
-                    self._item_buffer.put_item(result)
-                    result_type = 2
-
-                elif callable(result):  # callbale的request可能是更新数据库操作的函数
-                    if result_type == 1:
-                        self._request_buffer.put_request(result)
-                    else:
-                        self._item_buffer.put_item(result)
-
-            self._request_buffer.flush()
-            self._item_buffer.flush()
+    def run(self):  # entry point of the scheduling control flow
+        self.start_callback()
 
-        # 启动collector
-        self._collector.start()
+        self._heartbeat_buffer.start()  # start the heartbeat_buffer
 
-        # 启动parser control
         for i in range(self._thread_count):
-            parser_control = self._parser_control_obj(
-                self._collector,
-                self._redis_key,
-                self._request_buffer,
-                self._item_buffer,
+            parser_control = JySpiderParserControl(
+                memory_db=self._memory_db,
+                item_buffer=self._item_buffer,
+                heartbeat_buffer=self._heartbeat_buffer
             )
-
-            for parser in self._parsers:
-                parser_control.add_parser(parser)
-
+            parser_control.add_parser(self)
             parser_control.start()
             self._parser_controls.append(parser_control)
 
-        # 启动request_buffer
-        self._request_buffer.start()
-
-        # 启动item_buffer
         self._item_buffer.start()
-
-    def run(self):
-        if not self._parsers:  # 不是add_parser 模式
-            self._parsers.append(self)
-
-        self._start()
-
+        if self.__class__.__business_type__.endswith("Detail"):
+            self._item_buffer.release_task_enable = True  # let the buffer release collection tasks
+            self.get_task_api_token()  # request a token
+
+        # dispatch tasks
+        self.distribute_task()
+        # add the dispatched tasks to the item_buffer cache
+        self._item_buffer.tasks_dict.update(self.tasks_dict)
         while True:
             try:
                 if self.all_thread_is_done():
-                    self._stop_all_thread()
+                    # stop the parser_controls
+                    for parser_control in self._parser_controls:
+                        parser_control.stop()
+
+                    self._item_buffer.stop()  # shut down the item_buffer
+                    self._heartbeat_buffer.stop()  # shut down the heartbeat_buffer
+
+                    # close the webdriver
+                    if Request.webdriver_pool:
+                        Request.webdriver_pool.close()
+
+                    log.info("无任务,爬虫结束")
                     break
+
             except Exception as e:
                 log.exception(e)
 
             tools.delay_time(1)  # 1秒钟检查一次爬虫状态
 
+        self._item_buffer.release_tasks(self.tasks_dict, finished=False)  # release the remaining unfinished tasks
+        self.end_callback()
+        self._started.clear()  # allow the thread to be started again
+
+    def join(self, timeout=None):
+        """
+        Override Thread.join
+        """
+        if not self._started.is_set():
+            return
+
+        super().join()
+
 
 class BaseBusinessListSpider(Spider):
     """列表页采集基础爬虫"""
 
     __business_type__ = "List"
 
-    def __auto_increment_page_number(self, request):
-        """翻页 - 页码自增"""
-        if request.page is None:
-            raise ValueError('请设置 request.page 起始页码数')
+    def infinite_pages(self, request, response):
+        """Infinite paging"""
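+        # Typical usage (an assumption, not shown in this diff): a list parser ends parse()
+        # with `yield self.infinite_pages(request, response)`; a None return is ignored by
+        # the parser control, so paging stops once request.page reaches crawl_page.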
+
+        def _page_increment():
+            if request.page is None:
+                raise ValueError("请设置 request.page 起始页码数")
 
-        if request.page < int(request.item["crawl_page"]):
-            request.page += 1  # 采集页码自增
-            yield request
+            if request.page < int(request.item["crawl_page"]):
+                request.page += 1  # increment the page number
+                yield request
 
-    def infinite_pages(self, request, response):
-        """翻页"""
-        generator = self.__auto_increment_page_number(request)
-        try:
-            request = next(generator)
-            return request
-        except StopIteration:
-            pass
+        return next(_page_increment(), None)
 
 
 class BaseBusinessDetailSpider(Spider):
@@ -257,32 +187,18 @@ class BaseBusinessDetailSpider(Spider):
         ITEM_FILTER_ENABLE=False
     )
 
-    def __init__(
-        self,
-        redis_key=None,
-        thread_count=None,
-        begin_callback=None,
-        end_callback=None,
-        delete_keys=(),
-        keep_alive=None,
-        auto_start_requests=None,
-        **kwargs
-    ):
+    def __init__(self, redis_key, thread_count=None, **kwargs):
         self.__class__.__custom_setting__.update(
             self.__class__.__business_setting__
         )
-        redis_key = f'{redis_key}_detailc'
         super(BaseBusinessDetailSpider, self).__init__(
             redis_key=redis_key,
             thread_count=thread_count,
-            begin_callback=begin_callback,
-            end_callback=end_callback,
-            delete_keys=delete_keys,
-            keep_alive=keep_alive,
-            auto_start_requests=auto_start_requests,
             **kwargs
         )
 
+        self._redis_key = redis_key
+
     def failed_request(self, request, response):
         """请求、解析错误次数超过上限后,记录错误详情信息"""
         failed_item = FailedTaskItem(
@@ -293,73 +209,23 @@ class BaseBusinessDetailSpider(Spider):
         failed_item.table_name = setting.TASK_REQUEST_FAILED
         yield failed_item
 
-    def get_tasks_by_rabbitmq(self, limit=None, auto_ack=True):
-        """
-
-        @param limit: 获取消息数量
-        @param auto_ack: 自动回复消息确认
-        """
-        queue_name = setting.TAB_ITEMS.format(
-            redis_key=self._redis_key.replace("_detailc", "")
-        )
-        limit = limit or setting.COLLECTOR_TASK_COUNT
-        correlation_id = tools.get_uuid().replace("-", "")
-        if self._rabbitmq.get_message_count(queue_name) == 0:
-            # 步骤1 推送要求发布任务消息
-            produce_queue = "pyspider.report.produce"
-            produce_task = {
-                "ip": tools.get_localhost_ip(),
-                "queue_name": queue_name,
-                "coll_name": setting.TASK_REQUEST_PRODUCE,
-                "limit": limit,
-            }
-            properties = dict(correlation_id=correlation_id)
-            self._rabbitmq.add(produce_task, produce_queue, properties=properties)
-
-            # 步骤2 等待任务生产完成的处理回应消息
-            receipt_queue = f"receipt_{correlation_id}"
-            with self._rabbitmq.get_mq_obj().channel() as channel:
-                try:
-                    channel.basic.consume(queue=receipt_queue, no_ack=True)
-                    tools.delay_time(0.8)  # 监听与收复消息的时间间隔
-                    inbound = channel.build_inbound_messages(break_on_empty=True)
-                    message_dict = {msg.correlation_id: msg for msg in inbound}
-                    # log.debug(f"采集任务推送 {message_dict}")
-                    message = message_dict.get(correlation_id)
-                    if message:
-                        body = tools.loads_obj(message.body)
-                        log.debug(f"推送任务到采集队列《{body['queue_name']}》完成")
-                except amqpstorm.exception.AMQPChannelError:
-                    pass
-
-        # 步骤3 开始拉取任务
-        task_lst = []
-        messages = self._rabbitmq.get(queue_name, limit, auto_ack, to_str=False)
-        for message in messages:
-            body = message.body
-            if isinstance(body, Item):
-                task_lst.append(body.to_dict)
-            else:
-                task_lst.append(body)
-        return task_lst
-
-    def get_tasks_by_mongodb(self, table=None, query=None, limit=None):
-        pipeline_path = "feapder.pipelines.mongo_pipeline.TaskPipeline"
-        pipeline = tools.import_cls(pipeline_path)()
-        table = table or setting.TASK_REQUEST_PRODUCE
-        queue_name = setting.TAB_ITEMS.format(
-            redis_key=self._redis_key.replace('_detailc', '')
-        )
-        conditions = query or {
-            'state': {'$in': [1, 3, 5]},
-            'queue_name': queue_name,
-            'update_at': {'$lt': tools.get_current_timestamp()}
+    def get_tasks(self, limit=None, **kwargs):
+        queue = setting.TAB_ITEMS.format(redis_key=self._redis_key.replace("_detailc", ""))
+        limit = limit or setting.COLLECTOR_TASK_COUNT
+
+        # fetch tasks from the task center
+        url = f"{setting.JY_TASK_URL}/tasks/fd?qn={queue}&limit={limit}"
+        headers = {"Authorization": self.task_api_auth_token}
+        params = dict(headers=headers, timeout=10, proxies=False)
+        response = Request(method="GET", url=url, **params).get_response()
+        ret = response.json["task"]
+        self.tasks_dict = {
+            "token": self.task_api_auth_token,
+            "data": {t["pyuuid"]: {"tid": t["tid"], "queue": queue} for t in ret}
         }
-        limit = limit or setting.COLLECTOR_TASK_COUNT
-        results = pipeline.find_items(table, conditions, limit)
-        ignore = {'_id', 'state', 'update_at', 'queue_name'}
-        task_lst = [{k: v for k, v in items.items() if k not in ignore} for items in results]
-        return task_lst
+        return ret
+
+    get_tasks_by_rabbitmq = get_tasks
+    get_tasks_by_mongodb = get_tasks
 
 
 class BiddingListSpider(BaseBusinessListSpider):

+ 1 - 1
FworkSpider/setting.py

@@ -81,7 +81,7 @@ PROXY_EXTRACT_API = "http://proxy.spdata.jianyu360.com/proxy/getallip"
 PROXY_ENABLE = True
 JY_PROXY_URL = "http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch"
 JY_PROXY_AUTHOR = "Basic amlhbnl1MDAxOjEyM3F3ZSFB"
-JY_TASK_URL = "http://172.17.162.28:1406"
+JY_TASK_URL = "http://pytask.spdata.jianyu360.com"
 
 # item去重
 ITEM_FILTER_ENABLE = True
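
For reference, the full HTTP round trip against JY_TASK_URL that the new code performs, reconstructed from the diff above; the queue name is a placeholder and the credentials are elided:

    import feapder.setting as setting
    from feapder.network.request import Request

    # 1. obtain a token (as in Spider.get_task_api_token)
    resp = Request(method="GET", url=f"{setting.JY_TASK_URL}/tasks/token",
                   data={"username": "...", "password": "..."},
                   timeout=10, proxies=False).get_response()
    token = resp.json["token"]

    # 2. pull tasks for a queue (as in BaseBusinessDetailSpider.get_tasks)
    queue = "demo_queue"  # placeholder queue name
    resp = Request(method="GET",
                   url=f"{setting.JY_TASK_URL}/tasks/fd?qn={queue}&limit=10",
                   headers={"Authorization": token},
                   timeout=10, proxies=False).get_response()
    tasks = resp.json["task"]

    # 3. after the items are exported, release the finished tasks in one batch
    #    (as in JyItemBuffer.release_tasks)
    release = [{"tid": t["tid"], "queue": queue} for t in tasks]
    Request(method="DELETE", url=f"{setting.JY_TASK_URL}/tasks/batch-release",
            headers={"Authorization": token}, json=release,
            timeout=10, proxies=False).get_response()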