# -*- coding: utf-8 -*-
"""
Created on 2018-06-19 17:17
---------
@summary: Item manager. Buffers the items that are to be written to the
          database and writes them from one place, so that multiple threads
          do not access the database at the same time.
---------
@author: Boris
@email: boris_liu@foxmail.com
"""

import importlib
import threading
from queue import Queue

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.db.redisdb import RedisDB
from feapder.dedup import Dedup
from feapder.network.item import Item, UpdateItem
from feapder.pipelines import BasePipeline
from feapder.pipelines.mysql_pipeline import MysqlPipeline
from feapder.utils import metrics
from feapder.utils.log import log

MAX_ITEM_COUNT = 5000  # maximum number of items held in the buffer queue
UPLOAD_BATCH_MAX_SIZE = 1000
MYSQL_PIPELINE_PATH = "feapder.pipelines.mysql_pipeline.MysqlPipeline"
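
# Note: MAX_ITEM_COUNT caps the in-memory queue (put_item blocks once the queue
# is full), while UPLOAD_BATCH_MAX_SIZE is the number of buffered entries
# written out per batch during a flush.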


class ItemBuffer(threading.Thread):
    dedup = None
    __redis_db = None

    def __init__(self, redis_key, task_table=None):
        if not hasattr(self, "_table_item"):
            super(ItemBuffer, self).__init__()

            self._thread_stop = False
            self._is_adding_to_db = False
            self._redis_key = redis_key
            self._task_table = task_table

            self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)

            self._table_request = setting.TAB_REQUESTS.format(redis_key=redis_key)
            self._table_failed_items = setting.TAB_FAILED_ITEMS.format(
                redis_key=redis_key
            )

            self._item_tables = {
                # 'item_name': 'table_name'  # cache of item name -> table name
            }
            self._item_update_keys = {
                # 'table_name': ['id', 'name', ...]  # cache of table name -> __update_key__
            }
            self._pipelines = self.load_pipelines()

            self._have_mysql_pipeline = MYSQL_PIPELINE_PATH in setting.ITEM_PIPELINES
            self._mysql_pipeline = None

            if setting.ITEM_FILTER_ENABLE and not self.__class__.dedup:
                self.__class__.dedup = Dedup(
                    to_md5=False, **setting.ITEM_FILTER_SETTING
                )

            # number of times the current export has been retried
            self.export_retry_times = 0
            # number of export failures  TODO: for non-air spiders, track this in redis
            self.export_falied_times = 0

    @property
    def redis_db(self):
        if self.__class__.__redis_db is None:
            self.__class__.__redis_db = RedisDB()

        return self.__class__.__redis_db

    def load_pipelines(self):
        pipelines = []
        for pipeline_path in setting.ITEM_PIPELINES:
            module, class_name = pipeline_path.rsplit(".", 1)
            pipeline_cls = importlib.import_module(module).__getattribute__(class_name)
            pipeline = pipeline_cls()
            if not isinstance(pipeline, BasePipeline):
                raise ValueError(
                    f"{pipeline_path} must inherit from feapder.pipelines.BasePipeline"
                )
            pipelines.append(pipeline)

        return pipelines
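
    # A settings sketch for reference (the second path below is illustrative,
    # not a project's actual configuration): each entry in ITEM_PIPELINES is an
    # import path that load_pipelines() resolves to a BasePipeline subclass, e.g.
    #
    #   ITEM_PIPELINES = [
    #       "feapder.pipelines.mysql_pipeline.MysqlPipeline",
    #       "my_project.pipelines.CustomPipeline",  # hypothetical user pipeline
    #   ]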

    @property
    def mysql_pipeline(self):
        if not self._mysql_pipeline:
            module, class_name = MYSQL_PIPELINE_PATH.rsplit(".", 1)
            pipeline_cls = importlib.import_module(module).__getattribute__(class_name)
            self._mysql_pipeline = pipeline_cls()

        return self._mysql_pipeline

    def run(self):
        self._thread_stop = False
        while not self._thread_stop:
            self.flush()
            tools.delay_time(1)

        self.close()

    def stop(self):
        self._thread_stop = True
        # clear the Thread "started" flag so the buffer can be start()ed again
        self._started.clear()

    def put_item(self, item):
        if isinstance(item, Item):
            # callback before the item goes to the database
            item.pre_to_db()

        self._items_queue.put(item)
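
    # A minimal usage sketch (assumption: Redis and the pipelines configured in
    # setting.ITEM_PIPELINES are reachable; inside the framework the parser
    # control normally drives this, not user code):
    #
    #   item_buffer = ItemBuffer(redis_key="test_spider")
    #   item_buffer.start()            # background thread: flush() once per second
    #   item_buffer.put_item(item)     # Item / UpdateItem / request / callable
    #   item_buffer.stop()             # stop the loop; close() runs on exit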

    def flush(self):
        try:
            items = []
            update_items = []
            requests = []
            callbacks = []
            items_fingerprints = []
            data_count = 0

            while not self._items_queue.empty():
                data = self._items_queue.get_nowait()
                data_count += 1

                # classify the data
                if callable(data):
                    # callbacks are only executed after a successful export
                    callbacks.append(data)

                elif isinstance(data, UpdateItem):
                    update_items.append(data)

                elif isinstance(data, Item):
                    items.append(data)
                    if setting.ITEM_FILTER_ENABLE:
                        items_fingerprints.append(data.fingerprint)

                else:  # a request, removed from redis once the batch is exported
                    requests.append(data)

                if data_count >= UPLOAD_BATCH_MAX_SIZE:
                    self.__add_item_to_db(
                        items, update_items, requests, callbacks, items_fingerprints
                    )

                    items = []
                    update_items = []
                    requests = []
                    callbacks = []
                    items_fingerprints = []
                    data_count = 0

            if data_count:
                self.__add_item_to_db(
                    items, update_items, requests, callbacks, items_fingerprints
                )

        except Exception as e:
            log.exception(e)

    def get_items_count(self):
        return self._items_queue.qsize()

    def is_adding_to_db(self):
        return self._is_adding_to_db

    def __dedup_items(self, items, items_fingerprints):
        """
        Deduplicate the items
        @param items:
        @param items_fingerprints:
        @return: the deduplicated items and their fingerprints
        """
        if not items:
            return items, items_fingerprints

        is_exists = self.__class__.dedup.get(items_fingerprints)
        is_exists = is_exists if isinstance(is_exists, list) else [is_exists]

        dedup_items = []
        dedup_items_fingerprints = []
        items_count = dedup_items_count = dup_items_count = 0

        while is_exists:
            item = items.pop(0)
            items_fingerprint = items_fingerprints.pop(0)
            is_exist = is_exists.pop(0)

            items_count += 1

            if not is_exist:
                dedup_items.append(item)
                dedup_items_fingerprints.append(items_fingerprint)
                dedup_items_count += 1
            else:
                dup_items_count += 1

        log.info(
            "{} items pending, {} duplicates, {} items to be written after dedup".format(
                items_count, dup_items_count, dedup_items_count
            )
        )

        return dedup_items, dedup_items_fingerprints
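
    # Dedup.get() reports, per fingerprint, whether it has been seen before
    # (truthy means duplicate). Fingerprints of new items are only written back
    # to the dedup store after the batch exports successfully (see
    # __add_item_to_db), so a failed export does not poison the filter.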

    def __pick_items(self, items, is_update_item=False):
        """
        Split the data by target table. The original items list is emptied.
        @param items:
        @param is_update_item:
        @return:
        """
        datas_dict = {
            # 'table_name': [{}, {}]
        }

        while items:
            item = items.pop(0)
            # item_name is the item class name in underscore format; look the
            # table name up in the cache first and compute it only on a miss,
            # so later lookups are faster
            item_name = item.item_name
            table_name = self._item_tables.get(item_name)
            if not table_name:
                table_name = item.table_name
                self._item_tables[item_name] = table_name

            if table_name not in datas_dict:
                datas_dict[table_name] = []

            datas_dict[table_name].append(item.to_dict)

            if is_update_item and table_name not in self._item_update_keys:
                self._item_update_keys[table_name] = item.update_key

        return datas_dict
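
    # For example (illustrative item classes and tables): two SpiderDataItem
    # objects and one TaskItem would come back grouped roughly as
    #   {"spider_data": [{...}, {...}], "task": [{...}]}
    # assuming those items map to the "spider_data" and "task" tables.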

    def __export_to_db(self, table, datas, is_update=False, update_keys=()):
        # metrics / validation of the batch
        self.check_datas(table=table, datas=datas)

        for pipeline in self._pipelines:
            if is_update:
                if table == self._task_table and not isinstance(
                    pipeline, MysqlPipeline
                ):
                    continue

                if not pipeline.update_items(table, datas, update_keys=update_keys):
                    log.error(
                        f"{pipeline.__class__.__name__} failed to update data. table: {table} items: {datas}"
                    )
                    return False

            else:
                if not pipeline.save_items(table, datas):
                    log.error(
                        f"{pipeline.__class__.__name__} failed to save data. table: {table} items: {datas}"
                    )
                    return False

        # if this is the task table and none of the pipelines above is the mysql
        # pipeline, the task still has to be updated through mysql
        if not self._have_mysql_pipeline and is_update and table == self._task_table:
            if not self.mysql_pipeline.update_items(
                table, datas, update_keys=update_keys
            ):
                log.error(
                    f"{self.mysql_pipeline.__class__.__name__} failed to update data. table: {table} items: {datas}"
                )
                return False

        return True
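
    # The pipeline contract used here: save_items() / update_items() return
    # True on success and False on failure, and the first False aborts the
    # batch so __add_item_to_db() can schedule a retry.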

    def __add_item_to_db(
        self, items, update_items, requests, callbacks, items_fingerprints
    ):
        export_success = True
        self._is_adding_to_db = True

        # deduplicate
        if setting.ITEM_FILTER_ENABLE:
            items, items_fingerprints = self.__dedup_items(items, items_fingerprints)

        # group the data by target table
        items_dict = self.__pick_items(items)
        update_items_dict = self.__pick_items(update_items, is_update_item=True)

        # batch-insert the items
        failed_items = {"add": [], "update": [], "requests": []}
        while items_dict:
            table, datas = items_dict.popitem()

            log.debug(
                """
                -------------- batch insert of items --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )

            if not self.__export_to_db(table, datas):
                export_success = False
                failed_items["add"].append({"table": table, "datas": datas})

        # batch-update the update-items
        while update_items_dict:
            table, datas = update_items_dict.popitem()

            log.debug(
                """
                -------------- batch update of items --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )

            update_keys = self._item_update_keys.get(table)
            if not self.__export_to_db(
                table, datas, is_update=True, update_keys=update_keys
            ):
                export_success = False
                failed_items["update"].append({"table": table, "datas": datas})

        if export_success:
            # run the buffered callbacks
            while callbacks:
                try:
                    callback = callbacks.pop(0)
                    callback()
                except Exception as e:
                    log.exception(e)

            # remove the handled requests from the redis task table
            if requests:
                self.redis_db.zrem(self._table_request, requests)

            # record the fingerprints in the dedup store
            if setting.ITEM_FILTER_ENABLE:
                if items_fingerprints:
                    self.__class__.dedup.add(items_fingerprints, skip_check=True)
        else:
            failed_items["requests"] = requests

            if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
                if self._redis_key != "air_spider":
                    # record the failed items in redis
                    self.redis_db.sadd(self._table_failed_items, failed_items)

                    # remove the handled requests
                    if requests:
                        self.redis_db.zrem(self._table_request, requests)

                    log.error(
                        "Export exceeded the maximum number of retries; giving up and recording the data to redis. items:\n {}".format(
                            tools.dumps_json(failed_items)
                        )
                    )
                self.export_retry_times = 0
            else:
                tip = ["export failed"]
                if callbacks:
                    tip.append("callbacks not executed")

                if requests:
                    tip.append("tasks not deleted")
                    # reset the score of the requests that are still in the task table
                    exists = self.redis_db.zexists(self._table_request, requests)
                    for exist, request in zip(exists, requests):
                        if exist:
                            self.redis_db.zadd(self._table_request, request, 300)

                if setting.ITEM_FILTER_ENABLE:
                    tip.append("fingerprints not added to the dedup store")

                if self._redis_key != "air_spider":
                    tip.append("will retry automatically")

                tip.append("failed items:\n {}".format(tools.dumps_json(failed_items)))
                log.error(",".join(tip))

                self.export_falied_times += 1

                if self._redis_key != "air_spider":
                    self.export_retry_times += 1

            if self.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
                # send an alert
                msg = "Spider {} failed to export data. Failure count: {}. Please check whether the spider is working properly".format(
                    self._redis_key, self.export_falied_times
                )
                log.error(msg)
                tools.send_msg(
                    msg=msg,
                    level="error",
                    message_prefix="Spider %s failed to export data" % (self._redis_key),
                )

        self._is_adding_to_db = False
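
    # Failure handling above, in short: a failed batch keeps its requests and
    # fingerprints, export_retry_times grows (for non-air spiders) until
    # EXPORT_DATA_MAX_RETRY_TIMES is exceeded, after which the failed items are
    # parked in the failed-items table in redis and their requests are dropped;
    # EXPORT_DATA_MAX_FAILED_TIMES additionally gates an alert via
    # tools.send_msg.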

    def check_datas(self, table, datas):
        """
        Emit metrics: the total number of records and a counter per key
        @param table: table name
        @param datas: list of data dicts
        @return:
        """
        metrics.emit_counter("total count", len(datas), classify=table)
        for data in datas:
            for k, v in data.items():
                metrics.emit_counter(k, int(bool(v)), classify=table)
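
    # For example (illustrative numbers), a batch of 3 rows for table "news"
    # where "title" is empty in one row emits counters classified under "news":
    #   total count += 3, title += 2, and +3 for every key that is always set.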

    def close(self):
        # call each pipeline's close method
        for pipeline in self._pipelines:
            try:
                pipeline.close()
            except Exception as e:
                log.exception(e)