# -*- coding: utf-8 -*- """ Created on 2023-05-10 --------- @summary: 网站监控 --------- @author: Dzr """ import threading import bson import numpy as np import requests import requests.exceptions as requests_exceptions from playwright._impl._api_types import Error import utils.tools as tools from db.mongodb import MongoDB from network.request import Request from network.response import Response from utils.log import logger class MonitorParser(threading.Thread): def __init__(self, mongo_db, coll_name): threading.Thread.__init__(self) self.mgo_db = mongo_db self.coll_name = coll_name self.monitor_api = 'http://cc.spdata.jianyu360.com/crawl/site_monitor/task/fetch' def get_task(self): items = {} try: response = requests.get(self.monitor_api, timeout=5) items = response.json()['data'] if '_id' in items: items['_id'] = bson.ObjectId(items['_id']) finally: return items def get_response(self, url, render=False, **kwargs): response = Response.from_dict({ "url": url, "_content": b"", "cookies": {}, "status_code": -1, "elapsed": 666, "headers": {} }) request = Request(url=url, render=render, **kwargs) for i in range(3): try: response = request.get_response() if response.status_code != 200: if any([ response.text is None, len(response.plain_text) == 0, response.tags()['tags_count'] == 0 ]): continue break except Error as e: if 'The certificate for this server is invalid.' in e.message: url = url.replace('https', 'http') request = Request(url=url, render=render, **kwargs) except requests_exceptions.SSLError: url = url.replace('https', 'http') request = Request(url=url, render=True, **kwargs) except requests_exceptions as e: logger.exception(e) break logger.debug( """ -------------- response for ---------------- thread = %s url = %s title = %s response = %s """ % ( self.getName(), url, response.title(), response ) ) if response.status_code != -1: response = Response(response) # 设置编码 response.encoding = response.encoding or "utf-8" return response def __add_items_to_db(self, task, items): result = self.mgo_db.update( coll_name=self.coll_name, condition={'_id': task['_id']}, data=items ) # print({'_id': task['_id']}) return result def deal_task(self, task): # 栏目 url = task['url'] response = self.get_response(url, render=True, proxies=False) status_code = response.status_code # 栏目页面标签 tags_count = response.tags()['tags_count'] tags_count_diff = abs(tags_count - task['tags_count']) tags_count_diff_lst = list(task['tags_count_diff_lst']) # 栏目是否改版 channel_ischange = task['channel_ischange'] if len(tags_count_diff_lst) >= 3 and not channel_ischange: mean = np.mean(tags_count_diff_lst) # 均值 std = np.std(tags_count_diff_lst, ddof=1) # 标准差 std_range = [mean - (2 * std), mean + (2 * std)] if tags_count_diff not in std_range: channel_ischange = True if len(tags_count_diff_lst) > 3 and sum(tags_count_diff_lst) == 0: channel_ischange = True status_code = 500 # 访问频次 update_dt = tools.timestamp_to_date(task['update_at'], '%Y-%m-%d') is_first_monitor = tools.get_current_date('%Y-%m-%d') != update_dt if is_first_monitor: visit_count, failure_count = 1, 0 if status_code != 200: failure_count = 1 tags_count_diff_lst = [] tags_count_diff_lst.insert(0, tags_count_diff) else: visit_count = task['visit_count'] + 1 failure_count = task['failure_count'] if status_code != 200: failure_count += 1 tags_count_diff_lst.insert(0, tags_count_diff) items = { 'title': response.title(), # 页面标头 'tags_count': tags_count, 'tags_count_diff': tags_count_diff, 'tags_count_diff_lst': tags_count_diff_lst, 'channel_ischange': channel_ischange, 'status_code': status_code, 'visit_count': visit_count, 'failure_count': failure_count, 'update_at': tools.ensure_int64(tools.get_current_timestamp()) } self.__add_items_to_db(task, items) def run(self): while True: task = self.get_task() if not task: logger.debug(f"[{self.getName()}]暂无监控任务") tools.delay_time(2) continue try: self.deal_task(task) except Exception as e: logger.exception(e) class MonitorServer(threading.Thread): def __init__(self, thread_nums=1): threading.Thread.__init__(self) self.mongo_db = MongoDB() self.coll_name = 'site_monitor' self.thread_nums = thread_nums self.parser_control_obj = MonitorParser self.parser_controls = [] def run(self): for _ in range(self.thread_nums): parser_control = self.parser_control_obj( mongo_db=self.mongo_db, coll_name=self.coll_name ) parser_control.start() self.parser_controls.append(parser_control) if __name__ == '__main__': MonitorServer(thread_nums=5).start()