123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- # -*- coding: utf-8 -*-
- """
- Created on 2023-05-09
- ---------
- @summary: 原网站监控
- ---------
- @author: Dzr
- """
- import multiprocessing
- import threading
- from concurrent.futures import ThreadPoolExecutor, as_completed
- import tldextract
- from urllib3 import get_host
- import setting as settings
- from base_server import BaseServer, tools, mongo_table
- from common.log import logger
- from common.redis_lock import OptimisticLock
def extract_domain(url):
    """Return the fully-qualified domain name of *url* (IPv4 host as fallback)."""
    parts = tldextract.extract(url)
    return parts.fqdn or parts.ipv4
def extract_host(url):
    """Return the ``scheme://host[:port]/`` prefix of *url*.

    Example::

        >>> extract_host('http://192.168.3.207:8080/')
        'http://192.168.3.207:8080/'
    """
    scheme, host, port = get_host(url)
    if port is None:
        return f"{scheme}://{host}/"
    return f"{scheme}://{host}:{port}/"
class SiteMonitorServer(BaseServer, threading.Thread):
    """Producer thread for site monitoring.

    Periodically scans the lua spider configuration table, maintains a
    monitor document per active spider in this server's mongo collection,
    and pushes pending monitor tasks onto the redis queue for clients.
    """

    def __init__(self, site: str, redis_label: str, db: str, table: str):
        threading.Thread.__init__(self)
        super(SiteMonitorServer, self).__init__(site, redis_label, db, table,
                                                redis_cfg=settings.redis2_conf)
        # Thread.getName() is deprecated since Python 3.10; .name is equivalent.
        self.label = f'{self.server}_{self.name}'
        self.sleep_interval = 3600 * 4     # seconds between production rounds (4 h)
        self.expire_time = 3600 * 24 * 15  # redis fingerprint TTL (15 days)
        self.output_log = True

    def get_monitor_lst(self, conditions=None, projection=None):
        """Yield ``{'fingerprint', 'document'}`` for each lua config matching
        *conditions*.

        Streams items straight off the cursor; the original collected the
        whole result set into a list before ``yield from``, which defeated
        the purpose of being a generator.
        """
        table = mongo_table('editor', 'luaconfig')
        with table.find(conditions, projection=projection) as cursor:
            for items in cursor:
                _id = items['_id']
                fingerprint = f'monitor_{self.fingerprint(_id=_id)}'
                yield {'fingerprint': fingerprint, 'document': items}

    def get_monitor_task(self, conditions, projection=None):
        """Yield ``{'fingerprint', 'document'}`` for each monitor document in
        this server's own collection matching *conditions* (streamed)."""
        with self.mgo_db.find(conditions, projection=projection) as cursor:
            for item in cursor:
                _id = item["luaconfig_id"]
                fingerprint = f'monitor_{self.fingerprint(_id=_id)}'
                yield {'fingerprint': fingerprint, 'document': item}

    def process_refresh(self, monitor, count, **kwargs):
        """Remove monitor data for a lua config that became invalid.

        Returns *count* + 1 when something was removed, else *count*
        unchanged (summed by ``multi_thread_worker``).
        """
        fingerprint = monitor['fingerprint']
        if self.redis_db.exists(fingerprint):
            luaconfig_id = monitor['document']['_id']
            self.mgo_db.delete_many({'luaconfig_id': luaconfig_id})
            self.redis_db.delete(fingerprint)
            count += 1
        return count

    def process_monitor_item(self, monitor, count, **kwargs):
        """Upsert a monitor document for a not-yet-tracked lua config.

        Returns *count* + 1 when a document was written, else *count*.
        """
        document = monitor['document']
        _id = document['_id']
        domain = extract_domain(document['href'])
        href = document['param_common'][11]
        is_subdomain = (extract_domain(href) == domain)
        fingerprint = monitor['fingerprint']
        if not self.redis_db.exists(fingerprint):
            data = {
                "luaconfig_id": _id,
                "site": document["site"],
                "channel": document["channel"],
                "spidercode": document["code"],
                "platform": document["platform"],
                "state": document["state"],
                "domain": domain,
                "host": extract_host(href),
                "is_subdomain": is_subdomain,
                "url": href,
                "tags_count": 0,             # number of tags on the channel page
                "tags_count_diff": 0,        # tag-count delta for the channel page
                "tags_count_diff_lst": [],   # history of tag-count deltas
                "channel_ischange": False,   # whether the channel page was redesigned
                "status_code": -1,           # last HTTP status of the channel page
                "visit_count": 0,            # daily visit count for the channel
                "failure_count": 0,          # daily failed-visit count
                "create_at": tools.now_ts(),
                "update_at": tools.now_ts()
            }
            self.mgo_db.find_one_and_update(
                {'luaconfig_id': _id},
                {'$set': data},
                upsert=True
            )
            count += 1
            # Mark as tracked only after a successful upsert.
            self.redis_db.setex(fingerprint, self.expire_time, str(_id))
        return count

    def process_monitor_task(self, task, **kwargs):
        """Push the task document onto the redis queue when its fingerprint
        is still alive (i.e. the lua config is still tracked)."""
        fingerprint = task['fingerprint']
        if self.redis_db.exists(fingerprint):
            document = task['document']
            task_str = tools.json_dumps(document)
            self.rpush(task_str)

    def refresh_monitor(self, thread_num=1):
        """Purge monitor data whose lua configs entered an inactive state."""
        logger.info(f"[{self.label}]刷新监控任务")
        q = {'state': {'$in': [0, 4, 6, 7, 10]}}
        projection = {'_id': 1}
        monitor_lst = self.get_monitor_lst(q, projection)
        result = self.multi_thread_worker(monitor_lst, self.process_refresh, thread_num)
        logger.info(f"[{self.label}]无效监控任务清除 {result} 条")

    def create_monitor_item(self, thread_num=1):
        """Create monitor documents for every active lua config."""
        logger.info(f"[{self.label}]扫描爬虫任务表")
        q = {'state': {'$nin': [0, 4, 6, 7, 10]}}
        projection = {
            'param_common': 1,
            'channel': 1,
            'site': 1,
            'href': 1,
            'code': 1,
            'platform': 1,
            'state': 1
        }
        monitor_lst = self.get_monitor_lst(q, projection)
        result = self.multi_thread_worker(monitor_lst, self.process_monitor_item, thread_num)
        logger.info(f"[{self.label}]新增监控爬虫 {result} 条")

    def create_monitor_task(self, thread_num=1):
        """Enqueue monitor tasks for unchanged same-domain channels."""
        logger.info(f"[{self.label}]创建监控任务")
        query = {'is_subdomain': True, 'channel_ischange': False}
        projection = {
            'luaconfig_id': 1,
            'domain': 1,
            'host': 1,
            'url': 1,
            'tags_count': 1,           # number of tags on the channel page
            'tags_count_diff': 1,      # tag-count delta for the channel page
            'tags_count_diff_lst': 1,  # history of tag-count deltas
            'channel_ischange': 1,     # whether the channel page was redesigned
            'status_code': 1,          # last HTTP status of the channel page
            'visit_count': 1,          # daily visit count
            'failure_count': 1,        # daily failed-visit count
            'update_at': 1
        }
        task_lst = self.get_monitor_task(query, projection)
        self.multi_thread_worker(task_lst, self.process_monitor_task, thread_num)
        logger.info(f'[{self.label}]待完成任务 {self.task_total} 条')

    @staticmethod
    def multi_thread_worker(iterable, func, workers=1):
        """Map *func* over *iterable* with a thread pool and sum integer results.

        *func* is invoked as ``func(item, count=0)``; it may return an int
        (its processed count) or ``None`` (ignored).  The original passed the
        running total via ``count=count``, but every submission happens before
        any accumulation, so that value was provably always 0 — made explicit.
        """
        count = 0
        with ThreadPoolExecutor(max_workers=workers) as executor:
            fs = [executor.submit(func, monitor, count=0) for monitor in iterable]
            for f in as_completed(fs):
                result = f.result()
                # isinstance() already rejects None; the extra None test was redundant.
                if isinstance(result, int):
                    count += result
        return count

    def run(self):
        """Main production loop: refresh, create items, enqueue tasks, sleep."""
        logger.info(f'[{self.label}]开始生产任务')
        while True:
            try:
                self.refresh_monitor(10)
                self.create_monitor_item(20)
                self.create_monitor_task(12)
                tools.delay(self.sleep_interval)
            except Exception as e:
                # Log and keep the producer thread alive.
                logger.exception(e)
class SiteMonitorClient(BaseServer):
    """Consumer side: pops monitor tasks off the redis queue under an
    optimistic lock, one lock label per worker process."""

    def __init__(self, site: str, redis_label: str, db: str, table: str):
        super(SiteMonitorClient, self).__init__(site, redis_label, db, table,
                                                redis_cfg=settings.redis2_conf)
        proc = multiprocessing.current_process()
        self.lock_label = f'{redis_label}:{proc.pid}'

    def get_crawl_task(self):
        """Pop one monitor task from the queue.

        Returns the task as a plain dict (without its ``luaconfig_id``), or
        ``None`` when the queue is empty.
        """
        with OptimisticLock(self.redis_db, self.lock_label):
            converters = dict(luaconfig_id=tools.ObjectId, _id=tools.ObjectId)
            task = tools.json_loads(self.lpop(), **converters)
            if task is None:
                return task
            task = tools.document2dict(task)
            del task['luaconfig_id']
            return task

    def save_data(self, table, documents):
        """Intentional no-op: the client does not persist data itself."""
        pass
|