
Distributed spider code backup

dongzhaorui 11 months ago
parent
commit
fd2c79bd35
1 changed file with 386 additions and 0 deletions

+ 386 - 0
FworkSpider/feapder/core/spiders/spider.py.bak

@@ -0,0 +1,386 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2020/4/22 12:05 AM
+---------
+@summary:
+---------
+@author: Boris
+@email: boris_liu@foxmail.com
+"""
+
+import warnings
+from collections.abc import Iterable
+
+import amqpstorm
+
+import feapder.setting as setting
+import feapder.utils.tools as tools
+from feapder.core.base_parser import BaseParser
+from feapder.core.scheduler import Scheduler
+from feapder.network.item import Item, FailedTaskItem
+from feapder.network.request import Request
+from feapder.utils.log import log
+
+CONSOLE_PIPELINE_PATH = "feapder.pipelines.console_pipeline.ConsolePipeline"
+
+
+class Spider(
+    BaseParser, Scheduler
+):  # threading.Thread has a `name` attribute; BaseParser must be inherited first, otherwise its internal `name` would be overridden by threading.Thread's `name` via Scheduler's base class
+    """
+    @summary: Simplifies building a spider
+    ---------
+    """
+
+    def __init__(
+        self,
+        redis_key=None,
+        user=None,
+        check_task_interval=5,
+        thread_count=None,
+        begin_callback=None,
+        end_callback=None,
+        keep_alive=None,
+        auto_start_requests=None,
+        **kwargs
+    ):
+        """
+        @summary: Spider
+        ---------
+        @param redis_key: prefix of the redis keys under which task data is stored
+        @param user: identifies a specific MQ consumer; takes effect when multiple producers feed a single consumer
+        @param check_task_interval: interval for checking whether tasks remain; defaults to 5 seconds
+        @param thread_count: number of threads; defaults to the value in the settings file
+        @param begin_callback: callback invoked when the spider starts
+        @param end_callback: callback invoked when the spider ends
+        @param keep_alive: whether the spider stays resident
+        @param auto_start_requests: whether the spider adds start tasks automatically
+        ---------
+        @result:
+        """
+        super(Spider, self).__init__(
+            redis_key=redis_key,
+            user=user,
+            thread_count=thread_count,
+            begin_callback=begin_callback,
+            end_callback=end_callback,
+            keep_alive=keep_alive,
+            auto_start_requests=auto_start_requests,
+            **kwargs
+        )
+
+        self._check_task_interval = check_task_interval
+        self._is_distributed_task = False
+        self._is_show_not_task = False
+
+    def run(self):  # entry point of the scheduling control loop
+        if not self._parsers:
+            self._parsers.append(self)
+
+        self._start()
+
+        while True:
+            try:
+                if self.all_thread_is_done():
+                    if not self._is_notify_end:
+                        self.spider_end()  # one round finished
+                        self._is_notify_end = True
+
+                    if not self._keep_alive:
+                        self._stop_all_thread()
+                        break
+                else:
+                    self._is_notify_end = False
+
+                self.check_task_status()
+            except BaseException as e:
+                log.exception(e)
+
+            tools.delay_time(1)  # check spider status once per second
+
+    @classmethod
+    def to_DebugSpider(cls, *args, **kwargs):
+        # make DebugSpider inherit from cls
+        DebugSpider.__bases__ = (cls,)
+        DebugSpider.__name__ = cls.__name__
+        return DebugSpider(*args, **kwargs)
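+
+# A minimal usage sketch (hypothetical subclass, not part of this module),
+# illustrating how Spider is typically subclassed and launched:
+#
+#     class MySpider(Spider):
+#         def start_requests(self):
+#             yield Request("https://example.com")
+#
+#         def parse(self, request, response):
+#             log.info(response.text[:100])
+#
+#     MySpider(redis_key="test:my_spider").start()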
+
+
+class DebugSpider(Spider):
+    """
+    Debug spider
+    """
+
+    __debug_custom_setting__ = dict(
+        COLLECTOR_SLEEP_TIME=1,
+        COLLECTOR_TASK_COUNT=1,
+        SPIDER_THREAD_COUNT=1,  # SPIDER
+        SPIDER_SLEEP_TIME=0,
+        SPIDER_TASK_COUNT=1,
+        SPIDER_MAX_RETRY_TIMES=10,
+        REQUEST_LOST_TIMEOUT=600,  # 10 minutes
+        PROXY_ENABLE=False,
+        RETRY_FAILED_REQUESTS=False,
+        SAVE_FAILED_REQUEST=False,  # save failed requests
+        ITEM_FILTER_ENABLE=False,  # item dedup filter
+        REQUEST_FILTER_ENABLE=False,
+        OSS_UPLOAD_TABLES=(),
+        DELETE_KEYS=True,
+        ITEM_PIPELINES=[CONSOLE_PIPELINE_PATH],
+    )
+
+    def __init__(self, request=None, request_dict=None, *args, **kwargs):
+        """
+        @param request: a Request object
+        @param request_dict: a request dict; provide either request or request_dict
+        @param kwargs:
+        """
+        warnings.warn(
+            "您正处于debug模式下,该模式下不会更新任务状态及数据入库,仅用于调试。正式发布前请更改为正常模式", category=Warning
+        )
+
+        if not request and not request_dict:
+            raise Exception("request 与 request_dict 不能同时为null")
+
+        kwargs["redis_key"] = kwargs["redis_key"] + "_debug"
+        self.__class__.__custom_setting__.update(
+            self.__class__.__debug_custom_setting__
+        )
+
+        super(DebugSpider, self).__init__(*args, **kwargs)
+
+        self._request = request or Request.from_dict(request_dict)
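+
+    # A minimal debug-run sketch (hypothetical names and values): convert a
+    # normal spider class into its debug variant and replay a single request:
+    #
+    #     MySpider.to_DebugSpider(
+    #         redis_key="test:my_spider",
+    #         request_dict={"url": "https://example.com"},
+    #     ).start()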
+
+    def __start_requests(self):
+        yield self._request
+
+    def _start(self):
+        # run each parser's start_requests
+        self.spider_begin()  # for a spider that does not stop automatically, this runs only once
+
+        for parser in self._parsers:
+            results = parser.__start_requests()
+            # add requests to the request queue; the queue persists them in one place
+            if results and not isinstance(results, Iterable):
+                raise Exception("%s.%s返回值必须可迭代" % (parser.name, "start_requests"))
+
+            result_type = 1
+            for result in results or []:
+                if isinstance(result, Request):
+                    result.parser_name = result.parser_name or parser.name
+                    self._request_buffer.put_request(result)
+                    result_type = 1
+
+                elif isinstance(result, Item):
+                    self._item_buffer.put_item(result)
+                    result_type = 2
+
+                elif callable(result):  # a callable result may be a function that performs a database update
+                    if result_type == 1:
+                        self._request_buffer.put_request(result)
+                    else:
+                        self._item_buffer.put_item(result)
+
+            self._request_buffer.flush()
+            self._item_buffer.flush()
+
+        # start the collector
+        self._collector.start()
+
+        # start the parser controls
+        for i in range(self._thread_count):
+            parser_control = self._parser_control_obj(
+                self._collector,
+                self._redis_key,
+                self._request_buffer,
+                self._item_buffer,
+            )
+
+            for parser in self._parsers:
+                parser_control.add_parser(parser)
+
+            parser_control.start()
+            self._parser_controls.append(parser_control)
+
+        # start the request_buffer
+        self._request_buffer.start()
+
+        # start the item_buffer
+        self._item_buffer.start()
+
+    def run(self):
+        if not self._parsers:  # not in add_parser mode
+            self._parsers.append(self)
+
+        self._start()
+
+        while True:
+            try:
+                if self.all_thread_is_done():
+                    self._stop_all_thread()
+                    break
+            except Exception as e:
+                log.exception(e)
+
+            tools.delay_time(1)  # check spider status once per second
+
+
+class BaseBusinessListSpider(Spider):
+    """列表页采集基础爬虫"""
+
+    __business_type__ = "List"
+
+    def __auto_increment_page_number(self, request):
+        """翻页 - 页码自增"""
+        if request.page is None:
+            raise ValueError('请设置 request.page 起始页码数')
+
+        if request.page < int(request.item["crawl_page"]):
+            request.page += 1  # 采集页码自增
+            yield request
+
+    def infinite_pages(self, request, response):
+        """翻页"""
+        generator = self.__auto_increment_page_number(request)
+        try:
+            request = next(generator)
+            return request
+        except StopIteration:
+            pass
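+
+    # A minimal pagination sketch (hypothetical parse method): the list request
+    # carries a `page` number and an `item` with a `crawl_page` upper bound;
+    # the parse callback re-enqueues the request for the next page:
+    #
+    #     def parse(self, request, response):
+    #         ...  # extract list rows here
+    #         next_request = self.infinite_pages(request, response)
+    #         if next_request:
+    #             yield next_request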
+
+
+class BaseBusinessDetailSpider(Spider):
+    """详情页采集基础爬虫"""
+
+    __business_type__ = "Detail"
+    __business_setting__ = dict(
+        ITEM_FILTER_ENABLE=False
+    )
+
+    def __init__(
+        self,
+        redis_key=None,
+        thread_count=None,
+        begin_callback=None,
+        end_callback=None,
+        delete_keys=(),
+        keep_alive=None,
+        auto_start_requests=None,
+        **kwargs
+    ):
+        self.__class__.__custom_setting__.update(
+            self.__class__.__business_setting__
+        )
+        redis_key = f'{redis_key}_detailc'
+        super(BaseBusinessDetailSpider, self).__init__(
+            redis_key=redis_key,
+            thread_count=thread_count,
+            begin_callback=begin_callback,
+            end_callback=end_callback,
+            delete_keys=delete_keys,
+            keep_alive=keep_alive,
+            auto_start_requests=auto_start_requests,
+            **kwargs
+        )
+
+    def failed_request(self, request, response):
+        """请求、解析错误次数超过上限后,记录错误详情信息"""
+        failed_item = FailedTaskItem(
+            reason=getattr(request, "error_msg", ""),
+            status_code=getattr(response, "status_code", -1),
+            **request.item,  # details of the failed task
+        )
+        failed_item.table_name = setting.TASK_REQUEST_FAILED
+        yield failed_item
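+
+    # Note: request.item is assumed to carry the original task fields, so the
+    # record written to setting.TASK_REQUEST_FAILED is roughly
+    # {"reason": <error_msg>, "status_code": <code>, **original_task_fields}.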
+
+    def get_tasks_by_rabbitmq(self, limit=None, auto_ack=True):
+        """
+        Pull pending tasks from RabbitMQ.
+
+        @param limit: number of messages to fetch
+        @param auto_ack: acknowledge messages automatically
+        """
+        queue_name = setting.TAB_ITEMS.format(
+            redis_key=self._redis_key.replace("_detailc", "")
+        )
+        limit = limit or setting.COLLECTOR_TASK_COUNT
+        correlation_id = tools.get_uuid().replace("-", "")
+        if self._rabbitmq.get_message_count(queue_name) == 0:
+            # Step 1: publish a message asking the producer to release tasks
+            produce_queue = "pyspider.report.produce"
+            produce_task = {
+                "ip": tools.get_localhost_ip(),
+                "queue_name": queue_name,
+                "coll_name": setting.TASK_REQUEST_PRODUCE,
+                "limit": limit,
+            }
+            properties = dict(correlation_id=correlation_id)
+            self._rabbitmq.add(produce_task, produce_queue, properties=properties)
+
+            # Step 2: wait for the receipt message confirming that task production finished
+            receipt_queue = f"receipt_{correlation_id}"
+            with self._rabbitmq.get_mq_obj().channel() as channel:
+                try:
+                    channel.basic.consume(queue=receipt_queue, no_ack=True)
+                    tools.delay_time(0.8)  # brief gap between subscribing and collecting messages
+                    inbound = channel.build_inbound_messages(break_on_empty=True)
+                    message_dict = {msg.correlation_id: msg for msg in inbound}
+                    # log.debug(f"collection tasks pushed {message_dict}")
+                    message = message_dict.get(correlation_id)
+                    if message:
+                        body = tools.loads_obj(message.body)
+                        log.debug(f"推送任务到采集队列《{body['queue_name']}》完成")
+                except amqpstorm.exception.AMQPChannelError:
+                    pass
+
+        # Step 3: start pulling tasks
+        task_lst = []
+        messages = self._rabbitmq.get(queue_name, limit, auto_ack, to_str=False)
+        for message in messages:
+            body = message.body
+            if isinstance(body, Item):
+                task_lst.append(body.to_dict)
+            else:
+                task_lst.append(body)
+        return task_lst
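+
+    # A minimal consumption sketch (hypothetical subclass code): pull task
+    # dicts and turn each one into a detail-page request. The "href" field is
+    # an assumption about the task schema.
+    #
+    #     def start_requests(self):
+    #         for task in self.get_tasks_by_rabbitmq(limit=10):
+    #             yield Request(task["href"], item=task)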
+
+    def get_tasks_by_mongodb(self, table=None, query=None, limit=None):
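+        """Pull pending tasks from MongoDB.
+
+        @param table: source collection; defaults to setting.TASK_REQUEST_PRODUCE
+        @param query: custom filter; defaults to a state/queue_name filter for this spider's queue
+        @param limit: number of tasks to fetch; defaults to setting.COLLECTOR_TASK_COUNT
+        """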
+        pipeline_path = "feapder.pipelines.mongo_pipeline.TaskPipeline"
+        pipeline = tools.import_cls(pipeline_path)()
+        table = table or setting.TASK_REQUEST_PRODUCE
+        queue_name = setting.TAB_ITEMS.format(
+            redis_key=self._redis_key.replace('_detailc', '')
+        )
+        conditions = query or {
+            'state': {'$in': [1, 3, 5]},
+            'queue_name': queue_name,
+            'update_at': {'$lt': tools.get_current_timestamp()}
+        }
+        limit = limit or setting.COLLECTOR_TASK_COUNT
+        results = pipeline.find_items(table, conditions, limit)
+        ignore = {'_id', 'state', 'update_at', 'queue_name'}
+        task_lst = [{k: v for k, v in items.items() if k not in ignore} for items in results]
+        return task_lst
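+
+    # The MongoDB variant can be consumed the same way (hypothetical usage):
+    #
+    #     def start_requests(self):
+    #         for task in self.get_tasks_by_mongodb(limit=10):
+    #             yield Request(task["href"], item=task)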
+
+
+class BiddingListSpider(BaseBusinessListSpider):
+    """标讯列表页采集爬虫"""
+
+    __business_type__ = "BiddingList"
+
+
+class BiddingDetailSpider(BaseBusinessDetailSpider):
+    """标讯详情页采集爬虫"""
+
+    __business_type__ = "BiddingDetail"
+
+
+class PlanToBuildListSpider(BaseBusinessListSpider):
+    """拟建列表页采集爬虫"""
+
+    __business_type__ = "PlanToBuildList"
+
+
+class PlanToBuildDetailSpider(BaseBusinessDetailSpider):
+    """拟建详情页采集爬虫"""
+
+    __business_type__ = "PlanToBuildDetail"