monitor.py

# -*- coding: utf-8 -*-
"""
Created on 2023-04-05
---------
@summary: Spider runtime monitoring (feapder) and daily collection statistics
---------
@author: Dzr
"""
import hashlib
from datetime import datetime, timedelta

from bson.int64 import Int64
from bson.son import SON
from pymongo import MongoClient

from log import logger

# MongoDB connection
MONGO_HOST = "172.17.4.87"
MONGO_PORT = 27080
client = MongoClient(MONGO_HOST, MONGO_PORT)

MONGO_DB1 = "py_spider"
MONGO_DB2 = "editor"
mongodb1 = client[MONGO_DB1]
mongodb2 = client[MONGO_DB2]

# spider production data table
data_bak = mongodb1["data_bak"]
# spider heartbeat table
spider_heartbeat = mongodb1["pyspider_heartbeat"]
# daily collection summary table
spider_monitor = mongodb1["spider_monitor"]
# collection task (lua config) table
spider_lua_config = mongodb2["luaconfig"]

# special sites, exempt from the spidercode/site consistency check
special_sites = ["云南省政府采购网", "湖南省政府采购电子卖场"]

def get_md5(*args, **kwargs):
    """
    @summary: Build a unique 32-character md5 digest
    ---------
    @param args: positional values combined into the dedup key
    @param kwargs: keyword values combined into the dedup key
    ---------
    @result: 7c8684bcbdfcea6697650aa53d7b1405
    """
    if len(args) != 1:
        conditions = ["site", "channel", "spidercode"]
        data_lst = list(filter(lambda x: x is not None, args))
        for k, v in kwargs.items():
            if k in conditions and (v and v not in data_lst):
                data_lst.append(v)

        if not data_lst or len(data_lst) != 3:
            logger.error(f"Missing key fields: {conditions}, got: {data_lst}")
            return None

        data_lst = sorted(data_lst)
        content = "_".join(data_lst)
    else:
        content = args[0]

    return hashlib.md5(str(content).encode()).hexdigest()

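# Usage sketch (illustrative values, not production data): get_md5 builds the
# spider lookup key from the sorted site/channel/spidercode triple, so the
# positional call used in start_monitor() and the keyword call used by the
# aggregation helpers should yield the same digest; keyword arguments outside
# that triple are ignored, and a single positional argument is hashed as-is.
#
#   get_md5("site-x", "channel-y", "a_example_spider")
#   get_md5(site="site-x", channel="channel-y", spidercode="a_example_spider", count=10)
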
def get_runtime(datestr=None):
    if datestr is None:
        today = datetime.now().date()
        yesterday = today - timedelta(days=1)
        datestr = yesterday.strftime("%Y-%m-%d")
    return datestr

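# Example (dates illustrative): with no argument, get_runtime() run on
# 2023-04-05 returns "2023-04-04", i.e. the statistics always cover the
# previous day; an explicit date string such as get_runtime("2023-04-01")
# is returned unchanged.
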
def save(documents, collection, ordered=False):
    """Save documents to MongoDB in batches of 100."""
    is_list = isinstance(documents, list)
    documents = documents if is_list else [documents]

    data_lst = []
    for items in documents:
        items.pop("_id", None)
        items.pop("business_type", None)
        items["comeintime"] = Int64(datetime.now().timestamp())
        data_lst.append(items)
        if len(data_lst) == 100:
            ret = collection.insert_many(data_lst, ordered=ordered)
            logger.info(f"MongoDB {collection.name} saved {len(ret.inserted_ids)} documents")
            data_lst = []

    # flush the remaining documents
    if data_lst:
        collection.insert_many(data_lst, ordered=ordered)

    logger.info(f"MongoDB {collection.name} saved {len(documents)} documents")

def get_spider_lst():
    """Yield base information for every online python spider."""
    crawler_lst = []
    q = {"platform": "python", "state": 11}
    projection = {
        "_id": 0, "site": 1, "channel": 1,
        "modifyuser": 1, "modifyuserid": 1, "code": 1, "event": 1
    }
    cursor = spider_lua_config.find(q, projection=projection)
    try:
        for doc in cursor:
            crawler_lst.append({
                "site": doc["site"],
                "channel": doc["channel"],
                "code": doc["code"],
                "modifyid": doc["modifyuserid"],
                "modifyuser": doc["modifyuser"],
                "event": doc["event"]
            })
    finally:
        # close the cursor rather than the shared client, so later queries keep working
        cursor.close()

    logger.info(f"Spider monitor - {len(crawler_lst)} spiders online")
    yield from crawler_lst

def aggregate_query(collection, pipeline, is_print_error=False):
    """
    Run an aggregation pipeline and yield its results.

    @param collection: MongoDB collection
    @param pipeline: aggregation pipeline stages
    @param is_print_error: whether to log errors to the console
    @return: aggregation results
    """
    results = []
    cursor = collection.aggregate(pipeline, allowDiskUse=True)
    try:
        for doc in cursor:
            results.append(doc)
    except Exception as e:
        if is_print_error:
            logger.exception(e)
    finally:
        # close the cursor rather than the shared client, so later queries keep working
        cursor.close()

    yield from results

def aggregate_query_count(runtime):
    """
    Aggregate list and detail collection counts per batch for feapder spiders.

    @param runtime: statistics date, e.g. "2023-04-04"
    @return: dict of aggregated statistics keyed by the site/channel/spidercode md5
    """
    spider_dict = {}  # tracks the sites already seen for each spidercode
    aggregate_items = {}
    pipeline = [
        {"$match": {"runtime": runtime}},
        {
            "$group": {
                "_id": "$batch_no",
                "rel_count": {"$sum": "$rel_count"},  # stored count (after dedup)
                "count": {"$sum": "$count"},  # download count
                "site": {"$first": "$site"},
                "channel": {"$first": "$channel"},
                "spidercode": {"$first": "$spidercode"},
                "business_type": {"$first": "$business_type"},
            }
        },
        {"$sort": SON([("rel_count", -1)])}
    ]
    results = aggregate_query(spider_heartbeat, pipeline)
    for items in results:
        site = items["site"]
        channel = items["channel"]
        spidercode = items["spidercode"]

        business_type = items["business_type"]
        business_type = "list" if business_type.endswith("List") else "detail"

        hash_key = get_md5(**items)  # spider lookup key
        if not hash_key:
            logger.error(f"Invalid batch number {items['_id']}")
            continue

        # sort the data by spider business type
        data_dict = {
            "site": site,
            "channel": channel,
            "spidercode": spidercode,
            "runtime": runtime,
            "spidercode_at_site_num": 0,  # number of sites associated with this spidercode
            "frame": "feapder",  # collection framework
            business_type: {
                "batch_no": items["_id"],
                "count": items["count"],
                "rel_count": items["rel_count"],
                "runtimes": 1
            }
        }
        if not aggregate_items.get(hash_key):
            aggregate_items.setdefault(hash_key, data_dict)
        else:
            aggregate_items[hash_key][business_type] = data_dict[business_type]

        # Monitor spider tasks: spidercode_at_site_num > 1 means the spider task
        # setup is suspect and must be reported to the data-sourcing staff.
        if site not in special_sites:
            spider = aggregate_items[hash_key]
            if spidercode not in spider_dict:
                spider["spidercode_at_site_num"] = 1
                values = {"keys": [hash_key], "sites": [site]}
                spider_dict.setdefault(spidercode, values)
            else:
                # same spidercode but a different site: increase the counters
                sites = spider_dict[spidercode]["sites"]
                if site not in sites:
                    keys = spider_dict[spidercode]["keys"]
                    for key in keys:
                        # update spidercode_at_site_num for every spider sharing this spidercode
                        aggregate_items[key]["spidercode_at_site_num"] += 1

                    keys.append(hash_key)  # record the new spider
                    sites.append(site)  # record the new site
                    spider["spidercode_at_site_num"] = len(sites)
        else:
            aggregate_items[hash_key]["spidercode_at_site_num"] = 1

    return aggregate_items

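# Worked example (hypothetical names): if heartbeat batches report spidercode
# "a_example_spider" first under site "site-A" and later under site "site-B",
# the aggregated entries for both sites end up with spidercode_at_site_num == 2,
# flagging a spidercode shared across sites. Sites listed in special_sites are
# exempt and always keep spidercode_at_site_num == 1.
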
def aggregate_query_crawl_list(runtime):
    """Aggregate list-collection statistics for feapder list spiders."""
    aggregate_items = {}
    pipeline = [
        {
            "$match": {
                "runtime": runtime,
                "business_type": {"$regex": "List"},
                "status_code": {"$ne": -1}
            }
        },
        {
            "$group": {
                "_id": "$batch_no",
                "count": {"$sum": "$count"},
                "rel_count": {"$sum": "$rel_count"},
                "site": {"$first": "$site"},
                "channel": {"$first": "$channel"},
                "spidercode": {"$first": "$spidercode"},
            }
        }
    ]
    results = aggregate_query(spider_heartbeat, pipeline)
    for items in results:
        hash_key = get_md5(**items)
        if not aggregate_items.get(hash_key):
            values = {"list_allintimes": 0, "list_nodatatimes": 0}
            aggregate_items.setdefault(hash_key, values)

        if all([
            items["count"] > 0,
            items["rel_count"] > 0,
            items["count"] == items["rel_count"]
        ]):
            aggregate_items[hash_key]["list_allintimes"] += 1

        if items["count"] == 0:
            aggregate_items[hash_key]["list_nodatatimes"] += 1

    return aggregate_items

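# Reading the counters (numbers illustrative): a list batch with
# count == rel_count == 25 adds one to list_allintimes (every downloaded item
# survived title and incremental/full dedup and was stored), while a batch
# with count == 0 adds one to list_nodatatimes.
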
def aggregate_count_crawlab_update_runtimes(runtime):
    """Aggregate crawlab task runs per batch and update the aggregated run counts."""
    pipeline = [
        {
            "$project": {
                "_id": 0,
                "site": 1,
                "channel": 1,
                "batch_no": 1,
                "spidercode": 1,
                "business_type": 1,
                "runtime": 1,
                "node_ip": 1,
                "crawlab_taskid": 1,
                "create_at": 1,
            }
        },
        {"$match": {"runtime": runtime}},
        {
            "$group": {
                "_id": "$batch_no",
                "business_type": {"$first": "$business_type"},
                "site": {"$first": "$site"},
                "channel": {"$first": "$channel"},
                "spidercode": {"$first": "$spidercode"},
                "crawlab_item": {
                    "$addToSet": {
                        "node_ip": "$node_ip",
                        "crawlab_taskid": "$crawlab_taskid",
                    },
                }
            }
        }
    ]
    results = aggregate_query(spider_heartbeat, pipeline)
    for items in results:
        runtimes = len(items["crawlab_item"])  # number of runs for this collection task

        # sort the data by spider business type
        business_type = items["business_type"]
        business_type = "list" if business_type.endswith("List") else "detail"

        hash_key = get_md5(**items)  # spider lookup key
        # update the run count on the aggregated statistics (skip batches that
        # produced no lookup key or were not aggregated by aggregate_query_count)
        if hash_key and business_type in aggregate_count_items.get(hash_key, {}):
            aggregate_count_items[hash_key][business_type]["runtimes"] = runtimes


_runtime = get_runtime()  # statistics date
aggregate_count_items = aggregate_query_count(_runtime)
aggregate_crawl_list_items = aggregate_query_crawl_list(_runtime)
aggregate_count_crawlab_update_runtimes(_runtime)

def get_list_isgetdata(hash_key, default=0):
    """Whether the list pages collected any data."""
    query_result = aggregate_count_items.get(hash_key)
    if query_result and "list" in query_result:
        default = query_result["list"]["count"]
    return default > 0


def get_list_allintimes(hash_key, default=0):
    """Number of list runs whose daily download count equals the stored count
    (i.e. nothing was removed by title dedup or incremental/full dedup)."""
    if aggregate_crawl_list_items.get(hash_key):
        default = aggregate_crawl_list_items[hash_key]["list_allintimes"]
    return default


def get_list_runtimes(hash_key, default=0):
    """Number of list-collection runs."""
    query_result = aggregate_count_items.get(hash_key)
    if query_result and "list" in query_result:
        default = aggregate_count_items[hash_key]["list"]["runtimes"]
    return default


def get_list_count(hash_key, default=0):
    """Total number of list items downloaded."""
    query_result = aggregate_count_items.get(hash_key)
    if query_result and "list" in query_result:
        default = aggregate_count_items[hash_key]["list"]["count"]
    return default


def get_list_rel_count(hash_key, default=0):
    """Total number of list items actually stored."""
    query_result = aggregate_count_items.get(hash_key)
    if query_result and "list" in query_result:
        default = aggregate_count_items[hash_key]["list"]["rel_count"]
    return default


def get_list_nodatatimes(hash_key, default=-1):
    """Number of list runs that collected no data (after filtering)."""
    if aggregate_crawl_list_items.get(hash_key):
        default = aggregate_crawl_list_items[hash_key]["list_nodatatimes"]
    return default


def get_detail_downloadnum(hash_key, default=0):
    """Number of detail pages downloaded."""
    query_result = aggregate_count_items.get(hash_key)
    if query_result and "detail" in query_result:
        default = aggregate_count_items[hash_key]["detail"]["count"]
    return default


get_detail_count = get_detail_downloadnum


def get_detail_downloadsuccessnum(hash_key, default=0):
    """Number of detail pages downloaded successfully."""
    query_result = aggregate_count_items.get(hash_key)
    if query_result and "detail" in query_result:
        default = aggregate_count_items[hash_key]["detail"]["rel_count"]
    return default


get_detail_rel_count = get_detail_downloadsuccessnum


def get_detail_downloadfailnum(**kwargs):
    """Number of detail pages that failed to download."""
    count = -1
    if kwargs["detail_downloadnum"] >= 0 and kwargs["detail_downloadsuccessnum"] >= 0:
        count = kwargs["detail_downloadnum"] - kwargs["detail_downloadsuccessnum"]
    return count

def start_monitor():
    summary_queue = []
    spider_lst = get_spider_lst()
    for spider in spider_lst:
        site = spider["site"]
        channel = spider["channel"]
        spidercode = spider["code"]

        join_data = {**spider, "is_valid": False}  # base record for this spider
        hash_key = get_md5(site, channel, spidercode)
        query_result = aggregate_count_items.get(hash_key)
        if query_result:
            # aggregated query - overall collection statistics
            result = query_result
            join_data["frame"] = result["frame"]  # collection framework name
            join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"]

        # aggregated statistics - list collection
        join_data["list_isgetdata"] = get_list_isgetdata(hash_key)
        join_data["list_allintimes"] = get_list_allintimes(hash_key)
        join_data["list_runtimes"] = get_list_runtimes(hash_key)
        join_data["list_nodatatimes"] = get_list_nodatatimes(hash_key)
        join_data["list_count"] = get_list_count(hash_key)
        join_data["list_rel_count"] = get_list_rel_count(hash_key)

        # aggregated statistics - detail collection
        join_data["detail_count"] = get_detail_count(hash_key)
        join_data["detail_rel_count"] = get_detail_rel_count(hash_key)
        join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key)
        join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key)
        join_data["detail_downloadfailnum"] = get_detail_downloadfailnum(**join_data)

        # whether the daily statistics are valid
        frame = join_data.get("frame")
        if frame and frame == "feapder":
            join_data["is_valid"] = True

        summary_queue.append(join_data)
        logger.info(f"{site} {channel} {spidercode} -- statistics done")

    # write the results to the database
    save(summary_queue, spider_monitor)
    logger.info("Spider monitor - daily statistics finished")


if __name__ == '__main__':
    start_monitor()
    client.close()  # release the shared MongoDB connection