
Data channel buffer code backup

dongzhaorui, 11 months ago
parent commit 189538cda0
1 file changed, 474 insertions and 0 deletions
  1. FworkSpider/feapder/buffer/item_buffer.py.bak  +474 -0

FworkSpider/feapder/buffer/item_buffer.py.bak  (+474, -0)

@@ -0,0 +1,474 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2018-06-19 17:17
+---------
+@summary: Item manager. Buffers items bound for the database and writes them through this single manager, preventing concurrent database access from multiple threads.
+---------
+@author: Boris
+@email: boris_liu@foxmail.com
+"""
+
+import threading
+from queue import Queue
+
+import feapder.setting as setting
+import feapder.utils.tools as tools
+from feapder.db.rabbitMq import RabbitMQ
+from feapder.dedup import Dedup
+from feapder.network.item import (
+    Item,
+    UpdateItem,
+    BaseListItem,
+    BaseDetailItem,
+    FailedTaskItem,
+)
+from feapder.pipelines import BasePipeline
+from feapder.utils import metrics
+from feapder.utils.log import log
+
+MAX_ITEM_COUNT = 5000  # max number of items held in the buffer
+UPLOAD_BATCH_MAX_SIZE = 1000  # max number of items written to the DB per batch
+
+
+class ItemBuffer(threading.Thread):
+    dedup = None
+
+    def __init__(self, redis_key, rabbitmq=None, user=None):
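+        # hasattr guard: skip re-initialization if __init__ runs again on a
+        # reused instance (presumably accessed singleton-style elsewhere).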
+        if not hasattr(self, "_items_queue"):
+            super(ItemBuffer, self).__init__()
+
+            self._thread_stop = False
+            self._is_adding_to_db = False
+            self._redis_key = redis_key
+            self._user = user
+
+            self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)
+
+            self._rabbitmq = rabbitmq or RabbitMQ()
+
+            # Task queue
+            self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
+            self._rabbitmq.declare_bind(queue=self._tab_requests)
+
+            # Queue for items that failed to save
+            self._tab_failed_items = setting.TAB_FAILED_ITEMS
+            self._rabbitmq.declare_bind(queue=self._tab_failed_items)
+
+            # Crawl-task queue (RabbitMQ)
+            self._tab_items = setting.TAB_ITEMS.format(
+                redis_key=redis_key.replace('_detailc', '')
+            )
+            self._rabbitmq.declare_bind(queue=self._tab_items)
+
+            self._item_tables = {
+                # 'item_name': 'table_name'  # cached mapping of item name to table name
+            }
+
+            self._item_update_keys = {
+                # 'table_name': ['id', 'name', ...]  # cached mapping of table_name to its __update_key__
+            }
+
+            self._pipelines = self.load_pipelines()
+            if setting.ITEM_FILTER_ENABLE and not self.__class__.dedup:
+                self.__class__.dedup = Dedup(
+                    to_md5=False, **setting.ITEM_FILTER_SETTING
+                )
+
+            # Number of export retries
+            self.export_retry_times = 0
+            # Number of export failures
+            self.export_failed_times = 0
+
+    def load_pipelines(self):
+        pipelines = []
+        for pipeline_path in setting.ITEM_PIPELINES:
+            pipeline = tools.import_cls(pipeline_path)()
+            if not isinstance(pipeline, BasePipeline):
+                raise ValueError(f"{pipeline_path} must inherit from feapder.pipelines.BasePipeline")
+            pipelines.append(pipeline)
+
+        return pipelines
+
+    def run(self):
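+        # Flush the buffer roughly once per second until stop() is requested,
+        # then release pipeline resources via close().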
+        self._thread_stop = False
+        while not self._thread_stop:
+            self.flush()
+            tools.delay_time(1)
+
+        self.close()
+
+    def stop(self):
+        self._thread_stop = True
+        self._started.clear()  # reset Thread state so the object can be start()ed again
+
+    def put_item(self, item):
+        if isinstance(item, Item):
+            # Pre-insert callback
+            item.pre_to_db()
+
+        self._items_queue.put(item)
+
+    def flush(self):
+        try:
+            items = []
+            update_items = []
+            failed_task_items = []
+            requests = []
+            callbacks = []
+            items_fingerprints = []
+            data_count = 0
+
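+            # Drain the queue, classifying each entry (callback, update item,
+            # failed task, list/detail item, or request) and flushing to the DB
+            # whenever UPLOAD_BATCH_MAX_SIZE entries have accumulated.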
+            while not self._items_queue.empty():
+                data = self._items_queue.get_nowait()
+                data_count += 1
+                update_at = tools.ensure_int64(tools.get_current_timestamp())
+
+                # Classify data
+                if callable(data):
+                    callbacks.append(data)
+
+                elif isinstance(data, UpdateItem):
+                    update_items.append(data)
+
+                elif isinstance(data, FailedTaskItem):
+                    data.queue_name = self._tab_items  # name of the crawl-task queue
+                    if data.failed_retries >= setting.SPIDER_MAX_RETRY_TIMES:
+                        state = 4  # task state: stop crawling [4 = stopped]
+
+                        # Update the state of the failed crawl task
+                        update_item = UpdateItem(
+                            state=state,
+                            pyuuid=data.pyuuid,
+                            update_at=update_at,
+                            failed_retries=data.failed_retries,
+                        )
+                        update_key = ['state', 'update_at', 'failed_retries']
+
+                        # Record the details of the failed crawl task
+                        data.state = state
+                        data.create_at = update_at
+                        failed_task_items.append(data)
+                    else:
+                        # Update the state of the failed crawl task
+                        update_item = UpdateItem(
+                            state=3,  # task state: crawl failed [3 = failed]
+                            pyuuid=data.pyuuid,
+                            failed_retries=data.failed_retries,
+                        )
+                        update_key = ['state', 'failed_retries']
+
+                    update_item.update_key = update_key
+                    update_item.table_name = setting.TASK_REQUEST_PRODUCE
+                    update_items.append(update_item)
+
+                elif isinstance(data, Item):
+                    if isinstance(data, BaseListItem):
+                        data.queue_name = self._tab_items
+                        data.update_at = update_at
+                        if hasattr(data, 'is_delay') and data.is_delay:
+                            data.state = 5  # task state: delayed crawl [5 = delayed]
+                        else:
+                            data.state = 1  # task state: waiting to crawl [1 = waiting]
+
+                    elif isinstance(data, BaseDetailItem):
+                        if not getattr(data, 'is_mixed', False):  # default avoids AttributeError
+                            update_item = UpdateItem(
+                                state=2,  # task state: crawl completed [2 = done]
+                                pyuuid=data.pyuuid,
+                                update_at=update_at,
+                            )
+                            update_item.update_key = ['state', 'update_at']
+                            update_item.table_name = setting.TASK_REQUEST_PRODUCE
+                            update_items.append(update_item)
+
+                    if data.dont_save:
+                        # Skip writing this item to the crawler production DB (data_bak)
+                        continue
+
+                    items.append(data)
+                    if setting.ITEM_FILTER_ENABLE:
+                        items_fingerprints.append(data.fingerprint)
+
+                else:  # anything else is treated as a request
+                    requests.append(data)
+
+                if data_count >= UPLOAD_BATCH_MAX_SIZE:
+                    self.__add_item_to_db(
+                        items, update_items, failed_task_items, requests, callbacks, items_fingerprints
+                    )
+
+                    items = []
+                    update_items = []
+                    failed_task_items = []
+                    requests = []
+                    callbacks = []
+                    items_fingerprints = []
+                    data_count = 0
+
+            if data_count:
+                self.__add_item_to_db(
+                    items, update_items, failed_task_items, requests, callbacks, items_fingerprints
+                )
+
+        except Exception as e:
+            log.exception(e)
+
+    def get_items_count(self):
+        return self._items_queue.qsize()
+
+    def is_adding_to_db(self):
+        return self._is_adding_to_db
+
+    def __dedup_items(self, items, items_fingerprints):
+        """
+        Deduplicate items
+        @param items:
+        @param items_fingerprints:
+        @return: the deduplicated items and items_fingerprints
+        """
+        if not items:
+            return items, items_fingerprints
+
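+        # dedup.get returns one existence flag per fingerprint (apparently a
+        # bare scalar for a single fingerprint, hence the list wrapping below).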
+        is_exists = self.__class__.dedup.get(items_fingerprints)
+        is_exists = is_exists if isinstance(is_exists, list) else [is_exists]
+
+        dedup_items = []
+        dedup_items_fingerprints = []
+        items_count = dedup_items_count = dup_items_count = 0
+
+        while is_exists:
+            item = items.pop(0)
+            items_fingerprint = items_fingerprints.pop(0)
+            is_exist = is_exists.pop(0)
+
+            items_count += 1
+
+            if not is_exist:
+                dedup_items.append(item)
+                dedup_items_fingerprints.append(items_fingerprint)
+                dedup_items_count += 1
+            else:
+                dup_items_count += 1
+
+        log.info(
+            "待入库数据 {} 条, 重复 {} 条,实际待入库数据 {} 条".format(
+                items_count, dup_items_count, dedup_items_count
+            )
+        )
+
+        return dedup_items, dedup_items_fingerprints
+
+    def __pick_items(self, items, is_update_item=False):
+        """
+        Split the items by destination table; the original items list is emptied
+        @param items:
+        @param is_update_item:
+        @return:
+        """
+        datas_dict = {
+            # 'table_name': [{}, {}]
+        }
+
+        while items:
+            item = items.pop(0)
+            # Get the item's snake_case name. The table name is looked up in the
+            # cache dict first; on a miss it is computed and cached to speed up
+            # subsequent lookups.
+            item_name = item.item_name
+            table_name = self._item_tables.get(item_name)
+            if not table_name:
+                table_name = item.table_name
+                self._item_tables[item_name] = table_name
+
+            if table_name not in datas_dict:
+                datas_dict[table_name] = []
+
+            datas_dict[table_name].append(item.to_dict)
+
+            if is_update_item and table_name not in self._item_update_keys:
+                self._item_update_keys[table_name] = item.update_key
+
+        return datas_dict
+
+    def __export_to_db(self, table, datas, is_update=False, update_keys=()):
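+        # Push the batch through every configured pipeline; a single pipeline
+        # failure fails the whole batch so that it can be retried.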
+        for pipeline in self._pipelines:
+            if is_update:
+                if not pipeline.update_items(table, datas, update_keys=update_keys):
+                    log.error(
+                        f"{pipeline.__class__.__name__} 更新数据失败. table: {table}  items: {datas}"
+                    )
+                    return False
+            else:
+                if not pipeline.save_items(table, datas):
+                    log.error(
+                        f"{pipeline.__class__.__name__} 保存数据失败. table: {table}  items: {datas}"
+                    )
+                    return False
+
+        self.metric_datas(table=table, datas=datas)
+        return True
+
+    def __add_item_to_db(
+        self, items, update_items, failed_task_items, requests, callbacks, items_fingerprints
+    ):
+        export_success = True
+        self._is_adding_to_db = True
+
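+        # With item filtering enabled, drop items whose fingerprints already
+        # exist in the dedup store before writing anything.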
+        if setting.ITEM_FILTER_ENABLE:
+            items, items_fingerprints = self.__dedup_items(items, items_fingerprints)
+
+        # Sort items into per-table batches
+        items_dict = self.__pick_items(items)
+        update_items_dict = self.__pick_items(update_items, is_update_item=True)
+        failed_task_items_dict = self.__pick_items(failed_task_items)
+
+        # Batch-insert items
+        failed_items = {"add": [], "update": [], "requests": []}
+        while items_dict:
+            table, datas = items_dict.popitem()
+
+            log.debug(
+                """
+                -------------- item batch insert --------------
+                table: %s
+                datas: %s
+                    """
+                % (table, tools.dumps_json(datas, indent=16))
+            )
+
+            if not self.__export_to_db(table, datas):
+                export_success = False
+                failed_items["add"].append({"table": table, "datas": datas})
+
+        # Execute batch updates
+        while update_items_dict:
+            table, datas = update_items_dict.popitem()
+
+            log.debug(
+                """
+                -------------- item batch update --------------
+                table: %s
+                datas: %s
+                    """
+                % (table, tools.dumps_json(datas, indent=16))
+            )
+
+            update_keys = self._item_update_keys.get(table)
+            if not self.__export_to_db(
+                table, datas, is_update=True, update_keys=update_keys
+            ):
+                export_success = False
+                failed_items["update"].append({"table": table, "datas": datas})
+
+        # Batch-insert failed-crawl items
+        while failed_task_items_dict:
+            table, datas = failed_task_items_dict.popitem()
+
+            log.debug(
+                """
+                -------------- failed-crawl item batch insert --------------
+                table: %s
+                datas: %s
+                    """
+                % (table, tools.dumps_json(datas, indent=16))
+            )
+
+            if not self.__export_to_db(table, datas):
+                export_success = False
+                failed_items["add"].append({"table": table, "datas": datas})
+
+        if export_success:
+            # Execute callbacks
+            while callbacks:
+                try:
+                    callback = callbacks.pop(0)
+                    callback()
+                except Exception as e:
+                    log.exception(e)
+
+            # Delete the processed requests (currently a no-op)
+            if requests:
+                # self._rabbitmq.add(self._tab_requests, requests)
+                pass
+
+            # Record the fingerprints of the saved items in the dedup store
+            if setting.ITEM_FILTER_ENABLE and items_fingerprints:
+                self.__class__.dedup.add(items_fingerprints, skip_check=True)
+        else:
+            failed_items["requests"] = requests
+            # Set correlation_id, the unique identifier of the MQ accessor
+            properties = dict(correlation_id=self._user or self._redis_key)
+
+            if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
+                if self._redis_key != "air_spider":
+                    # Record the failed items
+                    self._rabbitmq.add_batch(self._tab_failed_items, failed_items, properties=properties)
+                    # Delete the processed requests
+                    if requests:
+                        # self.redis_db.zrem(self._table_request, requests)
+                        print(f'number of processed requests: {len(requests)}')
+
+                    log.error(
+                        "入库超过最大重试次数,不再重试,数据记录到redis,items:\n {}".format(
+                            tools.dumps_json(failed_items)
+                        )
+                    )
+                self.export_retry_times = 0
+
+            else:
+                tip = ["入库不成功"]
+                if callbacks:
+                    tip.append("不执行回调")
+                if requests:
+                    tip.append("不删除任务")
+                    self._rabbitmq.add_batch(self._tab_requests, requests, properties=properties)
+
+                if setting.ITEM_FILTER_ENABLE:
+                    tip.append("数据不入去重库")
+
+                if self._redis_key != "air_spider":
+                    tip.append("将自动重试")
+
+                tip.append("失败items:\n {}".format(tools.dumps_json(failed_items)))
+                log.error(",".join(tip))
+
+                self.export_failed_times += 1
+
+                if self._redis_key != "air_spider":
+                    self.export_retry_times += 1
+
+            if self.export_failed_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
+                # Send an alert
+                msg = 'Spider "{}" failed to export data, failure count: {}. Please check whether the spider is healthy'.format(
+                    self._redis_key, self.export_failed_times
+                )
+                log.error(msg)
+                tools.send_msg(
+                    msg=msg,
+                    level="error",
+                    message_prefix='Spider "%s" failed to export data' % (self._redis_key),
+                )
+
+        self._is_adding_to_db = False
+
+    def metric_datas(self, table, datas):
+        """
+        Emit metrics: record the total count and the fill rate of each key
+        @param table: table name
+        @param datas: list of data dicts
+        @return:
+        """
+        total_count = 0
+        for data in datas:
+            total_count += 1
+            for k, v in data.items():
+                metrics.emit_counter(k, int(bool(v)), classify=table)
+        metrics.emit_counter("total count", total_count, classify=table)
+
+    def close(self):
+        # Call each pipeline's close method
+        for pipeline in self._pipelines:
+            try:
+                pipeline.close()
+            except Exception:
+                pass