123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- # -*- coding: utf-8 -*-
- """
- Created on 2023-05-10
- ---------
- @summary: 网站监控
- ---------
- @author: Dzr
- """
- import threading
- import bson
- import numpy as np
- import requests
- import requests.exceptions as requests_exceptions
- from playwright._impl._api_types import Error
- import utils.tools as tools
- from db.mongodb import MongoDB
- from network.request import Request
- from network.response import Response
- from utils.log import logger
class MonitorParser(threading.Thread):
    """Worker thread that polls the monitor API and probes channel pages.

    For every task it loads ``task['url']``, compares the page's tag count
    with the stored one, decides whether the channel looks redesigned and
    writes the updated statistics back to ``coll_name``.
    """

    # Default task-dispatch endpoint.
    MONITOR_API = 'http://cc.spdata.jianyu360.com/crawl/site_monitor/task/fetch'

    # Number of download attempts made by get_response.
    REQUEST_RETRIES = 3

    def __init__(self, mongo_db, coll_name, monitor_api=None):
        """
        :param mongo_db: database wrapper; must provide
            ``update(coll_name=..., condition=..., data=...)``
        :param coll_name: collection holding the monitor task documents
        :param monitor_api: task-fetch URL; defaults to ``MONITOR_API``
        """
        super().__init__()
        self.mgo_db = mongo_db
        self.coll_name = coll_name
        self.monitor_api = monitor_api or self.MONITOR_API

    def get_task(self):
        """Fetch one monitoring task from the task API.

        :return: the task dict (``_id`` converted to ``bson.ObjectId``), or an
            empty dict when there is no task or fetching/decoding fails.
        """
        try:
            response = requests.get(self.monitor_api, timeout=5)
            items = response.json()['data']
            if not isinstance(items, dict):
                return {}
            if '_id' in items:
                items['_id'] = bson.ObjectId(items['_id'])
            return items
        except Exception as e:
            # Best effort: any failure is reported as "no task" so the worker
            # loop waits and retries instead of dying.
            logger.warning(f"[{self.name}] failed to fetch monitor task: {e!r}")
            return {}

    @staticmethod
    def _downgrade_to_http(url):
        """Return *url* with a leading ``https://`` replaced by ``http://``.

        Only the scheme is rewritten; ``https`` appearing later in the URL
        (path or query) is left untouched.
        """
        prefix = 'https://'
        if url.startswith(prefix):
            return 'http://' + url[len(prefix):]
        return url

    def get_response(self, url, render=False, **kwargs):
        """Download *url*, retrying up to ``REQUEST_RETRIES`` times.

        :param url: page to fetch
        :param render: forwarded to ``Request``
        :param kwargs: extra ``Request`` arguments (e.g. ``proxies``)
        :return: a ``Response``; if no attempt produced one, a placeholder
            with ``status_code == -1`` and empty content
        """
        # Placeholder returned when every attempt fails.
        response = Response.from_dict({
            "url": url,
            "_content": b"",
            "cookies": {},
            "status_code": -1,
            "elapsed": 666,
            "headers": {}
        })
        request = Request(url=url, render=render, **kwargs)
        for _ in range(self.REQUEST_RETRIES):
            try:
                response = request.get_response()
                # A non-200 page without usable content is retried; anything
                # else is accepted. ``or`` short-circuits so the later checks
                # never run on a page whose text is None.
                if response.status_code != 200 and (
                    response.text is None
                    or len(response.plain_text) == 0
                    or response.tags()['tags_count'] == 0
                ):
                    continue
                break
            except Error as e:
                # Playwright error: fall back to plain http when the server
                # certificate is rejected, then retry.
                if 'The certificate for this server is invalid.' in e.message:
                    url = self._downgrade_to_http(url)
                    request = Request(url=url, render=render, **kwargs)
            except requests_exceptions.SSLError:
                # NOTE(review): this path always switches to render=True,
                # regardless of *render*; kept as-is — confirm intent.
                url = self._downgrade_to_http(url)
                request = Request(url=url, render=True, **kwargs)
            except requests_exceptions.RequestException as e:
                # Base class of all requests errors (SSLError, a subclass,
                # is handled above).
                logger.exception(e)
                break

        logger.debug(
            """
                -------------- response for ----------------
                thread = %s
                url = %s
                title = %s
                response = %s
                """
            % (
                self.name,
                url,
                response.title(),
                response
            )
        )
        if response.status_code != -1:
            response = Response(response)
        # Fall back to utf-8 when no encoding was detected.
        response.encoding = response.encoding or "utf-8"
        return response

    def __add_items_to_db(self, task, items):
        """Write *items* onto the task document identified by ``task['_id']``.

        :return: whatever ``mgo_db.update`` returns
        """
        return self.mgo_db.update(
            coll_name=self.coll_name,
            condition={'_id': task['_id']},
            data=items,
        )

    @staticmethod
    def _is_outlier(value, history, sigmas=2):
        """Return True if *value* lies outside ``mean ± sigmas * std`` of *history*.

        Uses the sample standard deviation (``ddof=1``). The interval bounds
        themselves count as inside.
        """
        mean = np.mean(history)
        std = np.std(history, ddof=1)
        return bool(value < mean - sigmas * std or value > mean + sigmas * std)

    def deal_task(self, task):
        """Probe one channel page and persist the updated statistics.

        :param task: task document; reads ``_id``, ``url``, ``tags_count``,
            ``tags_count_diff_lst``, ``channel_ischange``, ``update_at``,
            ``visit_count`` and ``failure_count``
        """
        # Channel (column) page
        url = task['url']
        response = self.get_response(url, render=True, proxies=False)
        status_code = response.status_code

        # Tag count of the page and its change since the stored value
        tags_count = response.tags()['tags_count']
        tags_count_diff = abs(tags_count - task['tags_count'])
        tags_count_diff_lst = list(task['tags_count_diff_lst'])

        # Has the channel been redesigned?
        channel_ischange = task['channel_ischange']
        if len(tags_count_diff_lst) >= 3 and not channel_ischange:
            # Flag when the current diff falls outside mean ± 2σ of history.
            if self._is_outlier(tags_count_diff, tags_count_diff_lst):
                channel_ischange = True
        # A history of all-zero diffs is treated as abnormal as well.
        if len(tags_count_diff_lst) > 3 and sum(tags_count_diff_lst) == 0:
            channel_ischange = True
            status_code = 500

        # Visit statistics; counters restart on the first probe of a new day.
        update_dt = tools.timestamp_to_date(task['update_at'], '%Y-%m-%d')
        is_first_monitor = tools.get_current_date('%Y-%m-%d') != update_dt
        if is_first_monitor:
            visit_count = 1
            failure_count = 1 if status_code != 200 else 0
            tags_count_diff_lst = []
        else:
            visit_count = task['visit_count'] + 1
            failure_count = task['failure_count']
            if status_code != 200:
                failure_count += 1
        tags_count_diff_lst.insert(0, tags_count_diff)

        items = {
            'title': response.title(),  # page title
            'tags_count': tags_count,
            'tags_count_diff': tags_count_diff,
            'tags_count_diff_lst': tags_count_diff_lst,
            'channel_ischange': channel_ischange,
            'status_code': status_code,
            'visit_count': visit_count,
            'failure_count': failure_count,
            'update_at': tools.ensure_int64(tools.get_current_timestamp())
        }
        self.__add_items_to_db(task, items)

    def run(self):
        """Poll for tasks forever, processing one task per iteration."""
        while True:
            task = self.get_task()
            if not task:
                logger.debug(f"[{self.name}]暂无监控任务")
                tools.delay_time(2)
                continue
            try:
                self.deal_task(task)
            except Exception as e:
                # Keep the worker alive; one bad task must not stop the loop.
                logger.exception(e)
class MonitorServer(threading.Thread):
    """Supervisor thread that spawns ``thread_nums`` parser worker threads.

    All workers share one MongoDB handle and the ``site_monitor`` collection.
    """

    def __init__(self, thread_nums=1):
        super().__init__()
        self.mongo_db = MongoDB()
        self.coll_name = 'site_monitor'
        self.thread_nums = thread_nums
        self.parser_control_obj = MonitorParser  # worker class to instantiate
        self.parser_controls = []  # workers started by run()

    def run(self):
        """Start each worker, recording it once started; returns afterwards."""
        worker_cls = self.parser_control_obj
        for _ in range(self.thread_nums):
            worker = worker_cls(mongo_db=self.mongo_db, coll_name=self.coll_name)
            worker.start()
            self.parser_controls.append(worker)
if __name__ == '__main__':
    # Launch the supervisor thread, which starts five parser workers.
    server = MonitorServer(thread_nums=5)
    server.start()
|