# -*- coding: utf-8 -*-
"""
Created on 2018-06-19 17:17
---------
@summary: Item manager. Buffers items bound for the database and writes them
          in batches through this single manager, so that multiple threads
          never access the database concurrently.
---------
@author: Boris
@email: boris_liu@foxmail.com
"""

import threading
from queue import Queue

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.db.rabbitMq import RabbitMQ
from feapder.dedup import Dedup
from feapder.network.item import (
    Item,
    UpdateItem,
    BaseListItem,
    BaseDetailItem,
    FailedTaskItem,
)
from feapder.pipelines import BasePipeline
from feapder.utils import metrics
from feapder.utils.log import log

MAX_ITEM_COUNT = 5000  # max number of items held in the buffer
UPLOAD_BATCH_MAX_SIZE = 1000


class ItemBuffer(threading.Thread):
    dedup = None

    def __init__(self, redis_key, rabbitmq=None):
        if not hasattr(self, "_items_queue"):
            super(ItemBuffer, self).__init__()

            self._thread_stop = False
            self._is_adding_to_db = False
            self._redis_key = redis_key

            self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)
            self._rabbitmq = rabbitmq or RabbitMQ()

            # task queue
            self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
            self._rabbitmq.declare_bind(queue=self._tab_requests)

            # queue for items that failed to be saved
            self._tab_failed_items = setting.TAB_FAILED_ITEMS
            self._rabbitmq.declare_bind(queue=self._tab_failed_items)

            # crawl task queue (RabbitMQ)
            self._tab_items = setting.TAB_ITEMS.format(
                redis_key=redis_key.replace("_detailc", "")
            )
            self._rabbitmq.declare_bind(queue=self._tab_items)

            self._item_tables = {
                # 'item_name': 'table_name'  # caches item name -> table name
            }

            self._item_update_keys = {
                # 'table_name': ['id', 'name', ...]  # caches table_name -> __update_key__
            }

            self._pipelines = self.load_pipelines()

            if setting.ITEM_FILTER_ENABLE and not self.__class__.dedup:
                self.__class__.dedup = Dedup(
                    to_md5=False, **setting.ITEM_FILTER_SETTING
                )

            # number of export retries
            self.export_retry_times = 0
            # number of export failures
            self.export_failed_times = 0

    def load_pipelines(self):
        pipelines = []
        for pipeline_path in setting.ITEM_PIPELINES:
            pipeline = tools.import_cls(pipeline_path)()
            if not isinstance(pipeline, BasePipeline):
                raise ValueError(
                    f"{pipeline_path} must inherit from feapder.pipelines.BasePipeline"
                )
            pipelines.append(pipeline)

        return pipelines

    def run(self):
        self._thread_stop = False
        while not self._thread_stop:
            self.flush()
            tools.delay_time(1)

        self.close()

    def stop(self):
        self._thread_stop = True
        self._started.clear()

    def put_item(self, item):
        if isinstance(item, Item):
            # pre-database callback
            item.pre_to_db()
            if item.dont_save:
                # skip saving this item
                return

        self._items_queue.put(item)
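
    # Note: non-Item objects may be queued as well. flush() treats callables
    # as post-export callbacks and anything that is neither an Item nor a
    # callable as a request, e.g. (illustrative):
    #   item_buffer.put_item(item)
    #   item_buffer.put_item(lambda: log.info("batch exported"))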
    def flush(self):
        try:
            items = []
            update_items = []
            failed_task_items = []
            requests = []
            callbacks = []
            items_fingerprints = []
            data_count = 0

            while not self._items_queue.empty():
                data = self._items_queue.get_nowait()
                data_count += 1
                update_at = tools.ensure_int64(tools.get_current_timestamp())

                # classify data
                if callable(data):
                    callbacks.append(data)

                elif isinstance(data, UpdateItem):
                    update_items.append(data)

                elif isinstance(data, FailedTaskItem):
                    data.queue_name = self._tab_items  # crawl task queue name
                    failed_times = data.to_dict.pop("failed_times", 0)
                    if failed_times >= setting.SPIDER_FAILED_TASK_MAX_RETRY_TIMES:
                        state = 4  # task state [4 = crawling stopped]

                        # update the state of the finished task
                        update_item = UpdateItem(
                            state=state,
                            pyuuid=data.pyuuid,
                            update_at=update_at,
                        )
                        update_item.update_key = ["state", "update_at"]
                        update_item.table_name = setting.TASK_REQUEST_PRODUCE
                        update_items.append(update_item)

                        # save the details of the failed crawl task
                        data.state = state
                        data.failed_times = failed_times
                        data.create_at = update_at
                        failed_task_items.append(data)
                    else:
                        update_item = UpdateItem(
                            state=3,  # task state [3 = crawl failed]
                            pyuuid=data.pyuuid,
                            update_at=update_at,
                            failed_times=failed_times + 1,
                        )
                        update_item.update_key = ["state", "update_at", "failed_times"]
                        update_item.table_name = setting.TASK_REQUEST_PRODUCE
                        update_items.append(update_item)

                elif isinstance(data, Item):
                    if isinstance(data, BaseListItem):
                        data.queue_name = self._tab_items
                        data.update_at = update_at
                        if hasattr(data, "is_delay") and data.is_delay:
                            data.state = 5  # task state [5 = delayed crawl]
                        else:
                            data.state = 1  # task state [1 = waiting to be crawled]
                    elif isinstance(data, BaseDetailItem):
                        update_item = UpdateItem(
                            state=2,  # task state [2 = crawl finished]
                            pyuuid=data.pyuuid,
                            update_at=update_at,
                        )
                        update_item.update_key = ["state", "update_at"]
                        update_item.table_name = setting.TASK_REQUEST_PRODUCE
                        update_items.append(update_item)

                    items.append(data)
                    if setting.ITEM_FILTER_ENABLE:
                        items_fingerprints.append(data.fingerprint)

                else:  # request-redis
                    requests.append(data)

                if data_count >= UPLOAD_BATCH_MAX_SIZE:
                    self.__add_item_to_db(
                        items,
                        update_items,
                        failed_task_items,
                        requests,
                        callbacks,
                        items_fingerprints,
                    )

                    items = []
                    update_items = []
                    failed_task_items = []
                    requests = []
                    callbacks = []
                    items_fingerprints = []
                    data_count = 0

            if data_count:
                self.__add_item_to_db(
                    items,
                    update_items,
                    failed_task_items,
                    requests,
                    callbacks,
                    items_fingerprints,
                )

        except Exception as e:
            log.exception(e)

    def get_items_count(self):
        return self._items_queue.qsize()

    def is_adding_to_db(self):
        return self._is_adding_to_db
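
    # The deduplication below keys off item fingerprints collected in flush().
    # Sketch of the assumed Dedup contract (from feapder.dedup, configured by
    # setting.ITEM_FILTER_SETTING):
    #   dedup.get(fingerprints)  -> 0/1 flags, 1 meaning "already stored"
    #   dedup.add(fingerprints, skip_check=True)  -> record new fingerprints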
    def __dedup_items(self, items, items_fingerprints):
        """
        Deduplicate items
        @param items:
        @param items_fingerprints:
        @return: the deduplicated items and items_fingerprints
        """
        if not items:
            return items, items_fingerprints

        is_exists = self.__class__.dedup.get(items_fingerprints)
        is_exists = is_exists if isinstance(is_exists, list) else [is_exists]

        dedup_items = []
        dedup_items_fingerprints = []
        items_count = dedup_items_count = dup_items_count = 0

        while is_exists:
            item = items.pop(0)
            items_fingerprint = items_fingerprints.pop(0)
            is_exist = is_exists.pop(0)

            items_count += 1

            if not is_exist:
                dedup_items.append(item)
                dedup_items_fingerprints.append(items_fingerprint)
                dedup_items_count += 1
            else:
                dup_items_count += 1

        log.info(
            "{} items pending, {} duplicates, {} items to be stored".format(
                items_count, dup_items_count, dedup_items_count
            )
        )

        return dedup_items, dedup_items_fingerprints

    def __pick_items(self, items, is_update_item=False):
        """
        Split the data by target table; the original items list is emptied
        @param items:
        @param is_update_item:
        @return:
        """
        datas_dict = {
            # 'table_name': [{}, {}]
        }

        while items:
            item = items.pop(0)

            # resolve the item's snake_case name to its table name; the mapping
            # is cached on first lookup to speed up subsequent ones
            item_name = item.item_name
            table_name = self._item_tables.get(item_name)
            if not table_name:
                table_name = item.table_name
                self._item_tables[item_name] = table_name

            if table_name not in datas_dict:
                datas_dict[table_name] = []

            datas_dict[table_name].append(item.to_dict)

            if is_update_item and table_name not in self._item_update_keys:
                self._item_update_keys[table_name] = item.update_key

        return datas_dict

    def __export_to_db(self, table, datas, is_update=False, update_keys=()):
        for pipeline in self._pipelines:
            if is_update:
                if not pipeline.update_items(table, datas, update_keys=update_keys):
                    log.error(
                        f"{pipeline.__class__.__name__} failed to update data. "
                        f"table: {table} items: {datas}"
                    )
                    return False
            else:
                if not pipeline.save_items(table, datas):
                    log.error(
                        f"{pipeline.__class__.__name__} failed to save data. "
                        f"table: {table} items: {datas}"
                    )
                    return False

        self.metric_datas(table=table, datas=datas)

        return True

    def export_to_db(self, table, datas, **kwargs):
        return self.__export_to_db(table, datas, **kwargs)

    def __add_item_to_db(
        self,
        items,
        update_items,
        failed_task_items,
        requests,
        callbacks,
        items_fingerprints,
    ):
        export_success = True
        self._is_adding_to_db = True

        if setting.ITEM_FILTER_ENABLE:
            items, items_fingerprints = self.__dedup_items(items, items_fingerprints)

        # sort items by table
        items_dict = self.__pick_items(items)
        update_items_dict = self.__pick_items(update_items, is_update_item=True)
        failed_task_items_dict = self.__pick_items(failed_task_items)

        # batch-insert items
        failed_items = {"add": [], "update": [], "requests": []}
        while items_dict:
            table, datas = items_dict.popitem()

            log.debug(
                """
                -------------- item batch insert --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )

            if not self.__export_to_db(table, datas):
                export_success = False
                failed_items["add"].append({"table": table, "datas": datas})

        # batch update
        while update_items_dict:
            table, datas = update_items_dict.popitem()

            log.debug(
                """
                -------------- item batch update --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )

            update_keys = self._item_update_keys.get(table)
            if not self.__export_to_db(
                table, datas, is_update=True, update_keys=update_keys
            ):
                export_success = False
                failed_items["update"].append({"table": table, "datas": datas})

        # batch-insert failed crawl tasks
        while failed_task_items_dict:
            table, datas = failed_task_items_dict.popitem()

            log.debug(
                """
                -------------- crawl failed item batch insert --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )

            if not self.__export_to_db(table, datas):
                export_success = False
                failed_items["add"].append({"table": table, "datas": datas})

        if export_success:
            # run the callbacks
            while callbacks:
                try:
                    callback = callbacks.pop(0)
                    callback()
                except Exception as e:
                    log.exception(e)

            # delete the requests that have been processed
            if requests:
                # self._rabbitmq.add(self._tab_requests, requests)
                pass

            # store fingerprints in the dedup database
            if setting.ITEM_FILTER_ENABLE:
                if items_fingerprints:
                    self.__class__.dedup.add(items_fingerprints, skip_check=True)
        else:
            failed_items["requests"] = requests

            if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
                if self._redis_key != "air_spider":
                    failed_items[self._redis_key] = ""  # flag marking the failed item batch
                    # record the failed items
                    self._rabbitmq.add_batch(self._tab_failed_items, failed_items)

                    # delete the requests that have been processed
                    if requests:
                        # self.redis_db.zrem(self._table_request, requests)
                        print(f"number of processed requests: {len(requests)}")

                log.error(
                    "Export exceeded the maximum number of retries; giving up. "
                    "Data recorded to redis, items:\n {}".format(
                        tools.dumps_json(failed_items)
                    )
                )
                self.export_retry_times = 0
            else:
                tip = ["export failed"]
                if callbacks:
                    tip.append("callbacks will not run")

                if requests:
                    tip.append("tasks will not be deleted")
                    self._rabbitmq.add_batch(self._tab_requests, requests)

                if setting.ITEM_FILTER_ENABLE:
                    tip.append("data will not enter the dedup database")

                if self._redis_key != "air_spider":
                    tip.append("will retry automatically")

                tip.append("failed items:\n {}".format(tools.dumps_json(failed_items)))
                log.error(",".join(tip))

                self.export_failed_times += 1

                if self._redis_key != "air_spider":
                    self.export_retry_times += 1

            if self.export_failed_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
                # send an alert
                msg = "Spider <{}> failed to export data {} times; please check whether it is running properly".format(
                    self._redis_key, self.export_failed_times
                )
                log.error(msg)
                tools.send_msg(
                    msg=msg,
                    level="error",
                    message_prefix="Spider <%s> failed to export data" % (self._redis_key),
                )

        self._is_adding_to_db = False
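
    # metric_datas() below emits one counter per field, counting records whose
    # value is non-empty. For example, a batch of
    #   [{"title": "a", "url": ""}, {"title": "b", "url": "u"}]
    # yields title=2, url=1 and "total count"=2 under the table's classify key.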
    def metric_datas(self, table, datas):
        """
        Emit metrics: record the total count and the fill rate of every key
        @param table: table name
        @param datas: list of data dicts
        @return:
        """
        total_count = 0
        for data in datas:
            total_count += 1
            for k, v in data.items():
                metrics.emit_counter(k, int(bool(v)), classify=table)
        metrics.emit_counter("total count", total_count, classify=table)

    def close(self):
        # call each pipeline's close method
        for pipeline in self._pipelines:
            try:
                pipeline.close()
            except:
                pass
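

if __name__ == "__main__":
    # Minimal usage sketch, assuming a reachable RabbitMQ instance and the
    # queue/table names configured in feapder.setting. NewsItem and its
    # "title" field are hypothetical, for illustration only.
    class NewsItem(Item):
        pass  # table name derives from the class name by feapder convention

    item_buffer = ItemBuffer(redis_key="test_spider")
    item_buffer.start()  # runs the flush loop in a background thread

    news = NewsItem()
    news.title = "hello"  # hypothetical field, exported by the configured pipelines
    item_buffer.put_item(news)
    item_buffer.put_item(lambda: log.info("batch exported"))  # post-export callback

    tools.delay_time(5)  # give the flush loop time to export the batch
    item_buffer.stop()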