monitor.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
# -*- coding: utf-8 -*-
"""
Created on 2023-05-10
---------
@summary: Website monitoring
---------
@author: Dzr
"""
  9. import threading
  10. import bson
  11. import numpy as np
  12. import requests
  13. import requests.exceptions as requests_exceptions
  14. from playwright._impl._api_types import Error
  15. import utils.tools as tools
  16. from db.mongodb import MongoDB
  17. from network.request import Request
  18. from network.response import Response
  19. from utils.log import logger
  20. class MonitorParser(threading.Thread):
  21. def __init__(self, mongo_db, coll_name):
  22. threading.Thread.__init__(self)
  23. self.mgo_db = mongo_db
  24. self.coll_name = coll_name
  25. self.monitor_api = 'http://cc.spdata.jianyu360.com/crawl/site_monitor/task/fetch'
  26. def get_task(self):
  27. items = {}
  28. try:
  29. response = requests.get(self.monitor_api, timeout=5)
  30. items = response.json()['data']
  31. if '_id' in items:
  32. items['_id'] = bson.ObjectId(items['_id'])
  33. finally:
  34. return items
  35. def get_response(self, url, render=False, **kwargs):
  36. response = Response.from_dict({
  37. "url": url,
  38. "_content": b"",
  39. "cookies": {},
  40. "status_code": -1,
  41. "elapsed": 666,
  42. "headers": {}
  43. })
  44. request = Request(url=url, render=render, **kwargs)
  45. for i in range(3):
  46. try:
  47. response = request.get_response()
  48. if response.status_code != 200:
  49. if any([
  50. response.text is None,
  51. len(response.plain_text) == 0,
  52. response.tags()['tags_count'] == 0
  53. ]):
  54. continue
  55. break
  56. except Error as e:
  57. if 'The certificate for this server is invalid.' in e.message:
  58. url = url.replace('https', 'http')
  59. request = Request(url=url, render=render, **kwargs)
  60. except requests_exceptions.SSLError:
  61. url = url.replace('https', 'http')
  62. request = Request(url=url, render=True, **kwargs)
  63. except requests_exceptions as e:
  64. logger.exception(e)
  65. break
  66. logger.debug(
  67. """
  68. -------------- response for ----------------
  69. thread = %s
  70. url = %s
  71. title = %s
  72. response = %s
  73. """
  74. % (
  75. self.getName(),
  76. url,
  77. response.title(),
  78. response
  79. )
  80. )
  81. if response.status_code != -1:
  82. response = Response(response)
  83. # 设置编码
  84. response.encoding = response.encoding or "utf-8"
  85. return response
  86. def __add_items_to_db(self, task, items):
  87. result = self.mgo_db.update(
  88. coll_name=self.coll_name,
  89. condition={'_id': task['_id']},
  90. data=items
  91. )
  92. # print({'_id': task['_id']})
  93. return result
  94. def deal_task(self, task):
  95. # 栏目
  96. url = task['url']
  97. response = self.get_response(url, render=True, proxies=False)
  98. status_code = response.status_code
  99. # 栏目页面标签
  100. tags_count = response.tags()['tags_count']
  101. tags_count_diff = abs(tags_count - task['tags_count'])
  102. tags_count_diff_lst = list(task['tags_count_diff_lst'])
  103. # 栏目是否改版
  104. channel_ischange = task['channel_ischange']
  105. if len(tags_count_diff_lst) >= 3 and not channel_ischange:
  106. mean = np.mean(tags_count_diff_lst) # 均值
  107. std = np.std(tags_count_diff_lst, ddof=1) # 标准差
  108. std_range = [mean - (2 * std), mean + (2 * std)]
  109. if tags_count_diff not in std_range:
  110. channel_ischange = True
  111. if len(tags_count_diff_lst) > 3 and sum(tags_count_diff_lst) == 0:
  112. channel_ischange = True
  113. status_code = 500
  114. # 访问频次
  115. update_dt = tools.timestamp_to_date(task['update_at'], '%Y-%m-%d')
  116. is_first_monitor = tools.get_current_date('%Y-%m-%d') != update_dt
  117. if is_first_monitor:
  118. visit_count, failure_count = 1, 0
  119. if status_code != 200:
  120. failure_count = 1
  121. tags_count_diff_lst = []
  122. tags_count_diff_lst.insert(0, tags_count_diff)
  123. else:
  124. visit_count = task['visit_count'] + 1
  125. failure_count = task['failure_count']
  126. if status_code != 200:
  127. failure_count += 1
  128. tags_count_diff_lst.insert(0, tags_count_diff)
  129. items = {
  130. 'title': response.title(), # 页面标头
  131. 'tags_count': tags_count,
  132. 'tags_count_diff': tags_count_diff,
  133. 'tags_count_diff_lst': tags_count_diff_lst,
  134. 'channel_ischange': channel_ischange,
  135. 'status_code': status_code,
  136. 'visit_count': visit_count,
  137. 'failure_count': failure_count,
  138. 'update_at': tools.ensure_int64(tools.get_current_timestamp())
  139. }
  140. self.__add_items_to_db(task, items)
  141. def run(self):
  142. while True:
  143. task = self.get_task()
  144. if not task:
  145. logger.debug(f"[{self.getName()}]暂无监控任务")
  146. tools.delay_time(2)
  147. continue
  148. try:
  149. self.deal_task(task)
  150. except Exception as e:
  151. logger.exception(e)
  152. class MonitorServer(threading.Thread):
  153. def __init__(self, thread_nums=1):
  154. threading.Thread.__init__(self)
  155. self.mongo_db = MongoDB()
  156. self.coll_name = 'site_monitor'
  157. self.thread_nums = thread_nums
  158. self.parser_control_obj = MonitorParser
  159. self.parser_controls = []
  160. def run(self):
  161. for _ in range(self.thread_nums):
  162. parser_control = self.parser_control_obj(
  163. mongo_db=self.mongo_db,
  164. coll_name=self.coll_name
  165. )
  166. parser_control.start()
  167. self.parser_controls.append(parser_control)
  168. if __name__ == '__main__':
  169. MonitorServer(thread_nums=5).start()