# -*- coding: utf-8 -*-
"""
Created on 2023-04-05
---------
@summary: Spider monitoring data summary
---------
@author: dzr
"""
import hashlib
from datetime import datetime, timedelta

from bson.int64 import Int64
from bson.son import SON
from pymongo import MongoClient

from log import logger

# mongo
# MONGO_HOST = "172.17.4.87"
# MONGO_PORT = 27080
MONGO_HOST = "127.0.0.1"
MONGO_PORT = 27001

client = MongoClient(MONGO_HOST, MONGO_PORT)

MONGO_DB1 = "py_spider"
MONGO_DB2 = "editor"

mongodb1 = client[MONGO_DB1]
mongodb2 = client[MONGO_DB2]

# spider data collection
data_bak = mongodb1["data_bak"]
# heartbeat collection
spider_heartbeat = mongodb1["spider_heartbeat"]
# py_spiders crawl list collection
py_spiders_crawl_list = mongodb1["crawl_data"]
# list-page summary collection
spider_monitor = mongodb1["spider_monitor"]
# luaconfig collection
spider_lua_config = mongodb2["luaconfig"]


def get_hash_key(*args, **kwargs):
    """
    @summary: build a unique 32-character md5 key
    ---------
    @param args: values that make up the combined dedup key
    @param kwargs: dict of values that make up the combined dedup key
    ---------
    @result: 7c8684bcbdfcea6697650aa53d7b1405
    """
    conditions = ["site", "channel", "spidercode"]
    trait = [x for x in args if x is not None]
    for k, v in kwargs.items():
        if k in conditions and v is not None:
            trait.append(v)

    if len(trait) != 3:
        # raise AttributeError(f"missing attributes: {conditions}")
        logger.error(f"[Monitor]missing attributes {conditions}, got: {trait}")
        return None

    join_data = "_".join(sorted(trait)).encode()
    return hashlib.md5(join_data).hexdigest()
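
# Usage sketch (illustrative values): both call styles used later in this script
# produce the same key for the same (site, channel, spidercode) triple, because
# the values are sorted before hashing:
#   get_hash_key("siteA", "channelB", "codeC")
#   get_hash_key(site="siteA", channel="channelB", spidercode="codeC")
# Both return the md5 hex digest of b"channelB_codeC_siteA".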


def save(documents, collection):
    """Save documents to the given collection"""
    is_list = isinstance(documents, list)
    documents = documents if is_list else [documents]

    count = 0
    data_lst = []
    for items in documents:
        items.pop("_id", None)
        items.pop("business_type", None)
        items["comeintime"] = Int64(datetime.now().timestamp())
        data_lst.append(items)
        count += 1
        if len(data_lst) % 100 == 0:
            collection.insert_many(data_lst)
            data_lst.clear()
            logger.info(f"[Monitor]{collection.name} - bulk upload of {count} documents saved")

    # commit the remaining documents (skip the insert if nothing is left)
    if data_lst:
        collection.insert_many(data_lst)
    logger.info(f"[Monitor]{collection.name} - bulk upload of {count} documents saved")
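
# Usage sketch (illustrative): a single dict or a list of dicts is accepted;
# documents are written in batches of 100, e.g.
#   save({"site": "siteA", "channel": "channelB"}, spider_monitor)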


def get_runtime(datestr=None):
    """Return the statistics date string; defaults to yesterday (%Y-%m-%d)."""
    if datestr is None:
        today = datetime.now().date()
        yesterday = today + timedelta(days=-1)
        datestr = yesterday.strftime("%Y-%m-%d")
    return datestr
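
# Usage sketch (illustrative, assuming the script runs on 2023-04-05):
#   get_runtime()              # -> "2023-04-04" (defaults to yesterday)
#   get_runtime("2023-04-01")  # -> "2023-04-01" (explicit dates pass through)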


def get_crawler_basic_information():
    """Basic spider information"""
    crawler_lst = []
    q = {"platform": "python", "state": 11}
    projection = {"_id": 0, "site": 1, "channel": 1, "modifyuser": 1, "modifyuserid": 1, "code": 1}
    cursor = spider_lua_config.find(q, projection=projection)
    try:
        for doc in cursor:
            crawler_lst.append({
                "site": doc["site"],
                "channel": doc["channel"],
                "spidercode": doc["code"],
                "modifyid": doc["modifyuserid"],
                "modifyuser": doc["modifyuser"],
            })
    finally:
        # close the cursor only; the client is still needed by later queries
        cursor.close()

    logger.info(f"[Monitor]spider crawl monitoring - {len(crawler_lst)} spiders in total")
    yield from crawler_lst


def get_node_and_taskid(runtime, spidercode):
    """Get the latest worker node and crawlab task id for a spider"""
    q = {"runtime": runtime, "spidercode": spidercode}
    projection = {"node_ip": 1, "crawlab_taskid": 1, "_id": 0}
    sort = [("_id", -1)]
    result = spider_heartbeat.find_one(q, projection=projection, sort=sort)
    return result
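
# Return shape sketch (based on the projection above; None when no heartbeat matches):
#   {"node_ip": "<worker ip>", "crawlab_taskid": "<crawlab task id>"}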


def aggregate_query(runtime):
    """Aggregate query over feapder heartbeat records"""
    websites = []
    aggregate_items = {}

    pipeline = [
        {"$match": {"runtime": runtime}},
        {
            "$group": {
                "_id": "$spider_id",
                "rel_count": {"$sum": "$rel_count"},  # number of documents saved
                "count": {"$sum": "$count"},  # number of documents downloaded
                "spider_item": {
                    "$addToSet": {
                        "site": "$site",
                        "channel": "$channel",
                        "spidercode": "$spidercode",
                        "business_type": "$business_type"
                    }
                }
            }
        },
        {"$sort": SON([("rel_count", -1)])}
    ]
    cursor = spider_heartbeat.aggregate(pipeline, allowDiskUse=True)
    try:
        for doc in cursor:
            spider_item = doc["spider_item"]
            for items in spider_item:
                site = items["site"]
                channel = items["channel"]
                spidercode = items["spidercode"]
                business_type = items["business_type"]

                if len(spider_item) > 1:
                    logger.warning(f"{spidercode} -> {site} - abnormal number of site mappings")

                is_list = str(business_type).endswith("List")
                # prevent overlap when multiple sites share one spidercode
                hash_key = get_hash_key(**items)
                if not hash_key:
                    logger.error(f"[Monitor]{site}-{channel}-{spidercode} - monitoring error")
                    continue

                if not aggregate_items.get(hash_key):
                    data = {
                        "spider_id": doc["_id"],
                        "site": site,
                        "channel": channel,
                        "spidercode": spidercode,
                        "business_type": business_type,
                        "runtime": runtime,
                        "spidercode_at_site_num": 0  # number of site mappings for this spidercode
                    }
                    if is_list:
                        data["list_count"] = doc["count"]
                        data["list_rel_count"] = doc["rel_count"]
                        data["list_runtimes"] = 1
                        data["detail_count"] = 0
                        data["detail_rel_count"] = 0
                        data["detail_runtimes"] = 0
                    else:
                        data["detail_count"] = doc["count"]
                        data["detail_rel_count"] = doc["rel_count"]
                        data["detail_runtimes"] = 1
                        data["list_count"] = 0
                        data["list_rel_count"] = 0
                        data["list_runtimes"] = 0
                    aggregate_items.setdefault(hash_key, data)
                else:
                    data = aggregate_items.get(hash_key)
                    if is_list:
                        data["list_count"] += doc["count"]
                        data["list_rel_count"] += doc["rel_count"]
                        data["list_runtimes"] += 1
                    else:
                        data["detail_count"] += doc["count"]
                        data["detail_rel_count"] += doc["rel_count"]
                        data["detail_runtimes"] += 1
                    aggregate_items.update({hash_key: data})

                # Monitor spider tasks: spidercode_at_site_num > 1 indicates a problem
                # with how the crawler task was created and should be reported to the
                # data sourcing team.
                if site not in websites:
                    aggregate_items[hash_key]["spidercode_at_site_num"] = 1
                    websites.append(site)
                else:
                    aggregate_items[hash_key]["spidercode_at_site_num"] += 1
    # except Exception as e:
    #     logger.exception(e)
    finally:
        # close the cursor only; the client is still needed by later queries
        cursor.close()

    return aggregate_items
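
# Result shape sketch (illustrative): aggregate_items maps each hash key to a dict like
#   {
#       "spider_id": "...", "site": "...", "channel": "...", "spidercode": "...",
#       "business_type": "...", "runtime": "2023-04-04", "spidercode_at_site_num": 1,
#       "list_count": 0, "list_rel_count": 0, "list_runtimes": 0,
#       "detail_count": 0, "detail_rel_count": 0, "detail_runtimes": 0,
#   }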

_runtime = get_runtime()
aggregate_results = aggregate_query(_runtime)


def get_list_isgetdata(hash_key):
    """Whether the list page collected any data"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results[hash_key]["list_count"]
    return count > 0


def get_list_allintimes(hash_key):
    """Total number of list-page documents saved for the day"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results[hash_key]["list_rel_count"]
    return count


def get_list_runtimes(hash_key):
    """Number of list-page collection runs"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results[hash_key]["list_runtimes"]
    return count


def get_detail_downloadnum(hash_key):
    """Number of detail pages downloaded"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results[hash_key]["detail_count"]
    return count


def get_detail_downloadsuccessnum(hash_key):
    """Number of detail pages downloaded successfully"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results[hash_key]["detail_rel_count"]
    return count


def get_count(document, business_type: str):
    business_type = business_type.title()
    if business_type not in ["List", "Detail"]:
        raise ValueError(f"invalid business_type: {business_type!r}")

    if str(document["business_type"]).endswith(business_type):
        return document["count"]
    return 0


def get_rel_count(document, business_type: str):
    business_type = business_type.title()
    if business_type not in ["List", "Detail"]:
        raise ValueError(f"invalid business_type: {business_type!r}")

    if str(document["business_type"]).endswith(business_type):
        return document["rel_count"]
    return 0
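
# Usage sketch (illustrative document): for
#   doc = {"business_type": "Theme_List", "count": 120, "rel_count": 95}
# get_count(doc, "List") returns 120, get_count(doc, "Detail") returns 0, and
# get_rel_count(doc, "List") returns 95.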


def main():
    summary_queue = []
    crawlers = get_crawler_basic_information()
    for crawler in crawlers:
        site = crawler["site"]
        channel = crawler["channel"]
        spidercode = crawler["spidercode"]

        join_data = {**crawler}  # start from the spider's basic information
        hash_key = get_hash_key(site, channel, spidercode)
        if aggregate_results.get(hash_key):
            # crawlab platform
            crawlab = get_node_and_taskid(_runtime, spidercode)
            if crawlab:
                join_data["py_taskid"] = crawlab["crawlab_taskid"]
                join_data["py_nodename"] = crawlab["node_ip"]

            # aggregated heartbeat statistics
            result = aggregate_results[hash_key]
            join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"]
            join_data["business_type"] = result["business_type"]
            join_data["spider_id"] = result["spider_id"]
            join_data["list_count"] = result["list_count"]
            join_data["list_rel_count"] = result["list_rel_count"]
            join_data["detail_count"] = result["detail_count"]
            join_data["detail_rel_count"] = result["detail_rel_count"]

            # list-page statistics
            join_data["list_isgetdata"] = get_list_isgetdata(hash_key)  # whether the list page collected any data
            join_data["list_allintimes"] = get_list_allintimes(hash_key)  # total list-page documents saved for the day
            join_data["list_runtimes"] = get_list_runtimes(hash_key)  # number of list-page collection runs

            # detail-page statistics
            join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key)  # detail pages downloaded
            join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key)  # detail pages downloaded successfully
            join_data["detail_downloadfailnum"] = join_data["detail_downloadnum"] - join_data["detail_downloadsuccessnum"]  # detail pages failed to download
        else:
            join_data["list_isgetdata"] = False
            join_data["list_allintimes"] = -1
            join_data["list_runtimes"] = -1
            join_data["detail_downloadnum"] = -1
            join_data["detail_downloadsuccessnum"] = -1
            join_data["detail_downloadfailnum"] = -1

        logger.info(f"[Monitor]{site}-{channel}-{spidercode} - statistics complete")
        summary_queue.append(join_data)

    save(summary_queue, spider_monitor)


if __name__ == '__main__':
    main()