# -*- coding: utf-8 -*- """ Created on 2023-04-05 --------- @summary: 爬虫监控数据汇总 --------- @author: dzr """ import hashlib from datetime import datetime, time, timedelta from bson.int64 import Int64 from bson.son import SON from pymongo import MongoClient from log import logger from operator import itemgetter # mongo # MONGO_HOST = "172.17.4.87" # MONGO_PORT = 27080 MONGO_HOST = "127.0.0.1" MONGO_PORT = 27001 client = MongoClient(MONGO_HOST, MONGO_PORT) MONGO_DB1 = "py_spider" MONGO_DB2 = "editor" mongodb1 = client[MONGO_DB1] mongodb2 = client[MONGO_DB2] # 爬虫数据表 data_bak = mongodb1["data_bak"] # 心跳表 spider_heartbeat = mongodb1["spider_heartbeat"] # py_spiders列表 py_spiders_crawl_list = mongodb1["crawl_data"] # 列表页汇总表 spider_monitor = mongodb1["spider_monitor"] # luaconfig表 spider_lua_config = mongodb2["luaconfig"] # 特殊网站 special_sites = ["云南省政府采购网", "湖南省政府采购电子卖场"] def get_md5(*args, **kwargs): """ @summary: 获取唯一的32位md5 --------- @param args: 参与联合去重的值数组 @param kwargs: 参与联合去重的值字典 --------- @result: 7c8684bcbdfcea6697650aa53d7b1405 """ conditions = ["site", "channel", "spidercode"] data_lst = list(filter(lambda x: x is not None, args)) for k, v in kwargs.items(): if k in conditions and (v and v not in data_lst): data_lst.append(v) if not data_lst or len(data_lst) != 3: # raise AttributeError(f"缺少{conditions}属性") logger.error(f"[Monitor]缺少{conditions}属性,内容:{data_lst}") return None data_lst = sorted(data_lst) content = "_".join(data_lst) return hashlib.md5(str(content).encode()).hexdigest() def get_runtime(datestr=None): if datestr is None: today = datetime.now().date() yesterday = today + timedelta(days=-1) datestr = yesterday.strftime("%Y-%m-%d") return datestr def save(documents, collection): """保存数据""" is_list = isinstance(documents, list) documents = documents if is_list else [documents] count = 0 data_lst = [] for items in documents: items.pop("_id", None) items.pop("business_type", None) items["comeintime"] = Int64(datetime.now().timestamp()) data_lst.append(items) count += 1 if len(data_lst) % 100 == 0: collection.insert_many(data_lst) data_lst.clear() logger.info(f"[Monitor]{collection.name}-批量上传{count}条数据--已保存") # 提交剩余数据 collection.insert_many(data_lst) logger.info(f"[Monitor]{collection.name}-批量上传{count}条数据--已保存") def get_crawler_basic_information(): """爬虫基础信息""" crawler_lst = [] q = {"platform": "python", "state": 11} projection = { "_id": 0, "site": 1, "channel": 1, "modifyuser": 1, "modifyuserid": 1, "code": 1, "event": 1 } cursor = spider_lua_config.find(q, projection=projection) try: for doc in cursor: crawler_lst.append({ "site": doc["site"], "channel": doc["channel"], "spidercode": doc["code"], "modifyid": doc["modifyuserid"], "modifyuser": doc["modifyuser"], "event": doc["event"] }) finally: client.close() logger.info(f"[Monitor]爬虫采集监控--共计{len(crawler_lst)}个爬虫") yield from crawler_lst def aggregate_query(collection, pipeline, is_print_error=False): """ 聚合查询 @param collection: MongoDB集合 @param pipeline: 聚合查询条件 @param is_print_error: 是否在console打印错误日志 @return: 聚合结果 """ results = [] cursor = collection.aggregate(pipeline, allowDiskUse=True) try: for doc in cursor: results.append(doc) except Exception as e: if is_print_error: logger.exception(e) finally: client.close() yield from results def aggregate_query_crawl_count(runtime): """ feapder爬虫列表和详情采集聚合查询结果统计 @param runtime: 运行时间 @return: """ aggregate_items = {} pipeline = [ {"$match": {"runtime": runtime}}, { "$group": { "_id": "$spider_id", "rel_count": {"$sum": "$rel_count"}, # 入库量 "count": {"$sum": "$count"}, # 下载量 "spider_item": { "$addToSet": { "site": "$site", "channel": "$channel", "spidercode": "$spidercode", "business_type": "$business_type" } } } }, {"$sort": SON([("rel_count", -1)])} ] results = aggregate_query(spider_heartbeat, pipeline) for doc in results: spider_item = doc["spider_item"] label_dict = {} for items in spider_item: site = items["site"] channel = items["channel"] spidercode = items["spidercode"] business_type = items["business_type"] if len(spider_item) > 1 and site not in special_sites: logger.warning(f"[Monitor]{spidercode} -> {site}--存在风险, {len(spider_item)}") is_list = str(business_type).endswith("List") hash_key = get_md5(**items) # 防止多站点对应1个spidercode,数据相互重叠 if not hash_key: # logger.error(f"[Monitor]{site}-{channel}-{spidercode}--监控异常") logger.error(f"[Monitor]{doc['_id']}--爬虫异常") continue if not aggregate_items.get(hash_key): data = { "spider_id": doc["_id"], "site": site, "channel": items["channel"], "spidercode": spidercode, "business_type": business_type, "runtime": runtime, "spidercode_at_site_num": 0 # 爬虫代码与站点对应的关系数量 } if is_list: data["list_count"] = doc["count"] data["list_rel_count"] = doc["rel_count"] data["list_runtimes"] = 1 data["detail_count"] = 0 data["detail_rel_count"] = 0 data["detail_runtimes"] = 0 else: data["detail_count"] = doc["count"] data["detail_rel_count"] = doc["rel_count"] data["detail_runtimes"] = 1 data["list_count"] = 0 data["list_rel_count"] = 0 data["list_runtimes"] = 0 aggregate_items.setdefault(hash_key, data) else: data = aggregate_items.get(hash_key) if is_list: data["list_count"] += doc["count"] data["list_rel_count"] += doc["rel_count"] data["list_runtimes"] += 1 else: data["detail_count"] += doc["count"] data["detail_rel_count"] += doc["rel_count"] data["detail_runtimes"] += 1 aggregate_items.update({hash_key: data}) # 监控爬虫任务,当 spidercode_at_site_num > 1 # 表明创建的爬虫任务存在问题,问题反馈数据寻源人员 if site not in special_sites: label = f"{business_type}_{spidercode}" if label not in label_dict: aggregate_items[hash_key]["spidercode_at_site_num"] = 1 conditions = {"keys": [hash_key], "websites": [site]} label_dict.setdefault(label, conditions) else: # 相同spidercode但site不同的爬虫进行计数+1 websites = label_dict[label]["websites"] if site not in websites: keys = label_dict[label]["keys"] for key in keys: aggregate_items[key]["spidercode_at_site_num"] += 1 # 记录身份id - hash_key keys.append(hash_key) # 记录站点 websites.append(site) aggregate_items[hash_key]["spidercode_at_site_num"] = len(websites) return aggregate_items def aggregate_query_crawl_list(runtime): """feapder列表爬虫采集聚合查询结果统计""" aggregate_items = {} pipeline = [ {"$match": {"runtime": runtime, "business_type": {"$regex": "List"}}}, { "$group": { "_id": "$spider_id", "spider_item": { "$addToSet": { "site": "$site", "channel": "$channel", "spidercode": "$spidercode", "count": {"$ifNull": ["$count", 0]}, "rel_count": {"$ifNull": ["$rel_count", 0]}, } } } } ] results = aggregate_query(spider_heartbeat, pipeline) for doc in results: spider_item = doc["spider_item"] for item in spider_item: hask_key = get_md5(**item) if not aggregate_items.get(hask_key): if all([ item["count"] > 0, item["rel_count"] > 0, item["count"] == item["rel_count"] ]): aggregate_items.setdefault(hask_key, {"list_allintimes": 1}) else: aggregate_items.get(hask_key)["list_allintimes"] += 1 return aggregate_items def aggregate_query_crawlab_info(runtime): """feapder爬虫采集聚合查询crawlab平台运行信息统计""" aggregate_items = {} pipeline = [ { "$project": { "_id": 0, "spider_id": 1, "spidercode": 1, "runtime": 1, "node_ip": 1, "crawlab_taskid": 1, "create_at": 1, } }, {"$match": {"runtime": runtime}}, { "$group": { "_id": "$spider_id", "crawlab_item": { "$addToSet": { "spidercode": "$spidercode", "create_at": "$create_at", "node_ip": "$node_ip", "crawlab_taskid": "$crawlab_taskid" }, } } } ] results = aggregate_query(spider_heartbeat, pipeline) for doc in results: crawlab_item = sorted(doc["crawlab_item"], key=itemgetter("create_at"), reverse=True) items: dict = crawlab_item[0] spidercode = items.pop("spidercode") if not aggregate_items.get(spidercode): aggregate_items.setdefault(spidercode, items) return aggregate_items _runtime = get_runtime() aggregate_results = aggregate_query_crawl_count(_runtime) aggregate_list_results = aggregate_query_crawl_list(_runtime) aggregate_query_crawlab_results = aggregate_query_crawlab_info(_runtime) def get_node_and_taskid(spidercode, default=None): """获取最新爬虫工作节点和任务id""" if aggregate_query_crawlab_results.get(spidercode): default = aggregate_query_crawlab_results[spidercode] return default def get_list_isgetdata(hash_key, default=0): """列表页是否采集数据""" if aggregate_results.get(hash_key): default = aggregate_results[hash_key]["list_count"] return True if default > 0 else False def get_list_allintimes(hash_key, default=0): """日采集列表数量与入库数量相等的次数(扣除标题去重数量 + 增量(全量)去重数量)""" if aggregate_list_results.get(hash_key): default = aggregate_list_results[hash_key]["list_allintimes"] return default def get_list_runtimes(hash_key, default=0): """列表采集运行频次""" if aggregate_results.get(hash_key): default = aggregate_results[hash_key]["list_runtimes"] return default def get_detail_downloadnum(hash_key, default=0): """详情页下载量""" if aggregate_results.get(hash_key): default = aggregate_results[hash_key]["detail_count"] return default def get_detail_downloadsuccessnum(hash_key, default=0): """详情页下载成功量""" if aggregate_results.get(hash_key): default = aggregate_results[hash_key]["detail_rel_count"] return default def get_detail_downloadfailnum(**kwargs): """详情页下载失败量""" count = -1 if kwargs["detail_downloadnum"] >= 0 and kwargs["detail_downloadnum"] >= 0: count = kwargs["detail_downloadnum"] - kwargs["detail_downloadsuccessnum"] return count def main(): summary_queue = [] crawlers = get_crawler_basic_information() for crawler in crawlers: site = crawler["site"] channel = crawler["channel"] spidercode = crawler["spidercode"] join_data = {**crawler} # 添加爬虫基础数据 hash_key = get_md5(site, channel, spidercode) if aggregate_results.get(hash_key): # crawlab平台 crawlab = get_node_and_taskid(spidercode) if crawlab: join_data["py_taskid"] = crawlab["crawlab_taskid"] join_data["py_nodename"] = crawlab["node_ip"] # 聚合查询心跳统计结果 result = aggregate_results[hash_key] join_data["spider_id"] = result["spider_id"] join_data["business_type"] = result["business_type"] join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"] join_data["list_count"] = result["list_count"] join_data["list_rel_count"] = result["list_rel_count"] join_data["detail_count"] = result["detail_count"] join_data["detail_rel_count"] = result["detail_rel_count"] # 列表采集汇总数据 join_data["list_isgetdata"] = get_list_isgetdata(hash_key) join_data["list_allintimes"] = get_list_allintimes(hash_key) join_data["list_runtimes"] = get_list_runtimes(hash_key) # 详情采集汇总数据 join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key) join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key) join_data["detail_downloadfailnum"] = get_detail_downloadfailnum(**join_data) summary_queue.append(join_data) logger.info(f"[Monitor]{crawler['site']}-{crawler['channel']}-{spidercode}--完成统计") # 上传数据库 save(summary_queue, spider_monitor) if __name__ == '__main__': main()