# monitor_summary.py

import hashlib
from datetime import datetime, timedelta

from bson.int64 import Int64
from bson.son import SON
from pymongo import MongoClient

from log import logger

# mongo
# MONGO_HOST = "172.17.4.87"
# MONGO_PORT = 27080
MONGO_HOST = "127.0.0.1"
MONGO_PORT = 27001

client = MongoClient(MONGO_HOST, MONGO_PORT)

MONGO_DB1 = "py_spider"
MONGO_DB2 = "editor"

mongodb1 = client[MONGO_DB1]
mongodb2 = client[MONGO_DB2]
# crawler data collection
data_bak = mongodb1["data_bak"]
# heartbeat collection
spider_heartbeat = mongodb1["spider_heartbeat"]
# py_spiders crawl list
py_spiders_crawl_list = mongodb1["crawl_data"]
# list-page summary collection
spider_monitor = mongodb1["spider_monitor"]
# luaconfig collection
spider_lua_config = mongodb2["luaconfig"]


def get_hask_key(*args):
    """
    @summary: build a unique 32-character md5 digest
    ---------
    @param *args: the values joined for deduplication
    ---------
    @result: 7c8684bcbdfcea6697650aa53d7b1405
    """
    join_data = "_".join(*args).encode()
    return hashlib.md5(join_data).hexdigest()
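
# Illustrative usage (the site/channel/spidercode values below are made up):
# the fields are joined with "_" before hashing, so the same triple always yields
# the same 32-character hex digest.
#
#   get_hask_key(["example-site", "example-channel", "example_spider"])
#   == hashlib.md5("example-site_example-channel_example_spider".encode()).hexdigest()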


def save(documents, collection):
    """Save documents to the collection in batches of 100"""
    is_list = isinstance(documents, list)
    documents = documents if is_list else [documents]
    count = 0
    data_lst = []
    for item in documents:
        item.pop("_id", None)
        item.pop("business_type", None)
        item["comeintime"] = Int64(datetime.now().timestamp())
        data_lst.append(item)
        count += 1
        if len(data_lst) % 100 == 0:
            collection.insert_many(data_lst)
            data_lst.clear()
            logger.info(f"{collection.name} - bulk saved {count} documents - done")
    # flush the remaining documents (insert_many rejects an empty list)
    if data_lst:
        collection.insert_many(data_lst)
    logger.info(f"{collection.name} - bulk saved {count} documents - done")
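
# Minimal usage sketch (hypothetical document; spider_monitor is the real target
# collection). A single dict or a list is accepted; "_id" and "business_type" are
# dropped and a "comeintime" timestamp is stamped on every record before insertion.
#
#   save({"site": "example-site", "channel": "example-channel"}, spider_monitor)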


def get_runtime(datestr=None):
    """Return the report date as YYYY-MM-DD, defaulting to yesterday"""
    if datestr is None:
        today = datetime.now().date()
        yesterday = today + timedelta(days=-1)
        datestr = yesterday.strftime("%Y-%m-%d")
    return datestr
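
# Example (illustrative dates):
#   get_runtime()              # run on 2023-05-02 -> "2023-05-01"
#   get_runtime("2023-04-30")  # explicit date passes through unchanged -> "2023-04-30"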


def get_crawler_basic_information():
    """Basic information for every crawler"""
    crawler_lst = []
    q = {"platform": "python", "state": 11}
    projection = {"_id": 0, "site": 1, "channel": 1, "modifyuser": 1, "modifyuserid": 1, "code": 1}
    cursor = spider_lua_config.find(q, projection=projection)
    try:
        for doc in cursor:
            crawler_lst.append({
                "site": doc["site"],
                "channel": doc["channel"],
                "spidercode": doc["code"],
                "modifyid": doc["modifyuserid"],
                "modifyuser": doc["modifyuser"],
            })
    finally:
        # close the cursor only; the shared client is still needed by later queries
        cursor.close()
    logger.info(f"crawler daily report - {len(crawler_lst)} crawlers in total")
    yield from crawler_lst


def get_node_and_taskid(runtime, spidercode):
    """Fetch the most recent worker node and crawlab task id for a crawler"""
    q = {"runtime": runtime, "spidercode": spidercode}
    projection = {"node_ip": 1, "crawlab_taskid": 1, "_id": 0}
    sort = [("_id", -1)]
    result = spider_heartbeat.find_one(q, projection=projection, sort=sort)
    return result
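
# Example return value (field names come from the projection above; the values are
# hypothetical). None is returned when no heartbeat matches the runtime/spidercode pair:
#
#   {"node_ip": "10.0.0.1", "crawlab_taskid": "example-task-id"}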


def aggregate_query(runtime):
    """Aggregate the feapder crawl heartbeats for one runtime"""
    pipeline = [
        {"$match": {"runtime": runtime}},
        {
            "$group": {
                "_id": "$spider_id",
                "rel_count": {"$sum": "$rel_count"},  # actual download count
                "count": {"$sum": "$count"},  # download count
                "spider_item": {
                    "$addToSet": {
                        "site": "$site",
                        "channel": "$channel",
                        "spidercode": "$spidercode",
                        "business_type": "$business_type"
                    }
                }
            }
        },
        {"$sort": SON([("rel_count", -1)])}
    ]
    aggregate_items = {}
    website_lst = []
    cursor = spider_heartbeat.aggregate(pipeline, allowDiskUse=True)
    try:
        for doc in cursor:
            spider_item = doc["spider_item"]
            spidercode_at_site_num = 0
            for item in spider_item:
                site = item["site"]
                channel = item["channel"]
                spidercode = item["spidercode"]
                # hash the (site, channel, spidercode) triple so that several sites sharing
                # one spidercode do not overwrite each other's statistics
                hash_key = get_hask_key([site, channel, spidercode])
                same_site = True
                if site not in website_lst:
                    same_site = False
                    website_lst.append(site)
                if not same_site and aggregate_items.get(hash_key):
                    aggregate_items.get(hash_key)["spidercode_at_site_num"] += 1
                else:
                    spidercode_at_site_num += 1
                if not aggregate_items.get(hash_key):
                    data = {
                        "business_type": item["business_type"],
                        "spider_id": doc["_id"],
                        "site": site,
                        "channel": item["channel"],
                        "spidercode": spidercode,
                        "runtime": runtime,
                        "spidercode_at_site_num": spidercode_at_site_num  # number of sites this spidercode serves
                    }
                    is_list = str(item["business_type"]).endswith("List")
                    if is_list:
                        data["list_count"] = doc["count"]
                        data["list_rel_count"] = doc["rel_count"]
                        data["detail_count"] = 0
                        data["detail_rel_count"] = 0
                        data["list_runtimes"] = 1
                        data["detail_runtimes"] = 0
                    else:
                        data["list_count"] = 0
                        data["list_rel_count"] = 0
                        data["detail_count"] = doc["count"]
                        data["detail_rel_count"] = doc["rel_count"]
                        data["detail_runtimes"] = 1
                        data["list_runtimes"] = 0
                    if len(spider_item) > 1:
                        logger.warning(f"{spidercode} -> {site} - mapping error")
                    aggregate_items.setdefault(hash_key, data)
                else:
                    data = aggregate_items.get(hash_key)
                    is_list = str(item["business_type"]).endswith("List")
                    if is_list:
                        data["list_count"] += doc["count"]
                        data["list_rel_count"] += doc["rel_count"]
                        data["list_runtimes"] += 1
                    else:
                        data["detail_count"] += doc["count"]
                        data["detail_rel_count"] += doc["rel_count"]
                        data["detail_runtimes"] += 1
                    aggregate_items.update({hash_key: data})
    finally:
        # close the cursor only; the shared client is reused below
        cursor.close()
    return aggregate_items


runtime = get_runtime()
aggregate_results = aggregate_query(runtime)
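
# Shape of each aggregate_results entry, keyed by the md5 of (site, channel, spidercode).
# A sketch with placeholder values; the real numbers come from the heartbeat aggregation:
#
#   {
#       "business_type": "ExampleList",
#       "spider_id": "<spider_id>",
#       "site": "example-site",
#       "channel": "example-channel",
#       "spidercode": "example_spider",
#       "runtime": "2023-05-01",
#       "spidercode_at_site_num": 1,
#       "list_count": 100, "list_rel_count": 90, "list_runtimes": 1,
#       "detail_count": 0, "detail_rel_count": 0, "detail_runtimes": 0,
#   }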


def get_list_isgetdata(hash_key):
    """Whether the list page collected any data"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results[hash_key]["list_count"]
    return True if count > 0 else False


def get_list_allintimes(hash_key):
    """Total number of list documents stored for the day"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results[hash_key]["list_rel_count"]
    return count


def get_list_runtimes(hash_key):
    """Number of list-page crawl runs for the day"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results.get(hash_key)["list_runtimes"]
    return count


def get_detail_downloadnum(hash_key):
    """Detail-page download count"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results.get(hash_key)["detail_count"]
    return count


def get_detail_downloadsuccessnum(hash_key):
    """Detail-page successful download count"""
    count = 0
    if aggregate_results.get(hash_key):
        count += aggregate_results.get(hash_key)["detail_rel_count"]
    return count


def get_count(document, business_type: str):
    if business_type.title() not in ["List", "Detail"]:
        raise ValueError("business_type must be 'List' or 'Detail'")
    if str(document["business_type"]).endswith(business_type):
        return document["count"]
    return 0


def get_rel_count(document, business_type: str):
    if business_type.title() not in ["List", "Detail"]:
        raise ValueError("business_type must be 'List' or 'Detail'")
    if str(document["business_type"]).endswith(business_type):
        return document["rel_count"]
    return 0
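
# Example (hypothetical heartbeat document): only the matching business_type suffix
# contributes; the other returns 0.
#
#   doc = {"business_type": "ExampleList", "count": 20, "rel_count": 18}
#   get_count(doc, "List")    # -> 20
#   get_count(doc, "Detail")  # -> 0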


def main():
    summary_queue = []
    crawlers = get_crawler_basic_information()
    for crawler in crawlers:
        site = crawler["site"]
        channel = crawler["channel"]
        spidercode = crawler["spidercode"]
        hash_key = get_hask_key([site, channel, spidercode])
        if aggregate_results.get(hash_key):
            # merge the crawler config with the aggregated crawl statistics
            join_data = {**crawler}
            result = aggregate_results.get(hash_key)
            join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"]
            join_data["business_type"] = result["business_type"]
            join_data["spider_id"] = result["spider_id"]
            join_data["list_count"] = result["list_count"]
            join_data["list_rel_count"] = result["list_rel_count"]
            join_data["detail_count"] = result["detail_count"]
            join_data["detail_rel_count"] = result["detail_rel_count"]
            # crawlab platform
            crawlab = get_node_and_taskid(runtime, spidercode)
            if crawlab:
                join_data["py_taskid"] = crawlab["crawlab_taskid"]
                join_data["py_nodename"] = crawlab["node_ip"]
            join_data["list_isgetdata"] = get_list_isgetdata(hash_key)  # whether the list page collected any data
            join_data["list_allintimes"] = get_list_allintimes(hash_key)  # total list documents stored for the day
            join_data["list_runtimes"] = get_list_runtimes(hash_key)  # number of list-page crawl runs
            join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key)  # detail-page download count
            join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key)  # detail-page successful download count
            join_data["detail_downloadfailnum"] = join_data["detail_downloadnum"] - join_data["detail_downloadsuccessnum"]  # detail-page failed download count
        else:
            join_data = {**crawler}
            join_data["list_isgetdata"] = False
            join_data["list_allintimes"] = -1
            join_data["list_runtimes"] = -1
            join_data["detail_downloadnum"] = -1
            join_data["detail_downloadsuccessnum"] = -1
            join_data["detail_downloadfailnum"] = -1
        logger.info(f"{crawler['site']}-{crawler['channel']}-{spidercode} - statistics done")
        summary_queue.append(join_data)
    save(summary_queue, spider_monitor)


if __name__ == '__main__':
    main()
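
# Assumed scheduling (not enforced by the script): run once per day after the previous
# day's heartbeats are complete, e.g. via cron or the crawlab scheduler, so that
# get_runtime() resolves to yesterday's date and the daily summary lands in spider_monitor.
#
#   python monitor_summary.py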