# -*- coding: utf-8 -*- """ Created on 2023-04-05 --------- @summary: 爬虫监控数据汇总 --------- @author: dzr """ import hashlib from datetime import datetime, time, timedelta from bson.int64 import Int64 from bson.son import SON from pymongo import MongoClient from log import logger # mongo # MONGO_HOST = "172.17.4.87" # MONGO_PORT = 27080 MONGO_HOST = "127.0.0.1" MONGO_PORT = 27001 client = MongoClient(MONGO_HOST, MONGO_PORT) MONGO_DB1 = "py_spider" MONGO_DB2 = "editor" mongodb1 = client[MONGO_DB1] mongodb2 = client[MONGO_DB2] # 爬虫数据表 data_bak = mongodb1["data_bak"] # 心跳表 spider_heartbeat = mongodb1["spider_heartbeat"] # py_spiders列表 py_spiders_crawl_list = mongodb1["crawl_data"] # 列表页汇总表 spider_monitor = mongodb1["spider_monitor"] # luaconfig表 spider_lua_config = mongodb2["luaconfig"] def get_hask_key(*args, **kwargs): """ @summary: 获取唯一的32位md5 --------- @param args: 参与联合去重的值集合 @param kwargs: 参与联合去重的值字典 --------- @result: 7c8684bcbdfcea6697650aa53d7b1405 """ conditions = ["site", "channel", "spidercode"] if args: trait = list(filter(lambda x: x is not None, args)) else: trait = set() for k, v in kwargs.items(): if k in conditions and kwargs[k] is not None: trait.add(kwargs[k]) if not trait or len(trait) != 3: # raise AttributeError(f"缺少{conditions}属性") logger.error(f"[Monitor]缺少{conditions}属性,内容:{trait}") return None join_data = "_".join(sorted(trait)).encode() return hashlib.md5(join_data).hexdigest() def save(documents, collection): """保存数据""" is_list = isinstance(documents, list) documents = documents if is_list else [documents] count = 0 data_lst = [] for items in documents: items.pop("_id", None) items.pop("business_type", None) items["comeintime"] = Int64(datetime.now().timestamp()) data_lst.append(items) count += 1 if len(data_lst) % 100 == 0: collection.insert_many(data_lst) data_lst.clear() logger.info(f"[Monitor]{collection.name}-批量上传{count}条数据--已保存") # 提交剩余数据 collection.insert_many(data_lst) logger.info(f"[Monitor]{collection.name}-批量上传{count}条数据--已保存") def get_runtime(datestr=None): if datestr is None: today = datetime.now().date() yesterday = today + timedelta(days=-1) datestr = yesterday.strftime("%Y-%m-%d") return datestr def get_crawler_basic_information(): """爬虫基础信息""" crawler_lst = [] q = {"platform": "python", "state": 11} projection = {"_id": 0, "site": 1, "channel": 1, "modifyuser": 1, "modifyuserid": 1, "code":1} cursor = spider_lua_config.find(q, projection=projection) try: for doc in cursor: crawler_lst.append({ "site": doc["site"], "channel": doc["channel"], "spidercode": doc["code"], "modifyid": doc["modifyuserid"], "modifyuser": doc["modifyuser"], }) finally: client.close() logger.info(f"[Monitor]爬虫采集监控--共计{len(crawler_lst)}个爬虫") yield from crawler_lst def get_node_and_taskid(runtime, spidercode): """获取最新爬虫工作节点和任务id""" q = {"runtime": runtime, "spidercode": spidercode} projection = {"node_ip": 1, "crawlab_taskid":1, "_id": 0} sort = [("_id", -1)] result = spider_heartbeat.find_one(q, projection=projection, sort=sort) return result def aggregate_query(runtime): """feapder采集聚合查询""" websites = [] aggregate_items = {} pipeline = [ {"$match": {"runtime": runtime}}, { "$group": { "_id": "$spider_id", "rel_count": {"$sum": "$rel_count"}, # 入库量 "count": {"$sum": "$count"}, # 下载量 "spider_item": { "$addToSet": { "site": "$site", "channel": "$channel", "spidercode": "$spidercode", "business_type": "$business_type" } } } }, {"$sort": SON([("rel_count", -1)])} ] cursor = spider_heartbeat.aggregate(pipeline, allowDiskUse=True) try: for doc in cursor: spider_item = doc["spider_item"] for items in spider_item: site = items["site"] channel = items["channel"] spidercode = items["spidercode"] business_type = items["business_type"] if len(spider_item) > 1: logger.warning(f"{spidercode} -> {site}--对应的关系数量异常") is_list = str(business_type).endswith("List") hash_key = get_hask_key(**items) # 防止多站点对应1个spidercode,数据相互重叠 if not hash_key: logger.error(f"[Monitor]{site}-{channel}-{spidercode}--监控异常") continue if not aggregate_items.get(hash_key): data = { "spider_id": doc["_id"], "site": site, "channel": items["channel"], "spidercode": spidercode, "business_type": business_type, "runtime": runtime, "spidercode_at_site_num": 0 # 爬虫代码与站点对应的关系数量 } if is_list: data["list_count"] = doc["count"] data["list_rel_count"] = doc["rel_count"] data["list_runtimes"] = 1 data["detail_count"] = 0 data["detail_rel_count"] = 0 data["detail_runtimes"] = 0 else: data["detail_count"] = doc["count"] data["detail_rel_count"] = doc["rel_count"] data["detail_runtimes"] = 1 data["list_count"] = 0 data["list_rel_count"] = 0 data["list_runtimes"] = 0 aggregate_items.setdefault(hash_key, data) else: data = aggregate_items.get(hash_key) if is_list: data["list_count"] += doc["count"] data["list_rel_count"] += doc["rel_count"] data["list_runtimes"] += 1 else: data["detail_count"] += doc["count"] data["detail_rel_count"] += doc["rel_count"] data["detail_runtimes"] += 1 aggregate_items.update({hash_key: data}) # 监控爬虫任务,当 spidercode_at_site_num > 1 # 表明创建的爬虫任务存在问题,需要反馈给数据寻源组 if site not in websites: aggregate_items[hash_key]["spidercode_at_site_num"] = 1 websites.append(site) else: aggregate_items[hash_key]["spidercode_at_site_num"] += 1 # except Exception as e: # logger.exception(e) finally: client.close() return aggregate_items _runtime = get_runtime() aggregate_results = aggregate_query(_runtime) def get_list_isgetdata(hash_key): """列表页是否采集数据""" count = 0 if aggregate_results.get(hash_key): count += aggregate_results[hash_key]["list_count"] return True if count > 0 else False def get_list_allintimes(hash_key): """日采集列表的总入库量""" count = 0 if aggregate_results.get(hash_key): count += aggregate_results[hash_key]["list_rel_count"] return count def get_list_runtimes(hash_key): count = 0 if aggregate_results.get(hash_key): count += aggregate_results.get(hash_key)["list_runtimes"] return count def get_detail_downloadnum(hash_key): """详情页下载量""" count = 0 if aggregate_results.get(hash_key): count += aggregate_results.get(hash_key)["detail_count"] return count def get_detail_downloadsuccessnum(hash_key): """详情页下载成功量""" count = 0 if aggregate_results.get(hash_key): count += aggregate_results.get(hash_key)["detail_rel_count"] return count def get_count(document, business_type: str): if business_type.title() not in ["List", "Detail"]: raise ValueError("business_type") if str(document["business_type"]).endswith(business_type): return document["count"] return 0 def get_rel_count(document, business_type: str): if business_type.title() not in ["List", "Detail"]: raise ValueError("business_type") if str(document["business_type"]).endswith(business_type): return document["rel_count"] return 0 def main(): summary_queue = [] crawlers = get_crawler_basic_information() for crawler in crawlers: site = crawler["site"] channel = crawler["channel"] spidercode = crawler["spidercode"] join_data = {**crawler} # 添加爬虫基础数据 hash_key = get_hask_key(site, channel, spidercode) if aggregate_results.get(hash_key): # crawlab平台 crawlab = get_node_and_taskid(_runtime, spidercode) if crawlab: join_data["py_taskid"] = crawlab["crawlab_taskid"] join_data["py_nodename"] = crawlab["node_ip"] result = aggregate_results[hash_key] # 查询聚合心跳统计结果 join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"] join_data["business_type"] = result["business_type"] join_data["spider_id"] = result["spider_id"] join_data["list_count"] = result["list_count"] join_data["list_rel_count"] = result["list_rel_count"] join_data["detail_count"] = result["detail_count"] join_data["detail_rel_count"] = result["detail_rel_count"] # 计算列表页数据 join_data["list_isgetdata"] = get_list_isgetdata(hash_key) # 列表页是否采集数据 join_data["list_allintimes"] = get_list_allintimes(hash_key) # 日采集列表的总入库量 join_data["list_runtimes"] = get_list_runtimes(hash_key) # 列表页采集运行频次 # 计算详情页数据 join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key) # 详情页下载量 join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key) # 详情页下载成功量 join_data["detail_downloadfailnum"] = join_data["detail_downloadnum"] - join_data["detail_downloadsuccessnum"] # 下载详情失败数量 else: join_data["list_isgetdata"] = False join_data["list_allintimes"] = -1 join_data["list_runtimes"] = -1 join_data["detail_downloadnum"] = -1 join_data["detail_downloadsuccessnum"] = -1 join_data["detail_downloadfailnum"] = -1 logger.info(f"[Monitor]{crawler['site']}-{crawler['channel']}-{spidercode}--完成统计") summary_queue.append(join_data) save(summary_queue, spider_monitor) if __name__ == '__main__': main()