Forráskód Böngészése

添加新的代理检测机制

dzr 4 hónapja
szülő
commit
c67c884001
2 módosított fájl, 45 hozzáadás és 43 törlés
  1. 7 6
      common/net.py
  2. 38 37
      services/proxy.py

+ 7 - 6
common/net.py

@@ -7,10 +7,11 @@ Created on 2025-04-03
 @author: Dzr
 """
 import socket
+from common.log import logger
 
 
 def dial_timeout(addr, port, timeout=1):
-    check_ret = False
+    state = False
 
     # 创建一个TCP/IP套接字
     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -20,15 +21,15 @@ def dial_timeout(addr, port, timeout=1):
     try:
         # 尝试连接到指定的地址和端口
         s.connect((addr, port))
-        print("成功连接到 {}:{}".format(addr, port))
-        check_ret = True
+        logger.info("成功连接到 {}:{}".format(addr, port))
+        state = True
     except socket.timeout:
         # 如果连接超时,打印错误信息
-        print("连接超时")
+        logger.error("连接超时 {}:{}".format(addr, port))
     except socket.error as e:
         # 其他socket错误处理
-        print("连接错误: {}".format(e))
+        logger.error("连接错误: {}".format(e))
     finally:
         # 关闭套接字
         s.close()
-        return check_ret
+        return state

+ 38 - 37
services/proxy.py

@@ -18,8 +18,7 @@ import requests
 import setting as settings
 from base_server import BaseServer, tools
 from common.log import logger
-
-DEFAULT_UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36'
+from common.net import dial_timeout
 
 
 def decrypt(input_str: str) -> str:
@@ -56,6 +55,10 @@ def get_base_url():
     return settings.jy_proxy['socks5']['base_url']
 
 
+def get_random(length=4):
+    return ''.join(random.sample(string.ascii_letters + string.digits, length))
+
+
 def get_netloc(proxy, default=None):
     proxies = None
     if isinstance(proxy, dict):
@@ -69,14 +72,20 @@ def get_netloc(proxy, default=None):
     return default
 
 
-def get_random(length=4):
-    return ''.join(random.sample(string.ascii_letters + string.digits, length))
-
-
 def get_host(proxies, scheme):
     return str(proxies['http']).replace(scheme + '://', '', 1)
 
 
+def get_addr_and_port(host):
+    """
+
+    @param str host:
+    @return:
+    """
+    addr, port = host.split(':', 1)
+    return addr, int(port)
+
+
 class BaseProxyPool(BaseServer):
 
     def __init__(self, server, redis_label, scheme):
@@ -103,7 +112,7 @@ class ProxyPoolServer(BaseProxyPool, threading.Thread):
         self.lst_name_1 = f'proxy:{self.scheme}:pk_1'
         self.lst_name_2 = f'proxy:{self.scheme}:pk_2'
 
-    def request_proxy(self):
+    def fetch(self):
         proxy_lst = []
         for idx, url in enumerate(settings.jy_proxy['socks5']['url']):
             try:
@@ -131,7 +140,7 @@ class ProxyPoolServer(BaseProxyPool, threading.Thread):
 
         return proxy_lst
 
-    def add_proxy(self, proxy):
+    def insert(self, proxy):
         """
         添加代理IP到Redis
         """
@@ -148,7 +157,7 @@ class ProxyPoolServer(BaseProxyPool, threading.Thread):
 
             logger.info(f"添加代理|{host}")
 
-    def del_proxy(self, proxy):
+    def drop(self, proxy):
         host = get_host(proxy['proxies'], self.scheme)
 
         n1 = 0
@@ -167,35 +176,27 @@ class ProxyPoolServer(BaseProxyPool, threading.Thread):
         更新代理IP的信息
         """
         if not success:
-            self.del_proxy(proxy)
+            self.drop(proxy)
         else:
-            self.add_proxy(proxy)
+            self.insert(proxy)
 
-    def validate_proxy(self, proxy):
-        # url = 'https://myip.ipip.net'
-        url = 'https://www.baidu.com/'
+    def check(self, proxy):
         proxies = proxy['proxies']
-        host = str(proxies['http']).replace(self.scheme + '://', '', 1)
+        host = get_host(proxies, self.scheme)
         logger.debug(f"代理检查|{host}")
-        try:
-            request_params = {
-                "headers": {"User-Agent": DEFAULT_UA},
-                "proxies": proxies,
-                "timeout": 5
-            }
-            r = requests.get(url, **request_params)
-            return proxy, r.status_code == requests.codes.ok
-        except requests.RequestException:
-            return proxy, False
-
-    def manage_proxy(self, proxy_lst, workers):
+        addr, port = get_addr_and_port(host)
+        ret = dial_timeout(addr, port, 10)
+        return proxy, ret
+
+    def bulk_write(self, proxy_lst, workers):
         """
 
         :param list proxy_lst: 代理ip列表
         :param int workers: 工作线程数
         """
-        with ThreadPoolExecutor(max_workers=workers, thread_name_prefix='validate') as executor:
-            fs = executor.map(self.validate_proxy, proxy_lst)
+        thread_config = dict(max_workers=workers, thread_name_prefix='check')
+        with ThreadPoolExecutor(**thread_config) as executor:
+            fs = executor.map(self.check, proxy_lst)
             for args in fs:
                 self.update_proxy(*args)
 
@@ -203,19 +204,19 @@ class ProxyPoolServer(BaseProxyPool, threading.Thread):
         logger.info('创建Ip池')
         while True:
             try:
-                proxy_lst = self.request_proxy()
+                proxy_lst = self.fetch()
                 if not proxy_lst:
                     tools.delay(0.5)
                     continue
 
                 workers = min((int(len(proxy_lst) / 2) or 1), 4)  # 最大工作线程数
-                self.manage_proxy(proxy_lst, workers=workers)
+                self.bulk_write(proxy_lst, workers=workers)
                 tools.delay(self.load_interval)
             except Exception as e:
                 logger.error('Ip池创建失败')
                 logger.exception(e)
 
-    def _watch_pool(self):
+    def _watch(self):
         logger.info('Ip池代理监控')
         while True:
             try:
@@ -225,9 +226,9 @@ class ProxyPoolServer(BaseProxyPool, threading.Thread):
                     continue
 
                 proxy_lst = [tools.json_loads(proxy) for proxy in proxy_dict.values()]
-                invalid_proxy_lst = list(filter(lambda proxy: (int(proxy['expire']) - tools.now_ts()) < 30, proxy_lst))
-                for proxy in invalid_proxy_lst:
-                    self.del_proxy(proxy)
+                proxy_pool = list(filter(lambda proxy: (int(proxy['expire']) - tools.now_ts()) < 30, proxy_lst))
+                for proxy in proxy_pool:
+                    self.drop(proxy)
 
                 tools.delay(2)
             except Exception as e:
@@ -237,8 +238,8 @@ class ProxyPoolServer(BaseProxyPool, threading.Thread):
         _join_lst = []
 
         _thread_lst = [
-            Thread(target=self._create_pool, daemon=True, name='CreatePool'),
-            Thread(target=self._watch_pool, daemon=True, name='WatchPool')
+            Thread(target=self._create_pool, daemon=True, name='Create'),
+            Thread(target=self._watch, daemon=True, name='Watch')
         ]
         for thread in _thread_lst:
             thread.start()