monitor_summary.py

# -*- coding: utf-8 -*-
"""
Created on 2023-04-05
---------
@summary: Daily summary of spider monitoring data
---------
@author: dzr
"""
import hashlib
from datetime import datetime, timedelta
from operator import itemgetter

from bson.int64 import Int64
from bson.son import SON
from pymongo import MongoClient

from log import logger

# mongo
# MONGO_HOST = "172.17.4.87"
# MONGO_PORT = 27080
MONGO_HOST = "127.0.0.1"
MONGO_PORT = 27001
client = MongoClient(MONGO_HOST, MONGO_PORT)

MONGO_DB1 = "py_spider"
MONGO_DB2 = "editor"
mongodb1 = client[MONGO_DB1]
mongodb2 = client[MONGO_DB2]

# crawled data collection
data_bak = mongodb1["data_bak"]
# heartbeat collection
spider_heartbeat = mongodb1["spider_heartbeat"]
# py_spiders crawl list
py_spiders_crawl_list = mongodb1["crawl_data"]
# list-page summary collection
spider_monitor = mongodb1["spider_monitor"]
# luaconfig collection
spider_lua_config = mongodb2["luaconfig"]

# special sites
special_sites = ["云南省政府采购网", "湖南省政府采购电子卖场"]
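
# Note: the sites listed above are exempted from the multi-site risk warning and
# from the spidercode_at_site_num bookkeeping in aggregate_query_crawl_count() below.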


def get_md5(*args, **kwargs):
    """
    @summary: build a unique 32-character md5 digest
    ---------
    @param args: positional values that take part in the combined dedup key
    @param kwargs: keyword values that take part in the combined dedup key
    ---------
    @result: 7c8684bcbdfcea6697650aa53d7b1405
    """
    conditions = ["site", "channel", "spidercode"]
    data_lst = list(filter(lambda x: x is not None, args))
    for k, v in kwargs.items():
        if k in conditions and (v and v not in data_lst):
            data_lst.append(v)

    if not data_lst or len(data_lst) != 3:
        # raise AttributeError(f"缺少{conditions}属性")
        logger.error(f"[Monitor]缺少{conditions}属性,内容:{data_lst}")
        return None

    data_lst = sorted(data_lst)
    content = "_".join(data_lst)
    return hashlib.md5(str(content).encode()).hexdigest()
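
# Illustrative usage (placeholder values): get_md5(site="site-a", channel="channel-b",
# spidercode="spider_c") returns a 32-character hex digest; it returns None whenever
# any of site/channel/spidercode is missing.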


def get_runtime(datestr=None):
    if datestr is None:
        today = datetime.now().date()
        yesterday = today + timedelta(days=-1)
        datestr = yesterday.strftime("%Y-%m-%d")
    return datestr
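
# Note: with no argument, get_runtime() returns yesterday's date string
# (e.g. "2023-04-04" when run on 2023-04-05), which is the day the summary covers.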


def save(documents, collection):
    """Save documents to the given collection"""
    is_list = isinstance(documents, list)
    documents = documents if is_list else [documents]

    count = 0
    data_lst = []
    for items in documents:
        items.pop("_id", None)
        items.pop("business_type", None)
        items["comeintime"] = Int64(datetime.now().timestamp())
        data_lst.append(items)
        count += 1
        if len(data_lst) % 100 == 0:
            collection.insert_many(data_lst)
            data_lst.clear()
            logger.info(f"[Monitor]{collection.name}-批量上传{count}条数据--已保存")

    # insert the remaining documents
    if data_lst:
        collection.insert_many(data_lst)
    logger.info(f"[Monitor]{collection.name}-批量上传{count}条数据--已保存")
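
# Note: save() strips Mongo's _id and the internal business_type field, stamps each
# document with a comeintime epoch, and writes in batches of 100; the final partial
# batch is only inserted when it is non-empty.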


def get_crawler_basic_information():
    """Basic crawler information"""
    crawler_lst = []
    q = {"platform": "python", "state": 11}
    projection = {
        "_id": 0, "site": 1, "channel": 1,
        "modifyuser": 1, "modifyuserid": 1, "code": 1, "event": 1
    }
    cursor = spider_lua_config.find(q, projection=projection)
    try:
        for doc in cursor:
            crawler_lst.append({
                "site": doc["site"],
                "channel": doc["channel"],
                "spidercode": doc["code"],
                "modifyid": doc["modifyuserid"],
                "modifyuser": doc["modifyuser"],
                "event": doc["event"]
            })
    finally:
        client.close()

    logger.info(f"[Monitor]爬虫采集监控--共计{len(crawler_lst)}个爬虫")
    yield from crawler_lst
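
# Note: only luaconfig entries with platform "python" and state 11 (presumably the
# "online" state) are summarized; each yielded dict carries the site/channel/spidercode
# triple plus the maintainer fields.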


def aggregate_query(collection, pipeline, is_print_error=False):
    """
    Aggregation query
    @param collection: MongoDB collection
    @param pipeline: aggregation pipeline stages
    @param is_print_error: whether to log errors to the console
    @return: aggregation results
    """
    results = []
    cursor = collection.aggregate(pipeline, allowDiskUse=True)
    try:
        for doc in cursor:
            results.append(doc)
    except Exception as e:
        if is_print_error:
            logger.exception(e)
    finally:
        client.close()

    yield from results
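
# Note: results are fully materialized before being yielded because the shared
# MongoClient is closed in the finally block; errors are swallowed unless
# is_print_error is set.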


def aggregate_query_crawl_count(runtime):
    """
    Aggregate feapder list and detail crawl counts from the heartbeat collection
    @param runtime: run date ("%Y-%m-%d")
    @return:
    """
    aggregate_items = {}
    pipeline = [
        {"$match": {"runtime": runtime}},
        {
            "$group": {
                "_id": "$spider_id",
                "rel_count": {"$sum": "$rel_count"},  # records stored
                "count": {"$sum": "$count"},  # records downloaded
                "spider_item": {
                    "$addToSet": {
                        "site": "$site",
                        "channel": "$channel",
                        "spidercode": "$spidercode",
                        "business_type": "$business_type"
                    }
                }
            }
        },
        {"$sort": SON([("rel_count", -1)])}
    ]
    results = aggregate_query(spider_heartbeat, pipeline)
    for doc in results:
        spider_item = doc["spider_item"]
        label_dict = {}
        for items in spider_item:
            site = items["site"]
            channel = items["channel"]
            spidercode = items["spidercode"]
            business_type = items["business_type"]

            if len(spider_item) > 1 and site not in special_sites:
                logger.warning(f"[Monitor]{spidercode} -> {site}--存在风险, {len(spider_item)}")

            is_list = str(business_type).endswith("List")
            hash_key = get_md5(**items)  # keep sites apart when one spidercode serves several of them
            if not hash_key:
                # logger.error(f"[Monitor]{site}-{channel}-{spidercode}--监控异常")
                logger.error(f"[Monitor]{doc['_id']}--爬虫异常")
                continue

            if not aggregate_items.get(hash_key):
                data = {
                    "spider_id": doc["_id"],
                    "site": site,
                    "channel": channel,
                    "spidercode": spidercode,
                    "business_type": business_type,
                    "runtime": runtime,
                    "spidercode_at_site_num": 0  # number of sites this spidercode is attached to
                }
                if is_list:
                    data["list_count"] = doc["count"]
                    data["list_rel_count"] = doc["rel_count"]
                    data["list_runtimes"] = 1
                    data["detail_count"] = 0
                    data["detail_rel_count"] = 0
                    data["detail_runtimes"] = 0
                else:
                    data["detail_count"] = doc["count"]
                    data["detail_rel_count"] = doc["rel_count"]
                    data["detail_runtimes"] = 1
                    data["list_count"] = 0
                    data["list_rel_count"] = 0
                    data["list_runtimes"] = 0
                aggregate_items.setdefault(hash_key, data)
            else:
                data = aggregate_items.get(hash_key)
                if is_list:
                    data["list_count"] += doc["count"]
                    data["list_rel_count"] += doc["rel_count"]
                    data["list_runtimes"] += 1
                else:
                    data["detail_count"] += doc["count"]
                    data["detail_rel_count"] += doc["rel_count"]
                    data["detail_runtimes"] += 1
                aggregate_items.update({hash_key: data})

            # Watch the crawl tasks: spidercode_at_site_num > 1 means the task setup
            # is suspect and should be reported back to the data sourcing staff.
            if site not in special_sites:
                label = f"{business_type}_{spidercode}"
                if label not in label_dict:
                    aggregate_items[hash_key]["spidercode_at_site_num"] = 1
                    conditions = {"keys": [hash_key], "websites": [site]}
                    label_dict.setdefault(label, conditions)
                else:
                    # same spidercode but a different site: bump the counters
                    websites = label_dict[label]["websites"]
                    if site not in websites:
                        keys = label_dict[label]["keys"]
                        for key in keys:
                            aggregate_items[key]["spidercode_at_site_num"] += 1
                        # remember this identity - hash_key
                        keys.append(hash_key)
                        # remember the site
                        websites.append(site)
                        aggregate_items[hash_key]["spidercode_at_site_num"] = len(websites)

    return aggregate_items
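
# Note: the mapping returned above is keyed by the md5 of (site, channel, spidercode);
# each value aggregates that spider's list/detail download counts, stored counts and
# run counts for the day, plus spidercode_at_site_num for task sanity checks.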


def aggregate_query_crawl_list(runtime):
    """Aggregate feapder list-crawl statistics"""
    aggregate_items = {}
    pipeline = [
        {"$match": {"runtime": runtime, "business_type": {"$regex": "List"}}},
        {
            "$group": {
                "_id": "$spider_id",
                "spider_item": {
                    "$addToSet": {
                        "site": "$site",
                        "channel": "$channel",
                        "spidercode": "$spidercode",
                        "count": {"$ifNull": ["$count", 0]},
                        "rel_count": {"$ifNull": ["$rel_count", 0]},
                    }
                }
            }
        }
    ]
    results = aggregate_query(spider_heartbeat, pipeline)
    for doc in results:
        spider_item = doc["spider_item"]
        for item in spider_item:
            hash_key = get_md5(**item)
            if not hash_key:
                continue
            is_all_in = all([
                item["count"] > 0,
                item["rel_count"] > 0,
                item["count"] == item["rel_count"]
            ])
            if is_all_in:
                aggregate_items.setdefault(hash_key, {"list_allintimes": 0})
                aggregate_items[hash_key]["list_allintimes"] += 1

    return aggregate_items
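
# Note: list_allintimes counts, per spider, how many heartbeat entries for the day
# reported a list download count equal to the stored count (both non-zero), i.e. runs
# where everything downloaded made it into the database.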


def aggregate_query_crawlab_info(runtime):
    """Aggregate crawlab platform run info for feapder spiders"""
    aggregate_items = {}
    pipeline = [
        {
            "$project": {
                "_id": 0,
                "spider_id": 1,
                "spidercode": 1,
                "runtime": 1,
                "node_ip": 1,
                "crawlab_taskid": 1,
                "create_at": 1,
            }
        },
        {"$match": {"runtime": runtime}},
        {
            "$group": {
                "_id": "$spider_id",
                "crawlab_item": {
                    "$addToSet": {
                        "spidercode": "$spidercode",
                        "create_at": "$create_at",
                        "node_ip": "$node_ip",
                        "crawlab_taskid": "$crawlab_taskid"
                    },
                }
            }
        }
    ]
    results = aggregate_query(spider_heartbeat, pipeline)
    for doc in results:
        crawlab_item = sorted(doc["crawlab_item"], key=itemgetter("create_at"), reverse=True)
        items: dict = crawlab_item[0]
        spidercode = items.pop("spidercode")
        if not aggregate_items.get(spidercode):
            aggregate_items.setdefault(spidercode, items)

    return aggregate_items
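
# Note: for every spidercode only the most recent crawlab run (sorted by create_at,
# descending) is kept, yielding its node_ip and crawlab_taskid.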


_runtime = get_runtime()
aggregate_results = aggregate_query_crawl_count(_runtime)
aggregate_list_results = aggregate_query_crawl_list(_runtime)
aggregate_query_crawlab_results = aggregate_query_crawlab_info(_runtime)
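
# Note: the three aggregations above run once at import time, all scoped to the
# runtime string (yesterday's date) produced by get_runtime().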


def get_node_and_taskid(spidercode, default=None):
    """Most recent worker node and crawlab task id for the spider"""
    if aggregate_query_crawlab_results.get(spidercode):
        default = aggregate_query_crawlab_results[spidercode]
    return default


def get_list_isgetdata(hash_key, default=0):
    """Whether the list pages fetched any data"""
    if aggregate_results.get(hash_key):
        default = aggregate_results[hash_key]["list_count"]
    return default > 0


def get_list_allintimes(hash_key, default=0):
    """Number of runs where the daily list count equals the stored count
    (net of title dedup and incremental/full dedup)"""
    if aggregate_list_results.get(hash_key):
        default = aggregate_list_results[hash_key]["list_allintimes"]
    return default


def get_list_runtimes(hash_key, default=0):
    """How many times the list crawl ran"""
    if aggregate_results.get(hash_key):
        default = aggregate_results[hash_key]["list_runtimes"]
    return default


def get_detail_downloadnum(hash_key, default=0):
    """Detail page download count"""
    if aggregate_results.get(hash_key):
        default = aggregate_results[hash_key]["detail_count"]
    return default


def get_detail_downloadsuccessnum(hash_key, default=0):
    """Detail page successful download count"""
    if aggregate_results.get(hash_key):
        default = aggregate_results[hash_key]["detail_rel_count"]
    return default


def get_detail_downloadfailnum(**kwargs):
    """Detail page failed download count"""
    count = -1
    if kwargs["detail_downloadnum"] >= 0 and kwargs["detail_downloadsuccessnum"] >= 0:
        count = kwargs["detail_downloadnum"] - kwargs["detail_downloadsuccessnum"]
    return count
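
# Note: get_detail_downloadfailnum() simply subtracts successes from downloads; the
# -1 sentinel only appears if either input is negative, which the 0 defaults above
# normally rule out.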


def main():
    summary_queue = []
    crawlers = get_crawler_basic_information()
    for crawler in crawlers:
        site = crawler["site"]
        channel = crawler["channel"]
        spidercode = crawler["spidercode"]
        join_data = {**crawler}  # start from the basic crawler info
        hash_key = get_md5(site, channel, spidercode)
        if aggregate_results.get(hash_key):
            # crawlab platform info
            crawlab = get_node_and_taskid(spidercode)
            if crawlab:
                join_data["py_taskid"] = crawlab["crawlab_taskid"]
                join_data["py_nodename"] = crawlab["node_ip"]
            # heartbeat aggregation results
            result = aggregate_results[hash_key]
            join_data["spider_id"] = result["spider_id"]
            join_data["business_type"] = result["business_type"]
            join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"]
            join_data["list_count"] = result["list_count"]
            join_data["list_rel_count"] = result["list_rel_count"]
            join_data["detail_count"] = result["detail_count"]
            join_data["detail_rel_count"] = result["detail_rel_count"]
        # list crawl summary
        join_data["list_isgetdata"] = get_list_isgetdata(hash_key)
        join_data["list_allintimes"] = get_list_allintimes(hash_key)
        join_data["list_runtimes"] = get_list_runtimes(hash_key)
        # detail crawl summary
        join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key)
        join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key)
        join_data["detail_downloadfailnum"] = get_detail_downloadfailnum(**join_data)
        summary_queue.append(join_data)
        logger.info(f"[Monitor]{crawler['site']}-{crawler['channel']}-{spidercode}--完成统计")

    # write to the database
    save(summary_queue, spider_monitor)
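
# Note: main() walks every configured python crawler, joins its config with the day's
# heartbeat aggregates (falling back to zero counts when the spider produced no
# heartbeat), and bulk-writes the merged documents into spider_monitor.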


if __name__ == '__main__':
    main()