@@ -0,0 +1,332 @@
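+# Daily crawl report: joins python crawler configs from the lua config collection
+# with aggregated feapder heartbeat data and saves a per-spider summary to the
+# spider_monitor collection.
+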
+import hashlib
+from datetime import datetime, timedelta
+
+from bson.int64 import Int64
+from bson.son import SON
+from pymongo import MongoClient
+
+from log import logger
+
+# mongo
+# MONGO_HOST = "172.17.4.87"
+# MONGO_PORT = 27080
+
+MONGO_HOST = "127.0.0.1"
+MONGO_PORT = 27001
+client = MongoClient(MONGO_HOST, MONGO_PORT)
+
+MONGO_DB1 = "py_spider"
+MONGO_DB2 = "editor"
+
+mongodb1 = client[MONGO_DB1]
+mongodb2 = client[MONGO_DB2]
+
+# spider data collection
+data_bak = mongodb1["data_bak"]
+
+# heartbeat collection
+spider_heartbeat = mongodb1["spider_heartbeat"]
+
+# py_spiders crawl list
+py_spiders_crawl_list = mongodb1["crawl_data"]
+
+# list-page summary (daily report) collection
+spider_monitor = mongodb1["spider_monitor"]
+
+# luaconfig collection
+spider_lua_config = mongodb2["luaconfig"]
+
+
+def get_hash_key(*args):
+    """
+    @summary: build a unique 32-character md5 digest
+    ---------
+    @param *args: values joined to form the dedup key
+    ---------
+    @result: 7c8684bcbdfcea6697650aa53d7b1405
+    """
+    join_data = "_".join(map(str, args)).encode()
+    return hashlib.md5(join_data).hexdigest()
+
+
+def save(documents, collection):
+    """Save documents to a collection, inserting in batches of 100"""
+    is_list = isinstance(documents, list)
+    documents = documents if is_list else [documents]
+
+    count = 0
+    data_lst = []
+    for item in documents:
+        item.pop("_id", None)
+        item.pop("business_type", None)
+        item["comeintime"] = Int64(datetime.now().timestamp())
+        data_lst.append(item)
+        count += 1
+        if len(data_lst) == 100:
+            collection.insert_many(data_lst)
+            data_lst.clear()
+            logger.info(f"{collection.name} - batch saved {count} documents - done")
+
+    # flush the remaining documents (insert_many rejects an empty list)
+    if data_lst:
+        collection.insert_many(data_lst)
+    logger.info(f"{collection.name} - batch saved {count} documents - done")
+
+
+def get_runtime(datestr=None):
+    """Default reporting date: yesterday, formatted as %Y-%m-%d"""
+    if datestr is None:
+        today = datetime.now().date()
+        yesterday = today - timedelta(days=1)
+        datestr = yesterday.strftime("%Y-%m-%d")
+    return datestr
+
+
+def get_crawler_basic_information():
+    """Basic crawler information from the lua config collection"""
+    crawler_lst = []
+    q = {"platform": "python", "state": 11}
+    projection = {"_id": 0, "site": 1, "channel": 1, "modifyuser": 1, "modifyuserid": 1, "code": 1}
+    cursor = spider_lua_config.find(q, projection=projection)
+    try:
+        for doc in cursor:
+            crawler_lst.append({
+                "site": doc["site"],
+                "channel": doc["channel"],
+                "spidercode": doc["code"],
+                "modifyid": doc["modifyuserid"],
+                "modifyuser": doc["modifyuser"],
+            })
+    finally:
+        cursor.close()
+    logger.info(f"Spider daily report - {len(crawler_lst)} crawlers in total")
+    yield from crawler_lst
+
+
+def get_node_and_taskid(runtime, spidercode):
+    """Get the most recent worker node and crawlab task id for a spider"""
+    q = {"runtime": runtime, "spidercode": spidercode}
+    projection = {"node_ip": 1, "crawlab_taskid": 1, "_id": 0}
+    sort = [("_id", -1)]
+    result = spider_heartbeat.find_one(q, projection=projection, sort=sort)
+    return result
+
+
+def aggregate_query(runtime):
+    """Aggregate feapder crawl heartbeats for the given runtime date"""
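+    # Group heartbeats by spider_id: sum "count" (downloads) and "rel_count"
+    # (actual downloads), and collect every site/channel/spidercode/business_type
+    # combination the spider reported.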
+    pipeline = [
+        {"$match": {"runtime": runtime}},
+        {
+            "$group": {
+                "_id": "$spider_id",
+                "rel_count": {"$sum": "$rel_count"},  # actual download count
+                "count": {"$sum": "$count"},  # download count
+                "spider_item": {
+                    "$addToSet": {
+                        "site": "$site",
+                        "channel": "$channel",
+                        "spidercode": "$spidercode",
+                        "business_type": "$business_type"
+                    }
+                }
+            }
+        },
+        {"$sort": SON([("rel_count", -1)])}
+    ]
+
+    aggregate_items = {}
+    website_lst = []
+
+    cursor = spider_heartbeat.aggregate(pipeline, allowDiskUse=True)
+    try:
+        for doc in cursor:
+            spider_item = doc["spider_item"]
+
+            spidercode_at_site_num = 0
+
+            for item in spider_item:
+                site = item["site"]
+                channel = item["channel"]
+                spidercode = item["spidercode"]
+
+                # keyed by site+channel+spidercode so that sites sharing one
+                # spidercode do not overwrite each other's data
+                hash_key = get_hash_key(site, channel, spidercode)
+
+                same_site = True
+                if site not in website_lst:
+                    same_site = False
+                    website_lst.append(site)
+
+                if not same_site and aggregate_items.get(hash_key):
+                    aggregate_items[hash_key]["spidercode_at_site_num"] += 1
+                else:
+                    spidercode_at_site_num += 1
+
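+                # first occurrence of this site/channel/spidercode: initialise
+                # the counters for its business type; otherwise merge into the
+                # existing entry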
+                if not aggregate_items.get(hash_key):
+                    data = {
+                        "business_type": item["business_type"],
+                        "spider_id": doc["_id"],
+                        "site": site,
+                        "channel": item["channel"],
+                        "spidercode": spidercode,
+                        "runtime": runtime,
+                        "spidercode_at_site_num": spidercode_at_site_num  # number of sites served by this spidercode
+                    }
+
+                    is_list = str(item["business_type"]).endswith("List")
+                    if is_list:
+                        data["list_count"] = doc["count"]
+                        data["list_rel_count"] = doc["rel_count"]
+                        data["detail_count"] = 0
+                        data["detail_rel_count"] = 0
+                        data["list_runtimes"] = 1
+                        data["detail_runtimes"] = 0
+                    else:
+                        data["list_count"] = 0
+                        data["list_rel_count"] = 0
+                        data["detail_count"] = doc["count"]
+                        data["detail_rel_count"] = doc["rel_count"]
+                        data["detail_runtimes"] = 1
+                        data["list_runtimes"] = 0
+
+                    if len(spider_item) > 1:
+                        logger.warning(f"{spidercode} -> {site} - incorrect spidercode-to-site mapping")
+
+                    aggregate_items.setdefault(hash_key, data)
+
+                else:
+                    data = aggregate_items.get(hash_key)
+                    is_list = str(item["business_type"]).endswith("List")
+                    if is_list:
+                        data["list_count"] += doc["count"]
+                        data["list_rel_count"] += doc["rel_count"]
+                        data["list_runtimes"] += 1
+                    else:
+                        data["detail_count"] += doc["count"]
+                        data["detail_rel_count"] += doc["rel_count"]
+                        data["detail_runtimes"] += 1
+
+                    aggregate_items.update({hash_key: data})
+
+    finally:
+        cursor.close()
+    return aggregate_items
+
+
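+# The aggregation runs once at import time; the helpers below read from the
+# cached result keyed by get_hash_key(site, channel, spidercode).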
+runtime = get_runtime()
+aggregate_results = aggregate_query(runtime)
+
+
+def get_list_isgetdata(hash_key):
+    """Whether the list page collected any data"""
+    count = 0
+    if aggregate_results.get(hash_key):
+        count += aggregate_results[hash_key]["list_count"]
+    return count > 0
+
+
+def get_list_allintimes(hash_key):
+    """Total number of list items stored for the day"""
+    count = 0
+    if aggregate_results.get(hash_key):
+        count += aggregate_results[hash_key]["list_rel_count"]
+    return count
+
+
+def get_list_runtimes(hash_key):
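+    """Number of list-page crawl runs for the day"""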
+    count = 0
+    if aggregate_results.get(hash_key):
+        count += aggregate_results[hash_key]["list_runtimes"]
+    return count
+
+
+def get_detail_downloadnum(hash_key):
+    """Number of detail pages downloaded"""
+    count = 0
+    if aggregate_results.get(hash_key):
+        count += aggregate_results[hash_key]["detail_count"]
+    return count
+
+
+def get_detail_downloadsuccessnum(hash_key):
+    """Number of detail pages downloaded successfully"""
+    count = 0
+    if aggregate_results.get(hash_key):
+        count += aggregate_results[hash_key]["detail_rel_count"]
+    return count
+
+
+def get_count(document, business_type: str):
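+    """Download count of a heartbeat document when its business_type matches, else 0"""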
+    if business_type.title() not in ["List", "Detail"]:
+        raise ValueError("business_type must be 'List' or 'Detail'")
+
+    if str(document["business_type"]).endswith(business_type):
+        return document["count"]
+    return 0
+
+
+def get_rel_count(document, business_type: str):
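+    """Actual download count (rel_count) of a heartbeat document when its business_type matches, else 0"""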
+    if business_type.title() not in ["List", "Detail"]:
+        raise ValueError("business_type must be 'List' or 'Detail'")
+
+    if str(document["business_type"]).endswith(business_type):
+        return document["rel_count"]
+    return 0
+
+
+def main():
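+    """Build the daily summary for each crawler and save it to spider_monitor"""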
+    summary_queue = []
+    crawlers = get_crawler_basic_information()
+    for crawler in crawlers:
+        site = crawler["site"]
+        channel = crawler["channel"]
+        spidercode = crawler["spidercode"]
+        hash_key = get_hash_key(site, channel, spidercode)
+
+        if aggregate_results.get(hash_key):
+            # merge crawler info with the aggregated heartbeat data
+            join_data = {**crawler}
+            result = aggregate_results.get(hash_key)
+
+            join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"]
+
+            join_data["business_type"] = result["business_type"]
+            join_data["spider_id"] = result["spider_id"]
+
+            join_data["list_count"] = result["list_count"]
+            join_data["list_rel_count"] = result["list_rel_count"]
+            join_data["detail_count"] = result["detail_count"]
+            join_data["detail_rel_count"] = result["detail_rel_count"]
+
+            # crawlab platform
+            crawlab = get_node_and_taskid(runtime, spidercode)
+            if crawlab:
+                join_data["py_taskid"] = crawlab["crawlab_taskid"]
+                join_data["py_nodename"] = crawlab["node_ip"]
+
+            join_data["list_isgetdata"] = get_list_isgetdata(hash_key)  # whether the list page collected data
+            join_data["list_allintimes"] = get_list_allintimes(hash_key)  # total list items stored for the day
+            join_data["list_runtimes"] = get_list_runtimes(hash_key)  # list-page crawl run count
+
+            join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key)  # detail pages downloaded
+            join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key)  # detail pages downloaded successfully
+            join_data["detail_downloadfailnum"] = join_data["detail_downloadnum"] - join_data["detail_downloadsuccessnum"]  # detail pages that failed to download
+
+        else:
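+            # no heartbeat data was aggregated for this crawler today; fill the
+            # metrics with -1 so the report still lists it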
+            join_data = {**crawler}
+            join_data["list_isgetdata"] = False
+            join_data["list_allintimes"] = -1
+            join_data["list_runtimes"] = -1
+            join_data["detail_downloadnum"] = -1
+            join_data["detail_downloadsuccessnum"] = -1
+            join_data["detail_downloadfailnum"] = -1
+
+        logger.info(f"{crawler['site']}-{crawler['channel']}-{spidercode} - statistics done")
+        summary_queue.append(join_data)
+
+    save(summary_queue, spider_monitor)
+
+
+if __name__ == '__main__':
+    main()
+    # close the shared MongoClient once all queries have finished
+    client.close()