# -*- coding: utf-8 -*-
"""
Created on 2018-06-19 17:17
---------
@summary: Item manager. Buffers the items that are to be written to the
    database and exports them from one place, so that multiple threads do not
    access the database at the same time.
---------
@author: Boris
@email: boris_liu@foxmail.com
"""

import threading
from queue import Empty, Queue

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.db.rabbitMq import RabbitMQ
from feapder.dedup import Dedup
from feapder.network.item import Item, UpdateItem
from feapder.pipelines import BasePipeline
from feapder.utils import metrics
from feapder.utils.log import log

MAX_ITEM_COUNT = 5000  # maximum number of entries held in the buffer queue
UPLOAD_BATCH_MAX_SIZE = 1000  # maximum number of entries exported in one batch


class ItemBuffer(threading.Thread):
    """
    Thread that buffers queued data and exports it through the pipelines.

    Producers call ``put_item``; the thread's ``run`` loop calls ``flush``
    about once per second. ``flush`` sorts the queued entries into items,
    update items, callbacks and requests and exports them in batches of at
    most ``UPLOAD_BATCH_MAX_SIZE`` entries.
    """

    # Dedup instance shared by all ItemBuffer objects; created in __init__
    # the first time setting.ITEM_FILTER_ENABLE is truthy.
    dedup = None

    def __init__(self, redis_key, rabbitmq=None):
        """
        @param redis_key: spider key, used to format the queue names
        @param rabbitmq: RabbitMQ client to use; a new ``RabbitMQ()`` is
            created when omitted
        """
        # NOTE(review): "_table_item" is never assigned in this class, so this
        # guard only triggers if something outside this file sets it
        # (presumably a leftover from a singleton implementation) - confirm.
        if hasattr(self, "_table_item"):
            return

        super(ItemBuffer, self).__init__()

        self._thread_stop = False
        self._is_adding_to_db = False
        self._redis_key = redis_key

        self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)

        self._rabbitmq = rabbitmq or RabbitMQ()
        # task (request) queue
        self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
        self._rabbitmq.declare(queue=self._tab_requests)
        # queue for data that failed to be saved
        self._tab_failed_items = setting.TAB_FAILED_ITEMS.format(redis_key=redis_key)
        self._rabbitmq.declare(queue=self._tab_failed_items)

        # cache of item name -> table name, e.g. {'item_name': 'table_name'}
        self._item_tables = {}
        # cache of table name -> update keys, e.g. {'table_name': ['id', 'name']}
        self._item_update_keys = {}

        self._pipelines = self.load_pipelines()

        if setting.ITEM_FILTER_ENABLE and not self.__class__.dedup:
            self.__class__.dedup = Dedup(
                to_md5=False, **setting.ITEM_FILTER_SETTING
            )

        # number of export retries
        self.export_retry_times = 0
        # number of failed exports  TODO: non-air spiders should count this in redis
        self.export_falied_times = 0

    def load_pipelines(self):
        """
        Instantiate every pipeline listed in ``setting.ITEM_PIPELINES``.

        @return: list of pipeline instances
        @raise ValueError: a configured class does not inherit BasePipeline
        """
        pipelines = []
        for pipeline_path in setting.ITEM_PIPELINES:
            pipeline = tools.import_cls(pipeline_path)()
            if not isinstance(pipeline, BasePipeline):
                raise ValueError(f"{pipeline_path} 需继承 feapder.pipelines.BasePipeline")
            pipelines.append(pipeline)

        return pipelines

    def run(self):
        """Flush the buffer once per second until ``stop`` is called, then close the pipelines."""
        self._thread_stop = False
        while not self._thread_stop:
            self.flush()
            tools.delay_time(1)

        self.close()

    def stop(self):
        """Ask the ``run`` loop to exit and close the RabbitMQ client."""
        # Set the flag first so the loop still terminates if close() raises.
        self._thread_stop = True
        # NOTE(review): clears threading.Thread's private started event,
        # presumably so that start() can be called again - confirm.
        self._started.clear()
        self._rabbitmq.close()

    def put_item(self, item):
        """
        Queue ``item`` for export.

        ``Item`` instances get their ``pre_to_db`` hook called first. Anything
        whose ``save`` attribute is falsy is not queued. The queue is bounded
        by ``MAX_ITEM_COUNT``, so this call blocks while the queue is full.

        @param item: Item / UpdateItem, a callable (callback) or a request
        """
        if isinstance(item, Item):
            # hook executed before the item is stored
            item.pre_to_db()

        if getattr(item, "save", True):  # save=False: do not push for storage
            self._items_queue.put(item)

    def flush(self):
        """
        Drain the queue and export its content in batches.

        Any exception is logged and swallowed so the ``run`` loop keeps going.
        """
        try:
            items = []
            update_items = []
            requests = []
            callbacks = []
            items_fingerprints = []
            data_count = 0

            while True:
                # EAFP instead of empty()/get_nowait(): another consumer may
                # empty the queue between the check and the get.
                try:
                    data = self._items_queue.get_nowait()
                except Empty:
                    break
                data_count += 1

                # classify the entry
                if callable(data):
                    callbacks.append(data)
                elif isinstance(data, UpdateItem):
                    update_items.append(data)
                elif isinstance(data, Item):
                    items.append(data)
                    if setting.ITEM_FILTER_ENABLE:
                        items_fingerprints.append(data.fingerprint)
                else:  # request
                    requests.append(data)

                if data_count >= UPLOAD_BATCH_MAX_SIZE:
                    self.__add_item_to_db(
                        items, update_items, requests, callbacks, items_fingerprints
                    )

                    items = []
                    update_items = []
                    requests = []
                    callbacks = []
                    items_fingerprints = []
                    data_count = 0

            if data_count:
                self.__add_item_to_db(
                    items, update_items, requests, callbacks, items_fingerprints
                )

        except Exception as e:
            log.exception(e)

    def get_items_count(self):
        """Return the number of entries currently waiting in the queue."""
        return self._items_queue.qsize()

    def is_adding_to_db(self):
        """Return True while a batch is being exported."""
        return self._is_adding_to_db

    def __dedup_items(self, items, items_fingerprints):
        """
        Drop the items whose fingerprint is already known to the dedup store.

        @param items: items to check
        @param items_fingerprints: fingerprints, parallel to ``items``
        @return: (items, items_fingerprints) with the duplicates removed
        """
        if not items:
            return items, items_fingerprints

        is_exists = self.__class__.dedup.get(items_fingerprints)
        is_exists = is_exists if isinstance(is_exists, list) else [is_exists]

        dedup_items = []
        dedup_items_fingerprints = []
        dup_items_count = 0

        for item, fingerprint, is_exist in zip(items, items_fingerprints, is_exists):
            if is_exist:
                dup_items_count += 1
            else:
                dedup_items.append(item)
                dedup_items_fingerprints.append(fingerprint)

        dedup_items_count = len(dedup_items)
        log.info(
            "待入库数据 {} 条, 重复 {} 条,实际待入库数据 {} 条".format(
                dedup_items_count + dup_items_count, dup_items_count, dedup_items_count
            )
        )

        return dedup_items, dedup_items_fingerprints

    def __pick_items(self, items, is_update_item=False):
        """
        Group the items by table. ``items`` is empty afterwards.

        @param items: list of items; emptied by this call
        @param is_update_item: also cache each table's update keys
        @return: {'table_name': [item_dict, ...]}
        """
        datas_dict = {}

        for item in items:
            # Look the table name up in the cache first; on a miss read it
            # from the item and cache it to speed up the next lookup.
            item_name = item.item_name
            table_name = self._item_tables.get(item_name)
            if not table_name:
                table_name = item.table_name
                self._item_tables[item_name] = table_name

            datas_dict.setdefault(table_name, []).append(item.to_dict)

            if is_update_item and table_name not in self._item_update_keys:
                self._item_update_keys[table_name] = item.update_key

        # keep the documented contract: the input list is consumed
        items.clear()

        return datas_dict

    def __export_to_db(self, table, datas, is_update=False, update_keys=()):
        """
        Send ``datas`` through every pipeline, stopping at the first failure.

        @param table: table name
        @param datas: list of item dicts
        @param is_update: call ``update_items`` instead of ``save_items``
        @param update_keys: keys passed to ``update_items``
        @return: True if every pipeline succeeded, otherwise False
        """
        for pipeline in self._pipelines:
            if is_update:
                if not pipeline.update_items(table, datas, update_keys=update_keys):
                    log.error(
                        f"{pipeline.__class__.__name__} 更新数据失败. table: {table} items: {datas}"
                    )
                    return False
            else:
                if not pipeline.save_items(table, datas):
                    log.error(
                        f"{pipeline.__class__.__name__} 保存数据失败. table: {table} items: {datas}"
                    )
                    return False

        # record metrics only after every pipeline accepted the data
        self.metric_datas(table=table, datas=datas)
        return True

    def export_to_db(self, table, datas, **kwargs):
        """Public wrapper around the private export routine; returns its result."""
        return self.__export_to_db(table, datas, **kwargs)

    def __add_item_to_db(
        self, items, update_items, requests, callbacks, items_fingerprints
    ):
        """
        Export one batch and run the success or failure follow-up.

        @param items: items to insert
        @param update_items: items to update
        @param requests: requests belonging to this batch
        @param callbacks: callables run only if the whole batch was exported
        @param items_fingerprints: fingerprints, parallel to ``items``
        """
        self._is_adding_to_db = True
        # try/finally: flush() swallows exceptions, so without it a failure
        # here would leave the flag set to True forever.
        try:
            if setting.ITEM_FILTER_ENABLE:
                items, items_fingerprints = self.__dedup_items(items, items_fingerprints)

            # group by table
            items_dict = self.__pick_items(items)
            update_items_dict = self.__pick_items(update_items, is_update_item=True)

            export_success = True
            failed_items = {"add": [], "update": [], "requests": []}

            # batch insert
            while items_dict:
                table, datas = items_dict.popitem()

                log.debug(
                    """
                -------------- item 批量入库 --------------
                表名: %s
                datas: %s
                    """
                    % (table, tools.dumps_json(datas, indent=16))
                )

                if not self.__export_to_db(table, datas):
                    export_success = False
                    failed_items["add"].append({"table": table, "datas": datas})

            # batch update
            while update_items_dict:
                table, datas = update_items_dict.popitem()

                log.debug(
                    """
                -------------- item 批量更新 --------------
                表名: %s
                datas: %s
                    """
                    % (table, tools.dumps_json(datas, indent=16))
                )

                update_keys = self._item_update_keys.get(table)
                if not self.__export_to_db(
                    table, datas, is_update=True, update_keys=update_keys
                ):
                    export_success = False
                    failed_items["update"].append({"table": table, "datas": datas})

            if export_success:
                self.__on_export_success(callbacks, items_fingerprints)
            else:
                failed_items["requests"] = requests
                self.__on_export_failure(failed_items, requests, callbacks)
        finally:
            self._is_adding_to_db = False

    def __on_export_success(self, callbacks, items_fingerprints):
        """Run the callbacks and store the fingerprints of a successful batch."""
        for callback in callbacks:
            try:
                callback()
            except Exception as e:
                # one failing callback must not prevent the others from running
                log.exception(e)
        callbacks.clear()

        # Finished requests are currently not removed from / re-added to the
        # request queue here:
        # self._rabbitmq.add(self._tab_requests, requests)

        # add the fingerprints to the dedup store
        if setting.ITEM_FILTER_ENABLE and items_fingerprints:
            self.__class__.dedup.add(items_fingerprints, skip_check=True)

    def __on_export_failure(self, failed_items, requests, callbacks):
        """Record, re-queue and alert for a batch that could not be exported."""
        if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
            if self._redis_key != "air_spider":
                # record the failed items in rabbitmq
                self._rabbitmq.add(self._tab_failed_items, failed_items)

                # Removing the finished requests is not implemented:
                # self.redis_db.zrem(self._table_request, requests)
                if requests:
                    log.debug(f'做过的requests数量: {len(requests)}')

                log.error(
                    "入库超过最大重试次数,不再重试,数据记录到redis,items:\n {}".format(
                        tools.dumps_json(failed_items)
                    )
                )
            self.export_retry_times = 0

        else:
            tip = ["入库不成功"]
            if callbacks:
                tip.append("不执行回调")
            if requests:
                tip.append("不删除任务")
                # put the requests back on the task queue
                self._rabbitmq.add(self._tab_requests, requests)
            if setting.ITEM_FILTER_ENABLE:
                tip.append("数据不入去重库")
            if self._redis_key != "air_spider":
                tip.append("将自动重试")

            tip.append("失败items:\n {}".format(tools.dumps_json(failed_items)))
            log.error(",".join(tip))

            self.export_falied_times += 1

            if self._redis_key != "air_spider":
                self.export_retry_times += 1

        if self.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
            # raise an alert
            msg = "《{}》爬虫导出数据失败,失败次数:{},请检查爬虫是否正常".format(
                self._redis_key, self.export_falied_times
            )
            log.error(msg)
            tools.send_msg(
                msg=msg,
                level="error",
                message_prefix="《%s》爬虫导出数据失败" % (self._redis_key),
            )

    def metric_datas(self, table, datas):
        """
        Emit metrics: one counter per key (1 if the value is truthy, else 0)
        and the total number of rows.

        @param table: table name, used as the metric classification
        @param datas: list of item dicts
        """
        total_count = 0
        for data in datas:
            total_count += 1
            for k, v in data.items():
                metrics.emit_counter(k, int(bool(v)), classify=table)
        metrics.emit_counter("total count", total_count, classify=table)

    def close(self):
        """Call ``close`` on every pipeline; failures are logged, not raised."""
        for pipeline in self._pipelines:
            try:
                pipeline.close()
            except Exception as e:
                # best effort: keep closing the remaining pipelines
                log.exception(e)