# -*- coding: utf-8 -*- """ Created on 2023-04-05 --------- @summary: 爬虫运行监控(feapder)和日常采集统计 --------- @author: Dzr """ import hashlib from datetime import datetime, timedelta from bson.int64 import Int64 from bson.son import SON from pymongo import MongoClient from log import logger # mongo MONGO_HOST = "172.17.4.87" MONGO_PORT = 27080 client = MongoClient(MONGO_HOST, MONGO_PORT) MONGO_DB1 = "py_spider" MONGO_DB2 = "editor" mongodb1 = client[MONGO_DB1] mongodb2 = client[MONGO_DB2] # 爬虫数据生产表 data_bak = mongodb1["data_bak"] # 爬虫心跳表 spider_heartbeat = mongodb1["pyspider_heartbeat"] # 日采集详情汇总表 spider_monitor = mongodb1["spider_monitor"] # 采集任务表 spider_lua_config = mongodb2["luaconfig"] # 特殊网站 special_sites = ["云南省政府采购网", "湖南省政府采购电子卖场"] def get_md5(*args, **kwargs): """ @summary: 获取唯一的32位md5 --------- @param args: 参与联合去重的值数组 @param kwargs: 参与联合去重的值字典 --------- @result: 7c8684bcbdfcea6697650aa53d7b1405 """ if len(args) != 1: conditions = ["site", "channel", "spidercode"] data_lst = list(filter(lambda x: x is not None, args)) for k, v in kwargs.items(): if k in conditions and (v and v not in data_lst): data_lst.append(v) if not data_lst or len(data_lst) != 3: logger.error(f"组合条件缺失:{conditions},当前内容:{data_lst}") return None data_lst = sorted(data_lst) content = "_".join(data_lst) else: content = args[0] return hashlib.md5(str(content).encode()).hexdigest() def get_runtime(datestr=None): if datestr is None: today = datetime.now().date() yesterday = today + timedelta(days=-1) datestr = yesterday.strftime("%Y-%m-%d") return datestr def save(documents, collection, ordered=False): """保存数据""" is_list = isinstance(documents, list) documents = documents if is_list else [documents] data_lst = [] for items in documents: items.pop("_id", None) items.pop("business_type", None) items["comeintime"] = Int64(datetime.now().timestamp()) data_lst.append(items) if len(data_lst) == 100: ret = collection.insert_many(data_lst, ordered) logger.info(f"MongoDB {collection.name} 保存 {len(ret.inserted_ids)} 条数据") data_lst = [] # 提交剩余数据 if data_lst: collection.insert_many(data_lst, ordered) logger.info(f"MongoDB {collection.name} 保存 {len(documents)} 条数据") def get_spider_lst(): """爬虫基础信息""" crawler_lst = [] q = {"platform": "python", "state": 11} projection = { "_id": 0, "site": 1, "channel": 1, "modifyuser": 1, "modifyuserid": 1, "code": 1, "event": 1 } cursor = spider_lua_config.find(q, projection=projection) try: for doc in cursor: crawler_lst.append({ "site": doc["site"], "channel": doc["channel"], "code": doc["code"], "modifyid": doc["modifyuserid"], "modifyuser": doc["modifyuser"], "event": doc["event"] }) finally: client.close() logger.info(f"爬虫监控 - 已上线 {len(crawler_lst)} 个爬虫") yield from crawler_lst def aggregate_query(collection, pipeline, is_print_error=False): """ 聚合查询 @param collection: MongoDB集合 @param pipeline: 聚合查询条件 @param is_print_error: 是否在console打印错误日志 @return: 聚合结果 """ results = [] cursor = collection.aggregate(pipeline, allowDiskUse=True) try: for doc in cursor: results.append(doc) except Exception as e: if is_print_error: logger.exception(e) finally: client.close() yield from results def aggregate_query_count(runtime): """ feapder爬虫列表和详情采集数据量统计聚合查询结果统计 @param runtime: 运行时间 @return: """ spider_dict = {} # spidercode 与 site 排查记录 aggregate_items = {} pipeline = [ {"$match": {"runtime": runtime}}, { "$group": { "_id": "$batch_no", "rel_count": {"$sum": "$rel_count"}, # 入库量(去重) "count": {"$sum": "$count"}, # 下载量 "site": {"$first": "$site"}, "channel": {"$first": "$channel"}, "spidercode": {"$first": "$spidercode"}, "business_type": {"$first": "$business_type"}, } }, {"$sort": SON([("rel_count", -1)])} ] results = aggregate_query(spider_heartbeat, pipeline) for items in results: site = items["site"] channel = items["channel"] spidercode = items["spidercode"] business_type = items["business_type"] business_type = "list" if business_type.endswith("List") else "detail" hash_key = get_md5(**items) # 爬虫查询标识 if not hash_key: logger.error(f"异常批次号 {items['_id']}") continue # 通过爬虫业务类型 分拣数据 data_dict = { "site": site, "channel": channel, "spidercode": spidercode, "runtime": runtime, "spidercode_at_site_num": 0, # 爬虫代码与站点对应的关系数量 "frame": "feapder", # 采集框架 business_type: { "batch_no": items["_id"], "count": items["count"], "rel_count": items["rel_count"], "runtimes": 1 } } if not aggregate_items.get(hash_key): aggregate_items.setdefault(hash_key, data_dict) else: aggregate_items[hash_key][business_type] = data_dict[business_type] # 监控爬虫任务,当 spidercode_at_site_num > 1 # 表明创建的爬虫任务存在问题,需反馈数据寻源相关人员 if site not in special_sites: spider = aggregate_items[hash_key] if spidercode not in spider_dict: spider["spidercode_at_site_num"] = 1 values = {"keys": [hash_key], "sites": [site]} spider_dict.setdefault(spidercode, values) else: # spidercode 相同,但 site 不同的爬虫进行计数+1 sites = spider_dict[spidercode]["sites"] if site not in sites: keys = spider_dict[spidercode]["keys"] for key in keys: # 更新相同 spidercode 的 spidercode_at_site_num aggregate_items[key]["spidercode_at_site_num"] += 1 keys.append(hash_key) # 记录新爬虫 sites.append(site) # 添加新站点 spider["spidercode_at_site_num"] = len(sites) else: aggregate_items[hash_key]["spidercode_at_site_num"] = 1 return aggregate_items def aggregate_query_crawl_list(runtime): """feapder列表爬虫采集聚合查询结果统计""" aggregate_items = {} pipeline = [ { "$match": { "runtime": runtime, "business_type": {"$regex": "List"}, "status_code": {"$ne": -1} } }, { "$group": { "_id": "$batch_no", "count": {"$sum": "$count"}, "rel_count": {"$sum": "$rel_count"}, "site": {"$first": "$site"}, "channel": {"$first": "$channel"}, "spidercode": {"$first": "$spidercode"}, } } ] results = aggregate_query(spider_heartbeat, pipeline) for items in results: hask_key = get_md5(**items) if not aggregate_items.get(hask_key): values = {"list_allintimes": 0, "list_nodatatimes": 0} aggregate_items.setdefault(hask_key, values) if all([ items["count"] > 0, items["rel_count"] > 0, items["count"] == items["rel_count"] ]): aggregate_items[hask_key]["list_allintimes"] += 1 if items["count"] == 0: aggregate_items[hask_key]["list_nodatatimes"] += 1 return aggregate_items def aggregate_count_crawlab_update_runtimes(runtime): """feapder爬虫采集聚合查询crawlab平台运行信息""" pipeline = [ { "$project": { "_id": 0, "site": 1, "channel": 1, "batch_no": 1, "spidercode": 1, "business_type": 1, "runtime": 1, "node_ip": 1, "crawlab_taskid": 1, "create_at": 1, } }, {"$match": {"runtime": runtime}}, { "$group": { "_id": "$batch_no", "business_type": {"$first": "$business_type"}, "site": {"$first": "$site"}, "channel": {"$first": "$channel"}, "spidercode": {"$first": "$spidercode"}, "crawlab_item": { "$addToSet": { "node_ip": "$node_ip", "crawlab_taskid": "$crawlab_taskid", }, } } } ] results = aggregate_query(spider_heartbeat, pipeline) for items in results: runtimes = len(items["crawlab_item"]) # 采集任务运行次数 # 通过爬虫业务类型 分拣数据 business_type = items["business_type"] business_type = "list" if business_type.endswith("List") else "detail" hash_key = get_md5(**items) # 爬虫查询标识 # 更新聚合统计任务运行次数 aggregate_count_items[hash_key][business_type]["runtimes"] = runtimes _runtime = get_runtime() # 统计时间 aggregate_count_items = aggregate_query_count(_runtime) aggregate_crawl_list_items = aggregate_query_crawl_list(_runtime) aggregate_count_crawlab_update_runtimes(_runtime) def get_list_isgetdata(hash_key, default=0): """列表页是否采集数据""" query_result = aggregate_count_items.get(hash_key) if query_result: default = query_result["list"]["count"] return True if default > 0 else False def get_list_allintimes(hash_key, default=0): """日采集列表数量与入库数量相等的次数(扣除标题去重数量 + 增量(全量)去重数量)""" if aggregate_crawl_list_items.get(hash_key): default = aggregate_crawl_list_items[hash_key]["list_allintimes"] return default def get_list_runtimes(hash_key, default=0): """列表采集运行频次""" query_result = aggregate_count_items.get(hash_key) if query_result and "list" in query_result: default = aggregate_count_items[hash_key]["list"]["runtimes"] return default def get_list_count(hash_key, default=0): """列表采集总数""" query_result = aggregate_count_items.get(hash_key) if query_result and "list" in query_result: default = aggregate_count_items[hash_key]["list"]["count"] return default def get_list_rel_count(hash_key, default=0): """列表实际入库总数""" query_result = aggregate_count_items.get(hash_key) if query_result and "list" in query_result: default = aggregate_count_items[hash_key]["list"]["rel_count"] return default def get_list_nodatatimes(hash_key, default=-1): """列表页采集无数据次数(过滤后)""" if aggregate_crawl_list_items.get(hash_key): default = aggregate_crawl_list_items[hash_key]["list_nodatatimes"] return default def get_detail_downloadnum(hash_key, default=0): """详情页下载量""" query_result = aggregate_count_items.get(hash_key) if query_result and "detail" in query_result: default = aggregate_count_items[hash_key]["detail"]["count"] return default get_detail_count = get_detail_downloadnum def get_detail_downloadsuccessnum(hash_key, default=0): """详情页下载成功量""" query_result = aggregate_count_items.get(hash_key) if query_result and "detail" in query_result: default = aggregate_count_items[hash_key]["detail"]["rel_count"] return default get_detail_rel_count = get_detail_downloadsuccessnum def get_detail_downloadfailnum(**kwargs): """详情页下载失败量""" count = -1 if kwargs["detail_downloadnum"] >= 0 and kwargs["detail_downloadnum"] >= 0: count = kwargs["detail_downloadnum"] - kwargs["detail_downloadsuccessnum"] return count def start_monitor(): summary_queue = [] spider_lst = get_spider_lst() for spider in spider_lst: site = spider["site"] channel = spider["channel"] spidercode = spider["code"] join_data = {**spider, "is_valid": False} # 创建爬虫基础数据 hash_key = get_md5(site, channel, spidercode) query_result = aggregate_count_items.get(hash_key) if query_result: # 聚合查询 - 统计数据采集信息 result = query_result join_data["frame"] = result["frame"] # 采集框架名 join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"] # 聚合统计 - 列表采集数据 join_data["list_isgetdata"] = get_list_isgetdata(hash_key) join_data["list_allintimes"] = get_list_allintimes(hash_key) join_data["list_runtimes"] = get_list_runtimes(hash_key) join_data["list_nodatatimes"] = get_list_nodatatimes(hash_key) join_data["list_count"] = get_list_count(hash_key) join_data["list_rel_count"] = get_list_rel_count(hash_key) # 聚合统计 - 详情采集数据 join_data["detail_count"] = get_detail_count(hash_key) join_data["detail_rel_count"] = get_detail_rel_count(hash_key) join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key) join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key) join_data["detail_downloadfailnum"] = get_detail_downloadfailnum(**join_data) # 日统计采集数据是否有效 frame = join_data.get("frame") if frame and frame == "feapder": join_data["is_valid"] = True summary_queue.append(join_data) logger.info(f"{site} {channel} {spidercode} --统计完成") # 上传数据库 save(summary_queue, spider_monitor) logger.info("爬虫监控 - 日统计完成") if __name__ == '__main__': start_monitor()