# -*- coding: utf-8 -*-
"""
Created on 2023-05-09
---------
@summary: 原网站监控
---------
@author: Dzr
"""
import multiprocessing
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

import tldextract
from urllib3 import get_host

import setting as settings
from base_server import BaseServer, tools, mongo_table
from common.log import logger
from common.redis_lock import OptimisticLock


def extract_domain(url):
    """Return the fully qualified domain name of *url*, falling back to the
    raw IPv4 address when the URL has no registrable domain."""
    ext = tldextract.extract(url)
    return ext.fqdn or ext.ipv4


def extract_host(url):
    """Return the ``scheme://host[:port]/`` prefix of *url*.

    >>> extract_host('http://192.168.3.207:8080/')
    'http://192.168.3.207:8080/'
    """
    scheme, host, port = get_host(url)
    # Omit the port only when the URL did not carry an explicit one.
    return f"{scheme}://{host}/" if port is None else f"{scheme}://{host}:{port}/"


class SiteMonitorServer(BaseServer, threading.Thread):
    """Producer thread: scans the lua spider configs, maintains per-spider
    monitor documents in Mongo and pushes monitoring tasks onto Redis."""

    def __init__(self, site: str, redis_label: str, db: str, table: str):
        threading.Thread.__init__(self)
        super(SiteMonitorServer, self).__init__(site, redis_label, db, table,
                                                redis_cfg=settings.redis2_conf)
        # ``Thread.name`` replaces the deprecated ``getName()`` accessor.
        self.label = f'{self.server}_{self.name}'
        self.sleep_interval = 3600 * 4     # pause between production rounds (4 h)
        self.expire_time = 3600 * 24 * 15  # redis fingerprint TTL (15 days)
        self.output_log = True

    def get_monitor_lst(self, conditions=None, projection=None):
        """Yield ``{'fingerprint', 'document'}`` dicts for every lua config
        matching *conditions*.

        Results are streamed straight from the cursor rather than buffered
        into an intermediate list first.
        """
        table = mongo_table('editor', 'luaconfig')
        with table.find(conditions, projection=projection) as cursor:
            for items in cursor:
                _id = items['_id']
                fingerprint = f'monitor_{self.fingerprint(_id=_id)}'
                yield {'fingerprint': fingerprint, 'document': items}

    def get_monitor_task(self, conditions, projection=None):
        """Yield monitor tasks from the monitor collection, fingerprinted by
        their ``luaconfig_id`` (streamed, not buffered)."""
        with self.mgo_db.find(conditions, projection=projection) as cursor:
            for item in cursor:
                _id = item["luaconfig_id"]
                fingerprint = f'monitor_{self.fingerprint(_id=_id)}'
                yield {'fingerprint': fingerprint, 'document': item}

    def process_refresh(self, monitor, count, **kwargs):
        """Remove the monitor documents of a spider that is now disabled.

        Returns *count* incremented by 1 when a cleanup actually happened,
        unchanged otherwise.
        """
        fingerprint = monitor['fingerprint']
        if self.redis_db.exists(fingerprint):
            luaconfig_id = monitor['document']['_id']
            self.mgo_db.delete_many({'luaconfig_id': luaconfig_id})
            self.redis_db.delete(fingerprint)
            count += 1
        return count

    def process_monitor_item(self, monitor, count, **kwargs):
        """Register (upsert) the monitor document for an active spider.

        Returns *count* incremented by 1 when a new monitor was registered,
        unchanged when the fingerprint already exists in Redis.
        """
        document = monitor['document']
        _id = document['_id']
        domain = extract_domain(document['href'])
        href = document['param_common'][11]
        is_subdomain = (extract_domain(href) == domain)
        fingerprint = monitor['fingerprint']
        if not self.redis_db.exists(fingerprint):
            data = {
                "luaconfig_id": _id,
                "site": document["site"],
                "channel": document["channel"],
                "spidercode": document["code"],
                "platform": document["platform"],
                "state": document["state"],
                "domain": domain,
                "host": extract_host(href),
                "is_subdomain": is_subdomain,
                "url": href,
                "tags_count": 0,            # number of HTML tags on the channel page
                "tags_count_diff": 0,       # latest tag-count delta
                "tags_count_diff_lst": [],  # history of tag-count deltas
                "channel_ischange": False,  # whether the channel page layout changed
                "status_code": -1,          # HTTP status code of the last visit
                "visit_count": 0,           # visits today
                "failure_count": 0,         # failed visits today
                "create_at": tools.now_ts(),
                "update_at": tools.now_ts()
            }
            self.mgo_db.find_one_and_update(
                {'luaconfig_id': _id},
                {'$set': data},
                upsert=True
            )
            count += 1
            # Mark the spider as registered so it is skipped until the TTL expires.
            self.redis_db.setex(fingerprint, self.expire_time, str(_id))
        return count

    def process_monitor_task(self, task, **kwargs):
        """Push one pending monitor task onto the Redis work queue."""
        fingerprint = task['fingerprint']
        if self.redis_db.exists(fingerprint):
            document = task['document']
            task_str = tools.json_dumps(document)
            self.rpush(task_str)

    def refresh_monitor(self, thread_num=1):
        """Purge monitor data belonging to spiders in a disabled state."""
        logger.info(f"[{self.label}]刷新监控任务")
        q = {'state': {'$in': [0, 4, 6, 7, 10]}}
        projection = {'_id': 1}
        monitor_lst = self.get_monitor_lst(q, projection)
        process_func = self.process_refresh
        result = self.multi_thread_worker(monitor_lst, process_func, thread_num)
        logger.info(f"[{self.label}]无效监控任务清除 {result} 条")

    def create_monitor_item(self, thread_num=1):
        """Register monitor documents for every spider in an active state."""
        logger.info(f"[{self.label}]扫描爬虫任务表")
        q = {'state': {'$nin': [0, 4, 6, 7, 10]}}
        projection = {
            'param_common': 1,
            'channel': 1,
            'site': 1,
            'href': 1,
            'code': 1,
            'platform': 1,
            'state': 1
        }
        monitor_lst = self.get_monitor_lst(q, projection)
        process_func = self.process_monitor_item
        result = self.multi_thread_worker(monitor_lst, process_func, thread_num)
        logger.info(f"[{self.label}]新增监控爬虫 {result} 条")

    def create_monitor_task(self, thread_num=1):
        """Queue monitoring tasks for every subdomain page that has not been
        flagged as redesigned."""
        logger.info(f"[{self.label}]创建监控任务")
        query = {'is_subdomain': True, 'channel_ischange': False}
        projection = {
            'luaconfig_id': 1,
            'domain': 1,
            'host': 1,
            'url': 1,
            'tags_count': 1,           # number of HTML tags on the channel page
            'tags_count_diff': 1,      # latest tag-count delta
            'tags_count_diff_lst': 1,  # history of tag-count deltas
            'channel_ischange': 1,     # whether the channel page layout changed
            'status_code': 1,          # HTTP status code of the last visit
            'visit_count': 1,          # visits today
            'failure_count': 1,        # failed visits
            'update_at': 1
        }
        task_lst = self.get_monitor_task(query, projection)
        process_func = self.process_monitor_task
        self.multi_thread_worker(task_lst, process_func, thread_num)
        logger.info(f'[{self.label}]待完成任务 {self.task_total} 条')

    @staticmethod
    def multi_thread_worker(iterable, func, workers=1):
        """Run *func* over *iterable* on a thread pool and sum the integer
        results (processors return 0/1; ``None`` results are ignored)."""
        total = 0
        with ThreadPoolExecutor(max_workers=workers) as executor:
            # All submissions complete before any result is consumed, so the
            # ``count`` keyword each call receives is always 0 — made explicit
            # here instead of passing a variable that never changes in time.
            futures = [executor.submit(func, item, count=0) for item in iterable]
            for future in as_completed(futures):
                result = future.result()
                if isinstance(result, int):  # isinstance already rejects None
                    total += result
        return total

    def run(self):
        """Main production loop: refresh, register, queue, then sleep."""
        logger.info(f'[{self.label}]开始生产任务')
        while True:
            try:
                self.refresh_monitor(10)
                self.create_monitor_item(20)
                self.create_monitor_task(12)
                tools.delay(self.sleep_interval)
            except Exception as e:
                # Keep the daemon alive on any per-round failure.
                logger.exception(e)


class SiteMonitorClient(BaseServer):
    """Consumer side: pops monitoring tasks off Redis under an optimistic lock."""

    def __init__(self, site: str, redis_label: str, db: str, table: str):
        super(SiteMonitorClient, self).__init__(site, redis_label, db, table,
                                                redis_cfg=settings.redis2_conf)
        current_process = multiprocessing.current_process()
        # One lock label per worker process so processes do not contend.
        self.lock_label = f'{redis_label}:{current_process.pid}'

    def get_crawl_task(self):
        """Pop one crawl task from Redis.

        Returns the task as a plain dict (without the ``luaconfig_id`` key,
        which is only needed for ObjectId deserialisation), or ``None`` when
        the queue is empty.
        """
        with OptimisticLock(self.redis_db, self.lock_label):
            convert_fields = dict(luaconfig_id=tools.ObjectId, _id=tools.ObjectId)
            task = tools.json_loads(self.lpop(), **convert_fields)
            if task is not None:
                task = tools.document2dict(task)
                del task['luaconfig_id']
            return task

    def save_data(self, table, documents):
        """Persistence hook; intentionally a no-op in this service."""
        pass