site_monitor.py

# -*- coding: utf-8 -*-
"""
Created on 2023-05-09
---------
@summary: source-website monitoring
---------
@author: Dzr
"""
import multiprocessing
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

import tldextract
from urllib3 import get_host

import setting as settings
from base_server import BaseServer, tools, mongo_table
from common.log import logger
from common.redis_lock import OptimisticLock


def extract_domain(url):
    """Return the fully qualified domain name (or the bare IPv4 address)."""
    ext = tldextract.extract(url)
    return ext.fqdn or ext.ipv4


def extract_host(url):
    """
    Rebuild the ``scheme://host[:port]/`` base URL.

    >>> extract_host('http://192.168.3.207:8080/')
    'http://192.168.3.207:8080/'
    """
    _s, _h, _p = get_host(url)
    return f"{_s}://{_h}/" if _p is None else f"{_s}://{_h}:{_p}/"


class SiteMonitorServer(BaseServer, threading.Thread):

    def __init__(self, site: str, redis_label: str, db: str, table: str):
        threading.Thread.__init__(self)
        super(SiteMonitorServer, self).__init__(site, redis_label, db, table,
                                                redis_cfg=settings.redis2_conf)
        self.label = f'{self.server}_{self.getName()}'
        self.sleep_interval = 3600 * 4  # producer loop interval: 4 hours
        self.expire_time = 3600 * 24 * 15  # fingerprint TTL: 15 days
        self.output_log = True

    def get_monitor_lst(self, conditions=None, projection=None):
        results = []
        table = mongo_table('editor', 'luaconfig')
        with table.find(conditions, projection=projection) as cursor:
            for items in cursor:
                _id = items['_id']
                fingerprint = f'monitor_{self.fingerprint(_id=_id)}'
                results.append({'fingerprint': fingerprint, 'document': items})
        yield from results

    def get_monitor_task(self, conditions, projection=None):
        results = []
        with self.mgo_db.find(conditions, projection=projection) as cursor:
            for item in cursor:
                _id = item["luaconfig_id"]
                fingerprint = f'monitor_{self.fingerprint(_id=_id)}'
                results.append({'fingerprint': fingerprint, 'document': item})
        yield from results

    def process_refresh(self, monitor, count, **kwargs):
        """Drop the Mongo monitor record and its redis fingerprint, counting each removal."""
        fingerprint = monitor['fingerprint']
        if self.redis_db.exists(fingerprint):
            luaconfig_id = monitor['document']['_id']
            self.mgo_db.delete_many({'luaconfig_id': luaconfig_id})
            self.redis_db.delete(fingerprint)
            count += 1
        return count

    def process_monitor_item(self, monitor, count, **kwargs):
        document = monitor['document']
        _id = document['_id']
        domain = extract_domain(document['href'])
        href = document['param_common'][11]  # channel page URL from the lua config parameters
        is_subdomain = (extract_domain(href) == domain)
        fingerprint = monitor['fingerprint']
        if not self.redis_db.exists(fingerprint):
            data = {
                "luaconfig_id": _id,
                "site": document["site"],
                "channel": document["channel"],
                "spidercode": document["code"],
                "platform": document["platform"],
                "state": document["state"],
                "domain": domain,
                "host": extract_host(href),
                "is_subdomain": is_subdomain,
                "url": href,
                "tags_count": 0,  # number of tags on the channel page
                "tags_count_diff": 0,  # latest change in the channel page tag count
                "tags_count_diff_lst": [],  # history of channel page tag-count changes
                "channel_ischange": False,  # whether the channel page has been redesigned
                "status_code": -1,  # HTTP status code of the channel visit
                "visit_count": 0,  # daily visit count for the channel
                "failure_count": 0,  # daily failed-visit count for the channel
                "create_at": tools.now_ts(),
                "update_at": tools.now_ts()
            }
            self.mgo_db.find_one_and_update(
                {'luaconfig_id': _id},
                {'$set': data},
                upsert=True
            )
            count += 1
            self.redis_db.setex(fingerprint, self.expire_time, str(_id))
        return count

    def process_monitor_task(self, task, **kwargs):
        """Push the monitor document onto the redis queue if its fingerprint is still live."""
        fingerprint = task['fingerprint']
        if self.redis_db.exists(fingerprint):
            document = task['document']
            task_str = tools.json_dumps(document)
            self.rpush(task_str)

    def refresh_monitor(self, thread_num=1):
        logger.info(f"[{self.label}] refreshing monitor tasks")
        q = {'state': {'$in': [0, 4, 6, 7, 10]}}
        projection = {'_id': 1}
        monitor_lst = self.get_monitor_lst(q, projection)
        process_func = self.process_refresh
        result = self.multi_thread_worker(monitor_lst, process_func, thread_num)
        logger.info(f"[{self.label}] removed {result} stale monitor entries")

    def create_monitor_item(self, thread_num=1):
        logger.info(f"[{self.label}] scanning the spider task table")
        q = {'state': {'$nin': [0, 4, 6, 7, 10]}}
        projection = {
            'param_common': 1,
            'channel': 1,
            'site': 1,
            'href': 1,
            'code': 1,
            'platform': 1,
            'state': 1
        }
        monitor_lst = self.get_monitor_lst(q, projection)
        process_func = self.process_monitor_item
        result = self.multi_thread_worker(monitor_lst, process_func, thread_num)
        logger.info(f"[{self.label}] added {result} monitored spiders")

    def create_monitor_task(self, thread_num=1):
        logger.info(f"[{self.label}] creating monitor tasks")
        query = {'is_subdomain': True, 'channel_ischange': False}
        projection = {
            'luaconfig_id': 1,
            'domain': 1,
            'host': 1,
            'url': 1,
            'tags_count': 1,  # number of tags on the channel page
            'tags_count_diff': 1,  # latest change in the channel page tag count
            'tags_count_diff_lst': 1,  # history of channel page tag-count changes
            'channel_ischange': 1,  # whether the channel page has been redesigned
            'status_code': 1,  # HTTP status code of the channel visit
            'visit_count': 1,  # daily visit count for the channel
            'failure_count': 1,  # daily failed-visit count for the channel
            'update_at': 1
        }
        task_lst = self.get_monitor_task(query, projection)
        process_func = self.process_monitor_task
        self.multi_thread_worker(task_lst, process_func, thread_num)
        logger.info(f'[{self.label}] {self.task_total} tasks pending')

    @staticmethod
    def multi_thread_worker(iterable, func, workers=1):
        """Map ``func`` over ``iterable`` with a thread pool and sum the integer results."""
        count = 0
        with ThreadPoolExecutor(max_workers=workers) as executor:
            fs = []
            for monitor in iterable:
                # every worker receives the same pre-submit base count (0)
                # and returns its own tally, which is summed below
                f = executor.submit(func, monitor, count=count)
                fs.append(f)
            for f in as_completed(fs):
                result = f.result()
                if isinstance(result, int):
                    count += result
        return count

    def run(self):
        logger.info(f'[{self.label}] start producing tasks')
        while True:
            try:
                self.refresh_monitor(10)
                self.create_monitor_item(20)
                self.create_monitor_task(12)
                tools.delay(self.sleep_interval)
            except Exception as e:
                logger.exception(e)


class SiteMonitorClient(BaseServer):

    def __init__(self, site: str, redis_label: str, db: str, table: str):
        super(SiteMonitorClient, self).__init__(site, redis_label, db, table,
                                                redis_cfg=settings.redis2_conf)
        current_process = multiprocessing.current_process()
        self.lock_label = f'{redis_label}:{current_process.pid}'

    def get_crawl_task(self):
        with OptimisticLock(self.redis_db, self.lock_label):
            convert_fields = dict(luaconfig_id=tools.ObjectId, _id=tools.ObjectId)
            task = tools.json_loads(self.lpop(), **convert_fields)
            if task is not None:
                task = tools.document2dict(task)
                del task['luaconfig_id']
            return task

    def save_data(self, table, documents):
        pass
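

# A minimal usage sketch of how the producer thread and a consumer might be
# wired together. The argument values ('demo_site', 'site_monitor', 'py_spider',
# 'site_monitor_info') are hypothetical placeholders; the real site, redis_label,
# db and table names come from the deployment configuration.
if __name__ == '__main__':
    server = SiteMonitorServer(site='demo_site', redis_label='site_monitor',
                               db='py_spider', table='site_monitor_info')
    server.start()  # producer loop: refresh, scan and enqueue every 4 hours

    client = SiteMonitorClient(site='demo_site', redis_label='site_monitor',
                               db='py_spider', table='site_monitor_info')
    task = client.get_crawl_task()  # pop one monitor task under the redis lock
    if task is not None:
        print(task['url'])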