monitor_summary.py

# -*- coding: utf-8 -*-
"""
Created on 2023-04-05
---------
@summary: spider monitoring data summary
---------
@author: dzr
"""
import hashlib
from datetime import datetime, timedelta

from bson.int64 import Int64
from bson.son import SON
from pymongo import MongoClient

from log import logger

# MongoDB connection
# MONGO_HOST = "172.17.4.87"
# MONGO_PORT = 27080
MONGO_HOST = "127.0.0.1"
MONGO_PORT = 27001
client = MongoClient(MONGO_HOST, MONGO_PORT)

MONGO_DB1 = "py_spider"
MONGO_DB2 = "editor"
mongodb1 = client[MONGO_DB1]
mongodb2 = client[MONGO_DB2]

# spider data collection
data_bak = mongodb1["data_bak"]
# heartbeat collection
spider_heartbeat = mongodb1["spider_heartbeat"]
# py_spiders crawl list
py_spiders_crawl_list = mongodb1["crawl_data"]
# list-page summary collection
spider_monitor = mongodb1["spider_monitor"]
# luaconfig collection
spider_lua_config = mongodb2["luaconfig"]
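
# Collections touched by this script: spider_lua_config and spider_heartbeat are
# read and spider_monitor is written to; data_bak and py_spiders_crawl_list are
# defined but not referenced anywhere below.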


def get_hash_key(*args, **kwargs):
    """
    @summary: build a unique 32-character md5 key
    ---------
    @param args: values combined for deduplication
    @param kwargs: keyword values combined for deduplication
    ---------
    @result: 7c8684bcbdfcea6697650aa53d7b1405
    """
    conditions = ["site", "channel", "spidercode"]
    trait = set(filter(lambda x: x is not None, args))
    for k, v in kwargs.items():
        if k in conditions and v is not None:
            trait.add(v)

    if len(trait) != 3:
        logger.error(f"[Monitor]missing one of {conditions}, got: {trait}")
        return None

    join_data = "_".join(sorted(trait)).encode()
    return hashlib.md5(join_data).hexdigest()
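
# Illustrative usage (a sketch; the site/channel/spidercode values below are
# invented placeholders, not real records):
#   get_hash_key("example-site", "example-channel", "a_example_spider")
#   get_hash_key(site="example-site", channel="example-channel",
#                spidercode="a_example_spider", business_type="List")
# Both calls hash the same three sorted fields and therefore return the same
# key; extra keyword fields such as business_type are ignored.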


def save(documents, collection):
    """bulk-save documents to a collection"""
    is_list = isinstance(documents, list)
    documents = documents if is_list else [documents]

    count = 0
    data_lst = []
    for items in documents:
        items.pop("_id", None)
        items.pop("business_type", None)
        items["comeintime"] = Int64(datetime.now().timestamp())
        data_lst.append(items)
        count += 1
        if len(data_lst) == 100:
            collection.insert_many(data_lst)
            data_lst.clear()
            logger.info(f"[Monitor]{collection.name}-bulk upload of {count} documents--saved")

    # flush the remaining documents
    if data_lst:
        collection.insert_many(data_lst)
    logger.info(f"[Monitor]{collection.name}-bulk upload of {count} documents--saved")


def get_runtime(datestr=None):
    """default runtime is yesterday's date (YYYY-MM-DD)"""
    if datestr is None:
        today = datetime.now().date()
        yesterday = today + timedelta(days=-1)
        datestr = yesterday.strftime("%Y-%m-%d")
    return datestr
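
# For example, when the script runs on 2023-04-06 with no argument,
# get_runtime() returns "2023-04-05", i.e. the previous day's statistics window.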


def get_crawler_basic_information():
    """basic information for every python crawler"""
    crawler_lst = []
    q = {"platform": "python", "state": 11}
    projection = {"_id": 0, "site": 1, "channel": 1, "modifyuser": 1, "modifyuserid": 1, "code": 1}
    cursor = spider_lua_config.find(q, projection=projection)
    try:
        for doc in cursor:
            crawler_lst.append({
                "site": doc["site"],
                "channel": doc["channel"],
                "spidercode": doc["code"],
                "modifyid": doc["modifyuserid"],
                "modifyuser": doc["modifyuser"],
            })
    finally:
        cursor.close()  # close the cursor, not the shared client, so later queries still work

    logger.info(f"[Monitor]spider monitoring--{len(crawler_lst)} crawlers in total")
    yield from crawler_lst


def get_node_and_taskid(runtime, spidercode):
    """fetch the most recent worker node and task id for a spider"""
    q = {"runtime": runtime, "spidercode": spidercode}
    projection = {"node_ip": 1, "crawlab_taskid": 1, "_id": 0}
    sort = [("_id", -1)]
    result = spider_heartbeat.find_one(q, projection=projection, sort=sort)
    return result


def aggregate_query(runtime):
    """aggregate feapder heartbeat records for one runtime"""
    websites = []
    aggregate_items = {}

    pipeline = [
        {"$match": {"runtime": runtime}},
        {
            "$group": {
                "_id": "$spider_id",
                "rel_count": {"$sum": "$rel_count"},  # documents written to the db
                "count": {"$sum": "$count"},  # documents downloaded
                "spider_item": {
                    "$addToSet": {
                        "site": "$site",
                        "channel": "$channel",
                        "spidercode": "$spidercode",
                        "business_type": "$business_type"
                    }
                }
            }
        },
        {"$sort": SON([("rel_count", -1)])}
    ]
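
    # Sketch of one $group result this loop expects (field names come from the
    # pipeline above; the concrete values are invented placeholders):
    #   {
    #       "_id": "<spider_id>",
    #       "rel_count": 120,    # summed inserts for this spider_id
    #       "count": 150,        # summed downloads for this spider_id
    #       "spider_item": [
    #           {"site": "...", "channel": "...", "spidercode": "...",
    #            "business_type": "...List"}
    #       ]
    #   }
    # spider_item normally holds a single entry; more than one means the same
    # spider_id reported several site/channel pairs, which is logged as a warning.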
    cursor = spider_heartbeat.aggregate(pipeline, allowDiskUse=True)
    try:
        for doc in cursor:
            spider_item = doc["spider_item"]
            for items in spider_item:
                site = items["site"]
                channel = items["channel"]
                spidercode = items["spidercode"]
                business_type = items["business_type"]

                if len(spider_item) > 1:
                    logger.warning(f"{spidercode} -> {site}--abnormal number of site mappings")

                is_list = str(business_type).endswith("List")
                # prevent data from overlapping when several sites share one spidercode
                hash_key = get_hash_key(**items)
                if not hash_key:
                    logger.error(f"[Monitor]{site}-{channel}-{spidercode}--monitoring anomaly")
                    continue

                if not aggregate_items.get(hash_key):
                    data = {
                        "spider_id": doc["_id"],
                        "site": site,
                        "channel": channel,
                        "spidercode": spidercode,
                        "business_type": business_type,
                        "runtime": runtime,
                        "spidercode_at_site_num": 0  # number of sites mapped to this spidercode
                    }
                    if is_list:
                        data["list_count"] = doc["count"]
                        data["list_rel_count"] = doc["rel_count"]
                        data["list_runtimes"] = 1
                        data["detail_count"] = 0
                        data["detail_rel_count"] = 0
                        data["detail_runtimes"] = 0
                    else:
                        data["detail_count"] = doc["count"]
                        data["detail_rel_count"] = doc["rel_count"]
                        data["detail_runtimes"] = 1
                        data["list_count"] = 0
                        data["list_rel_count"] = 0
                        data["list_runtimes"] = 0
                    aggregate_items.setdefault(hash_key, data)
                else:
                    data = aggregate_items.get(hash_key)
                    if is_list:
                        data["list_count"] += doc["count"]
                        data["list_rel_count"] += doc["rel_count"]
                        data["list_runtimes"] += 1
                    else:
                        data["detail_count"] += doc["count"]
                        data["detail_rel_count"] += doc["rel_count"]
                        data["detail_runtimes"] += 1
                    aggregate_items.update({hash_key: data})

                # monitoring check: spidercode_at_site_num > 1 means the crawler task
                # was created incorrectly and must be reported to the data sourcing team
                if site not in websites:
                    aggregate_items[hash_key]["spidercode_at_site_num"] = 1
                    websites.append(site)
                else:
                    aggregate_items[hash_key]["spidercode_at_site_num"] += 1
    finally:
        cursor.close()  # close the cursor, not the shared client, so later queries still work

    return aggregate_items


_runtime = get_runtime()
aggregate_results = aggregate_query(_runtime)
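
# aggregate_results maps each hash key to the per-spider counters built in
# aggregate_query(), roughly (values are illustrative placeholders):
#   {"<md5 hash key>": {"site": "...", "channel": "...", "spidercode": "...",
#                       "list_count": 0, "list_rel_count": 0, "list_runtimes": 0,
#                       "detail_count": 0, "detail_rel_count": 0, "detail_runtimes": 0,
#                       "spidercode_at_site_num": 1, ...}}
# The helper functions below only read from this dict.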


def get_list_isgetdata(hash_key):
    """whether the list page collected any data"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results[hash_key]["list_count"]
    return count > 0


def get_list_allintimes(hash_key):
    """total list-page documents saved for the day"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results[hash_key]["list_rel_count"]
    return count


def get_list_runtimes(hash_key):
    """number of list-page crawl runs"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results[hash_key]["list_runtimes"]
    return count


def get_detail_downloadnum(hash_key):
    """number of detail pages downloaded"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results[hash_key]["detail_count"]
    return count


def get_detail_downloadsuccessnum(hash_key):
    """number of detail pages downloaded successfully"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results[hash_key]["detail_rel_count"]
    return count


def get_count(document, business_type: str):
    if business_type.title() not in ["List", "Detail"]:
        raise ValueError(f"invalid business_type: {business_type}")
    if str(document["business_type"]).endswith(business_type):
        return document["count"]
    return 0


def get_rel_count(document, business_type: str):
    if business_type.title() not in ["List", "Detail"]:
        raise ValueError(f"invalid business_type: {business_type}")
    if str(document["business_type"]).endswith(business_type):
        return document["rel_count"]
    return 0


def main():
    summary_queue = []
    crawlers = get_crawler_basic_information()
    for crawler in crawlers:
        site = crawler["site"]
        channel = crawler["channel"]
        spidercode = crawler["spidercode"]

        join_data = {**crawler}  # start from the crawler's basic information
        hash_key = get_hash_key(site, channel, spidercode)
        if aggregate_results.get(hash_key):
            # crawlab platform task details
            crawlab = get_node_and_taskid(_runtime, spidercode)
            if crawlab:
                join_data["py_taskid"] = crawlab["crawlab_taskid"]
                join_data["py_nodename"] = crawlab["node_ip"]

            # aggregated heartbeat statistics
            result = aggregate_results[hash_key]
            join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"]
            join_data["business_type"] = result["business_type"]
            join_data["spider_id"] = result["spider_id"]
            join_data["list_count"] = result["list_count"]
            join_data["list_rel_count"] = result["list_rel_count"]
            join_data["detail_count"] = result["detail_count"]
            join_data["detail_rel_count"] = result["detail_rel_count"]

            # list-page statistics
            join_data["list_isgetdata"] = get_list_isgetdata(hash_key)  # whether the list page collected data
            join_data["list_allintimes"] = get_list_allintimes(hash_key)  # daily list-page inserts
            join_data["list_runtimes"] = get_list_runtimes(hash_key)  # list-page crawl runs

            # detail-page statistics
            join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key)  # detail pages downloaded
            join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key)  # successful detail downloads
            join_data["detail_downloadfailnum"] = join_data["detail_downloadnum"] - join_data["detail_downloadsuccessnum"]  # failed detail downloads
        else:
            join_data["list_isgetdata"] = False
            join_data["list_allintimes"] = -1
            join_data["list_runtimes"] = -1
            join_data["detail_downloadnum"] = -1
            join_data["detail_downloadsuccessnum"] = -1
            join_data["detail_downloadfailnum"] = -1

        logger.info(f"[Monitor]{site}-{channel}-{spidercode}--statistics done")
        summary_queue.append(join_data)

    save(summary_queue, spider_monitor)
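
# Each saved spider_monitor document combines the crawler's basic fields (site,
# channel, spidercode, modifyuser, modifyid) with the counters computed above,
# plus the comeintime timestamp added by save(). The -1 values in the else
# branch mark crawlers with no heartbeat for this runtime, which presumably lets
# downstream reports distinguish "never ran" from "ran but collected nothing".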


if __name__ == '__main__':
    main()