dzr 8 місяців тому
батько
коміт
79eab6e34a
4 змінених файлів з 1 додано та 216 видалено
  1. 0 0
      conf/dev.yaml
  2. 0 0
      conf/test.yaml
  3. 0 215
      services/site_monitor.py
  4. 1 1
      setting.py

+ 0 - 0
yaml/dev.yaml → conf/dev.yaml


+ 0 - 0
yaml/test.yaml → conf/test.yaml


+ 0 - 215
services/site_monitor.py

@@ -1,215 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Created on 2023-05-09
----------
-@summary:  原网站监控
----------
-@author: Dzr
-"""
-import multiprocessing
-import threading
-from concurrent.futures import ThreadPoolExecutor, as_completed
-
-import tldextract
-from urllib3 import get_host
-
-import setting as settings
-from base_server import BaseServer, tools, mongo_table
-from common.log import logger
-from common.redis_lock import OptimisticLock
-
-
-def extract_domain(url):
-    """返回完全限定的域名"""
-    ext = tldextract.extract(url)
-    return ext.fqdn or ext.ipv4
-
-
-def extract_host(url):
-    """
-
-    # >>> base_url = extract_host('http://192.168.3.207:8080/')
-    """
-    _s, _h, _p = get_host(url)
-    return f"{_s}://{_h}/" if _p is None else f"{_s}://{_h}:{_p}/"
-
-
-class SiteMonitorServer(BaseServer, threading.Thread):
-
-    def __init__(self, site: str, redis_label: str, db: str, table: str):
-        threading.Thread.__init__(self)
-        super(SiteMonitorServer, self).__init__(site, redis_label, db, table,
-                                                redis_cfg=settings.redis2_conf)
-        self.label = f'{self.server}_{self.getName()}'
-
-        self.sleep_interval = 3600 * 4
-        self.expire_time = 3600 * 24 * 15
-        self.output_log = True
-
-    def get_monitor_lst(self, conditions=None, projection=None):
-        results = []
-        table = mongo_table('editor', 'luaconfig')
-        with table.find(conditions, projection=projection) as cursor:
-            for items in cursor:
-                _id = items['_id']
-                fingerprint = f'monitor_{self.fingerprint(_id=_id)}'
-                results.append({'fingerprint': fingerprint, 'document': items})
-        yield from results
-
-    def get_monitor_task(self, conditions, projection=None):
-        results = []
-        with self.mgo_db.find(conditions, projection=projection) as cursor:
-            for item in cursor:
-                _id = item["luaconfig_id"]
-                fingerprint = f'monitor_{self.fingerprint(_id=_id)}'
-                results.append({'fingerprint': fingerprint, 'document': item})
-        yield from results
-
-    def process_refresh(self, monitor, count, **kwargs):
-        fingerprint = monitor['fingerprint']
-        if self.redis_db.exists(fingerprint):
-            luaconfig_id = monitor['document']['_id']
-            self.mgo_db.delete_many({'luaconfig_id': luaconfig_id})
-            self.redis_db.delete(fingerprint)
-            count += 1
-        return count
-
-    def process_monitor_item(self, monitor, count, **kwargs):
-        document = monitor['document']
-        _id = document['_id']
-        domain = extract_domain(document['href'])
-        href = document['param_common'][11]
-        is_subdomain = (extract_domain(href) == domain)
-        fingerprint = monitor['fingerprint']
-        if not self.redis_db.exists(fingerprint):
-            data = {
-                "luaconfig_id": _id,
-                "site": document["site"],
-                "channel": document["channel"],
-                "spidercode": document["code"],
-                "platform": document["platform"],
-                "state": document["state"],
-                "domain": domain,
-                "host": extract_host(href),
-                "is_subdomain": is_subdomain,
-                "url": href,
-                "tags_count": 0,  # 栏目页面标签数量
-                "tags_count_diff": 0,  # 栏目页面标签计数差额
-                "tags_count_diff_lst": [],  # 栏目页面标签计数差额记录表
-                "channel_ischange": False,  # 栏目页面是否改版
-                "status_code": -1,  # 栏目访问状态码
-                "visit_count": 0,  # 栏目日访问次数
-                "failure_count": 0,  # 栏目日访问失败次数
-                "create_at": tools.now_ts(),
-                "update_at": tools.now_ts()
-            }
-            self.mgo_db.find_one_and_update(
-                {'luaconfig_id': _id},
-                {'$set': data},
-                upsert=True
-            )
-            count += 1
-        self.redis_db.setex(fingerprint, self.expire_time, str(_id))
-        return count
-
-    def process_monitor_task(self, task, **kwargs):
-        fingerprint = task['fingerprint']
-        if self.redis_db.exists(fingerprint):
-            document = task['document']
-            task_str = tools.json_dumps(document)
-            self.rpush(task_str)
-
-    def refresh_monitor(self, thread_num=1):
-        logger.info(f"[{self.label}]刷新监控任务")
-        q = {'state': {'$in': [0, 4, 6, 7, 10]}}
-        projection = {'_id': 1}
-        monitor_lst = self.get_monitor_lst(q, projection)
-        process_func = self.process_refresh
-        result = self.multi_thread_worker(monitor_lst, process_func, thread_num)
-        logger.info(f"[{self.label}]无效监控任务清除 {result} 条")
-
-    def create_monitor_item(self, thread_num=1):
-        logger.info(f"[{self.label}]扫描爬虫任务表")
-        q = {'state': {'$nin': [0, 4, 6, 7, 10]}}
-        projection = {
-            'param_common': 1,
-            'channel': 1,
-            'site': 1,
-            'href': 1,
-            'code': 1,
-            'platform': 1,
-            'state': 1
-        }
-        monitor_lst = self.get_monitor_lst(q, projection)
-        process_func = self.process_monitor_item
-        result = self.multi_thread_worker(monitor_lst, process_func, thread_num)
-        logger.info(f"[{self.label}]新增监控爬虫 {result} 条")
-
-    def create_monitor_task(self, thread_num=1):
-        logger.info(f"[{self.label}]创建监控任务")
-        query = {'is_subdomain': True, 'channel_ischange': False}
-        projection = {
-            'luaconfig_id': 1,
-            'domain': 1,
-            'host': 1,
-            'url': 1,
-            'tags_count': 1,  # 栏目页面标签数量
-            'tags_count_diff': 1,  # 栏目页面标签计数差额
-            'tags_count_diff_lst': 1,  # 栏目页面标签计数差额记录表
-            'channel_ischange': 1,  # 栏目页面是否改版
-            'status_code': 1,  # 栏目访问状态码
-            'visit_count': 1,  # 栏目日访问次数
-            'failure_count': 1,  # 栏目失败访问次数
-            'update_at': 1
-        }
-        task_lst = self.get_monitor_task(query, projection)
-        process_func = self.process_monitor_task
-        self.multi_thread_worker(task_lst, process_func, thread_num)
-        logger.info(f'[{self.label}]待完成任务 {self.task_total} 条')
-
-    @staticmethod
-    def multi_thread_worker(iterable, func, workers=1):
-        count = 0
-        with ThreadPoolExecutor(max_workers=workers) as executor:
-            fs = []
-            for monitor in iterable:
-                f = executor.submit(func, monitor, count=count)
-                fs.append(f)
-
-            for f in as_completed(fs):
-                result = f.result()
-                if result is not None and isinstance(result, int):
-                    count += result
-        return count
-
-    def run(self):
-        logger.info(f'[{self.label}]开始生产任务')
-        while True:
-            try:
-                self.refresh_monitor(10)
-                self.create_monitor_item(20)
-                self.create_monitor_task(12)
-                tools.delay(self.sleep_interval)
-            except Exception as e:
-                logger.exception(e)
-
-
-class SiteMonitorClient(BaseServer):
-
-    def __init__(self, site: str, redis_label: str, db: str, table: str):
-        super(SiteMonitorClient, self).__init__(site, redis_label, db, table,
-                                                redis_cfg=settings.redis2_conf)
-        current_process = multiprocessing.current_process()
-        self.lock_label = f'{redis_label}:{current_process.pid}'
-
-    def get_crawl_task(self):
-        with OptimisticLock(self.redis_db, self.lock_label):
-            convert_fields = dict(luaconfig_id=tools.ObjectId, _id=tools.ObjectId)
-            task = tools.json_loads(self.lpop(), **convert_fields)
-            if task is not None:
-                task = tools.document2dict(task)
-                del task['luaconfig_id']
-            return task
-
-    def save_data(self, table, documents):
-        pass

+ 1 - 1
setting.py

@@ -23,7 +23,7 @@ else:
     ENV = 'test.yaml'
 
 _base_path = Path(__file__).parent
-_yaml_conf = (_base_path / 'yaml' / ENV).resolve()
+_yaml_conf = (_base_path / 'conf' / ENV).resolve()
 
 with open(_yaml_conf, encoding="utf-8") as f:
     _conf = yaml.safe_load(f)