Ver código fonte

fix:重构代理池

dzr 8 meses atrás
pai
commit
72ff4c329c
5 arquivos alterados com 249 adições e 216 exclusões
  1. +22 -16
      docker-compose.yml
  2. +14 -12
      gunicorn.conf.py
  3. +19 -0
      pytools_main.py
  4. +1 -6
      routes.py
  5. +193 -182
      services/proxy.py

+ 22 - 16
docker-compose.yml

@@ -1,42 +1,48 @@
 version: "3"
 services:
-  master-server:
+  producer:
     container_name: pytools-main
-    image: pytools:latest
+    image: 172.17.189.142:8081/pyspider/pytools:latest
     volumes: # 映射文件夹
       - /mnt/pytools:/mnt
     restart: always
     privileged: true
-#    shm_size: 4GB
     logging:
       driver: "json-file"
       options:
-        max-size: "200k"
-        max-file: "10"
+        max-size: "1G"
+        max-file: "1"
     deploy:
       resources:
         reservations:
-          memory: 200M
+          memory: 10M
+    networks:
+      - producer_network
     command: 'python3 produce_task.py'
 
-  slave-server:
+  consumer:
     container_name: pytools-server
-    image: pytools:latest
+    image: 172.17.189.142:8081/pyspider/pytools:latest
     volumes: # 映射文件夹
       - /mnt/pytools:/mnt
-#    network_mode: "host"
-    ports:
-      - "1405:1405"
+    network_mode: "host"
     restart: always
     privileged: true
-#    shm_size: 10GB
     logging:
       driver: "json-file"
       options:
-        max-size: "200k"
-        max-file: "10"
+        max-size: "1G"
+        max-file: "1"
     deploy:
       resources:
         reservations:
-          memory: 200M
-    command: 'python3 -m gunicorn -c gunicorn.conf.py build_tools:app'
+          memory: 10M
+    command: 'python3 -m gunicorn -c gunicorn.conf.py pytools_main:app'
+
+networks:
+  producer_network:
+    driver: bridge
+    ipam:
+      driver: default
+      config:
+        - subnet: 172.21.0.0/16

+ 14 - 12
gunicorn.conf.py

@@ -6,24 +6,26 @@ Created on 2023-04-24
 ---------
 @author: Dzr
 """
-import multiprocessing
-
 # 服务地址
 bind = '0.0.0.0:1405'
 # 代码更改时重新启动工作程序(适用于开发测试)
 reload = False
-# 日志输出级别
-loglevel = 'info'
+# 转发白名单
+forwarded_allow_ips = '*'
 # 访问记录到标准输出
 accesslog = '-'
+errorlog = '-'
+# 日志输出级别
+loglevel = 'info'
 # 访问记录格式
 access_log_format = '%({x-forwarded-for}i)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'
-
-# 启动工作进程数量
-workers = multiprocessing.cpu_count() * 2 + 1
 # 工作模式
-worker_class = 'gevent'
-# 启动工作线程数量(当worker指定为gevent或者evenlet类型时,线程变成基于Greentlet的task(伪线程),这时候线程数量threads参数是无效的)
-# threads = multiprocessing.cpu_count() * 2
-# 转发白名单
-forwarded_allow_ips = '*'
+worker_class = 'gunicorn.workers.ggevent.GeventWorker'
+# 工作进程数量
+workers = 4
+# 工作进程最大连接处理数量
+worker_connections = 2000
+# 工作进程重启最大请求数量
+max_requests = 50000
+# 工作进程重启抖动时间
+max_requests_jitter = 2

+ 19 - 0
pytools_main.py

@@ -0,0 +1,19 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2024-11-28 
+---------
+@summary:  
+---------
+@author: Dzr
+"""
+import logging
+
+from routes import app
+
+gunicorn_logger = logging.getLogger('gunicorn.error')
+app.logger.handlers = gunicorn_logger.handlers
+app.logger.setLevel(gunicorn_logger.level)
+
+
+if __name__ == '__main__':
+    app.run(host='0.0.0.0', port=1405, debug=True, use_reloader=False)

+ 1 - 6
build_tools.py → routes.py

@@ -99,7 +99,6 @@ def get_proxy(scheme):
     pk = request.args.get('pk')  # 代理类型:1=部署lua下载器节点类 2=python专用代理
 
     result = {}
-    proxies = None
     try:
         if scheme == 'http':
             proxies = httpProxyPool.proxies(pk=pk)
@@ -108,7 +107,7 @@ def get_proxy(scheme):
         else:
             abort(404)
 
-        logger.info(f'[调用{scheme}代理]{proxies}')
+        # logger.debug(f'[调用{scheme}代理]{proxies}')
         if proxies is not None:
             result.update(proxies)
     except (KeyError, IndexError):
@@ -193,7 +192,3 @@ def competing_goods_account_release():
         abort(404)  # Unauthorized 未授权
     res = accountManagePool.release_account(uid, crawl_type, req_ip)
     return jsonify(data=res)
-
-
-# if __name__ == '__main__':
-#     app.run(host='0.0.0.0', port=1405, debug=True, use_reloader=False)

+ 193 - 182
services/proxy.py

@@ -6,13 +6,11 @@ Created on 2023-05-11
 ---------
 @author: Dzr
 """
-import ast
-import multiprocessing
 import random
+import string
 import threading
-from collections import deque
-from concurrent.futures import ThreadPoolExecutor, as_completed
-from operator import itemgetter
+from concurrent.futures import ThreadPoolExecutor
+from threading import Thread
 from urllib.parse import urlparse
 
 import requests
@@ -20,7 +18,6 @@ import requests
 import setting as settings
 from base_server import BaseServer, tools
 from common.log import logger
-from common.redis_lock import OptimisticLock
 
 DEFAULT_UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36'
 
@@ -72,245 +69,259 @@ def get_netloc(proxy, default=None):
     return default
 
 
-class BaseProxyPool(BaseServer):
+def get_random(length=4):
+    return ''.join(random.sample(string.ascii_letters + string.digits, length))
 
-    def __init__(self, name, redis_label, scheme):
-        super(BaseProxyPool, self).__init__(server=name, label=redis_label)
-        self.scheme = scheme.lower()
-        self.proxy_name = self.scheme + self.server
-        self.proxy_queue = f'{redis_label}_{self.scheme}'
-        self.unique_key = ('ip', 'port')  # 组合 proxy 指纹的字段名称
 
-    def get_redis_name(self, proxy):
-        return f"{self.proxy_queue}_{proxy['fingerprint']}"
+def get_host(proxies, scheme):
+    return str(proxies['http']).replace(scheme + '://', '', 1)
 
-    def str_scan(self, pattern, count=1000):
-        cursor = '0'
-        while True:
-            cursor, keys = self.redis_db.scan(cursor, pattern, count)
-            if len(keys) > 0:
-                yield from keys
 
-            if cursor == 0:
-                break
-
-    def get_redis_name_lst(self, pattern='*'):
-        results = []
-        pattern = self.proxy_queue + pattern
-        for key in self.str_scan(pattern, count=5000):
-            results.append(key)
-        return results
-
-    def get_proxy(self, name):
-        items = self.redis_db.hgetall(name)
-        if items is None or 'proxies' not in items:
-            return None
-
-        proxy = {
-            'proxies': ast.literal_eval(items['proxies']),
-            'fingerprint': items['fingerprint'],
-            'start_time': int(items['start_time']),
-            'end_time': int(items['end_time']),
-            'last_time': int(items['last_time']),
-            'usage': int(items['usage']),
-            'pk': int(items.get('pk', 1))
-        }
-        return proxy
-
-    def get(self, name, key):
-        return self.redis_db.hget(name, key)
-
-    def exists(self, proxy):
-        return self.redis_db.exists(self.get_redis_name(proxy))
-
-    def check(self, proxy):
-        is_ok = False
-        # url = 'https://myip.ipip.net'
-        url = 'https://www.baidu.com/'
-        netloc = get_netloc(proxy)
-        try:
-            requests_param = {
-                "headers": {'User-Agent': DEFAULT_UA},
-                "proxies": proxy['proxies'],
-                "timeout": 5
-            }
-            requests.get(url, **requests_param)
-            is_ok = True
-        except requests.RequestException:
-            pass
-
-        msg = "正常" if is_ok else "失效"
-        logger.debug(f"[{self.proxy_name}]检查代理Ip - {netloc} --通信{msg}")
-        return proxy, is_ok
-
-    def remove_proxy(self, proxy):
-        netloc = get_netloc(proxy)
-        logger.debug(f"[{self.proxy_name}]代理Ip - {netloc} --删除")
-        if self.exists(proxy):
-            self.redis_db.delete(self.get_redis_name(proxy))
+class BaseProxyPool(BaseServer):
 
-    def add_proxy(self, proxy):
-        netloc = get_netloc(proxy)
-        logger.debug(f"[{self.proxy_name}]代理Ip - {netloc} --添加")
-        if not self.exists(proxy):
-            redis_name = self.get_redis_name(proxy)
-            self.redis_db.hset(redis_name, None, None, mapping=proxy)
-            expire_ts = proxy['end_time'] - tools.now_ts()
-            self.redis_db.expire(redis_name, expire_ts)
+    def __init__(self, server, redis_label, scheme):
+        super(BaseProxyPool, self).__init__(server=server, label=redis_label)
+        self.scheme = scheme.lower()
 
 
 class ProxyPoolServer(BaseProxyPool, threading.Thread):
 
-    def __init__(self, name, redis_label, scheme):
+    def __init__(self, service_name, redis_label, scheme):
         """
         代理池生产管理
 
-        @param str name: 服务名称
+        @param str service_name: 服务名称
         @param str redis_label: redis 标识前缀
         @param str scheme: 协议类型
         """
         threading.Thread.__init__(self)
-        super(ProxyPoolServer, self).__init__(name, redis_label, scheme)
-        self.label = f'{self.proxy_name}_{self.getName()}'
+        super(ProxyPoolServer, self).__init__(service_name, redis_label, scheme)
         self.ports = ['8862', '8863'] if self.scheme == 'http' else ['8860', '8861']
-        self.load_interval = 60  # 轮询访问vps代理服务的时间间隔
-
-    def remove_failure_proxy(self, proxy_lst):
-        """删除失效/故障代理ip"""
-        logger.info(f"[{self.label}]清除无效代理Ip")
-        proxy_fingerprints = set([proxy['fingerprint'] for proxy in proxy_lst])
-        for redis_name in self.get_redis_name_lst():
-            fingerprint = self.get(redis_name, 'fingerprint')
-            if fingerprint not in proxy_fingerprints:
-                self.redis_db.delete(redis_name)
+        self.load_interval = 60  # 访问vps服务的间隔时长
+        self.tab_name = f'proxy:{self.scheme}:items'
+        self.lst_name = f'proxy:{self.scheme}:pk_0'
+        self.lst_name_1 = f'proxy:{self.scheme}:pk_1'
+        self.lst_name_2 = f'proxy:{self.scheme}:pk_2'
 
     def request_proxy(self):
-        logger.info(f"[{self.label}]请求vps服务")
         proxy_lst = []
         for idx, url in enumerate(settings.jy_proxy['socks5']['url']):
             try:
                 response = requests.get(url, timeout=10)
+            except requests.RequestException as e:
+                logger.error(f'vps服务|请求失败|{url}|原因:{e.args}')
+            else:
                 for item in response.json():
                     ports = list(filter(lambda p: p in self.ports, item['ports']))
                     if not ports:
                         continue
 
                     ip = decrypt(item['ip'])
-                    port = int(ports[random.randint(0, len(ports) - 1)])
-                    start_time = tools.now_ts()
-                    end_time = item['lifetime']
-                    if end_time - start_time > 0:
+                    for port in ports:
+                        args = (self.scheme, ip, int(port))
                         proxy = {
                             'proxies': {
-                                'http': '{}://{}:{}'.format(self.scheme, ip, port),
-                                'https': '{}://{}:{}'.format(self.scheme, ip, port)
+                                'http': '{}://{}:{}'.format(*args),
+                                'https': '{}://{}:{}'.format(*args)
                             },
-                            'fingerprint': self.fingerprint(ip=ip, port=port),
-                            'start_time': start_time,
-                            'end_time': end_time,
-                            'last_time': 0,
-                            'usage': 0,
+                            'expire': item['lifetime'],
                             'pk': idx + 1
                         }
                         proxy_lst.append(proxy)
-            except Exception as e:
-                logger.error(f'[{self.label}]vps服务访问异常[{url}],原因:{e.args}')
 
         return proxy_lst
 
-    def manage_proxy(self, proxy_lst: list, workers=1):
-        self.remove_failure_proxy(proxy_lst)
-        with ThreadPoolExecutor(max_workers=workers) as Executor:
-            fs = [Executor.submit(self.check, proxy) for proxy in proxy_lst]
-            for f in as_completed(fs):
-                proxy, is_ok = f.result()
-                if is_ok:
-                    self.add_proxy(proxy)
-                else:
-                    self.remove_proxy(proxy)
+    def add_proxy(self, proxy):
+        """
+        添加代理IP到Redis
+        """
+        host = get_host(proxy['proxies'], self.scheme)
+        if not self.redis_db.hexists(self.tab_name, host):
+            self.redis_db.rpush(self.lst_name, host)
+            self.redis_db.hset(self.tab_name, host, tools.json_dumps(proxy))
+
+            pk = proxy['pk']
+            if pk == 1:
+                self.redis_db.rpush(self.lst_name_1, host)
+            elif pk == 2:
+                self.redis_db.rpush(self.lst_name_2, host)
+
+            logger.debug(f"添加代理|{host}")
+
+    def del_proxy(self, proxy):
+        host = get_host(proxy['proxies'], self.scheme)
+
+        n1 = 0
+        pk = proxy['pk']
+        if pk == 1:
+            n1 = self.redis_db.lrem(self.lst_name_1, 0, host)
+        elif pk == 2:
+            n1 = self.redis_db.lrem(self.lst_name_2, 0, host)
+
+        n0 = self.redis_db.lrem(self.lst_name, 0, host)  # 移除所有匹配的元素
+        self.redis_db.hdel(self.tab_name, host)
+        logger.debug(f"移除代理|{host}|{n1 + n0}")
+
+    def update_proxy(self, proxy, success):
+        """
+        更新代理IP的信息
+        """
+        if not success:
+            self.del_proxy(proxy)
+        else:
+            self.add_proxy(proxy)
 
-    def run(self):
-        logger.info(f'[{self.label}]开始生产代理Ip')
+    def validate_proxy(self, proxy):
+        # url = 'https://myip.ipip.net'
+        url = 'https://www.baidu.com/'
+        proxies = proxy['proxies']
+        host = str(proxies['http']).replace(self.scheme + '://', '', 1)
+        logger.debug(f"代理检查|{host}")
+        try:
+            request_params = {
+                "headers": {"User-Agent": DEFAULT_UA},
+                "proxies": proxies,
+                "timeout": 5
+            }
+            r = requests.get(url, **request_params)
+            return proxy, r.status_code == requests.codes.ok
+        except requests.RequestException:
+            return proxy, False
+
+    def manage_proxy(self, proxy_lst, workers):
+        """
+
+        :param list proxy_lst: 代理ip列表
+        :param int workers: 工作线程数
+        """
+        with ThreadPoolExecutor(max_workers=workers, thread_name_prefix='validate') as executor:
+            fs = executor.map(self.validate_proxy, proxy_lst)
+            for args in fs:
+                self.update_proxy(*args)
+
+    def _create_pool(self):
+        logger.info('创建Ip池')
         while True:
             try:
                 proxy_lst = self.request_proxy()
                 if not proxy_lst:
-                    tools.delay(2)
+                    tools.delay(0.5)
                     continue
-                dynamic_workers = min((int(len(proxy_lst) / 2) or 1), 10)
-                self.manage_proxy(proxy_lst, workers=dynamic_workers)  # 线程池上限10
+
+                workers = min((int(len(proxy_lst) / 2) or 1), 4)  # 最大工作线程数
+                self.manage_proxy(proxy_lst, workers=workers)
                 tools.delay(self.load_interval)
             except Exception as e:
+                logger.error('Ip池创建失败')
                 logger.exception(e)
 
+    def _watch_pool(self):
+        logger.info('Ip池代理监控')
+        while True:
+            try:
+                proxy_dict = self.redis_db.hgetall(self.tab_name)
+                if not proxy_dict:
+                    tools.delay(0.5)
+                    continue
+
+                proxy_lst = [tools.json_loads(proxy) for proxy in proxy_dict.values()]
+                invalid_proxy_lst = list(filter(lambda proxy: (int(proxy['expire']) - tools.now_ts()) < 30, proxy_lst))
+                for proxy in invalid_proxy_lst:
+                    self.del_proxy(proxy)
+
+                tools.delay(2)
+            except Exception as e:
+                logger.exception(e)
+
+    def run(self):
+        _join_lst = []
+
+        _thread_lst = [
+            Thread(target=self._create_pool, daemon=True, name='CreatePool'),
+            Thread(target=self._watch_pool, daemon=True, name='WatchPool')
+        ]
+        for thread in _thread_lst:
+            thread.start()
+            _join_lst.append(thread)
+
+        for thread in _join_lst:
+            thread.join()
+
 
 class ProxyPoolClient(BaseProxyPool):
 
-    def __init__(self, name: str, redis_label: str, scheme: str):
+    def __init__(self, service_name, redis_label, scheme):
         """
-        调用代理池
+
+        @param str service_name: 服务名称
+        @param str redis_label: 服务名称
+        @param str scheme: 协议名称
         """
-        super(ProxyPoolClient, self).__init__(name, redis_label, scheme)
-        current_process = multiprocessing.current_process()
-        sub_name = f'{tools.get_localhost_ip()}:{current_process.pid}'
-        self.lock_label = f'{redis_label}:{sub_name}'
-
-    def proxy_total(self):
-        return len(self.get_redis_name_lst())
-
-    def get_all_proxy(self, pk=None):
-        proxy_lst = deque([])
-        for redis_name in self.get_redis_name_lst():
-            proxy = self.get_proxy(redis_name)
-            if isinstance(proxy, dict):
-                proxy_lst.append(proxy)
-
-        if len(proxy_lst) > 0:
-            if pk is not None:
-                '''先按照代理类型(pk)过滤,再按照使用次数进行升序排列'''
-                special = deque(filter(lambda x: x['pk'] == int(pk), proxy_lst))
-                proxy_lst = deque(sorted(special, key=itemgetter('usage')))
-            else:
-                '''按照使用次数进行升序排列(左小右大)'''
-                proxy_lst = deque(sorted(proxy_lst, key=itemgetter('usage')))
+        super().__init__(service_name, redis_label, scheme)
+        self.tab_name = f'proxy:{self.scheme}:items'
+        self.lst_name = f'proxy:{self.scheme}:pk_0'
+        self.lst_name_1 = f'proxy:{self.scheme}:pk_1'
+        self.lst_name_2 = f'proxy:{self.scheme}:pk_2'
+
+    def _get_proxy(self, src, dst, timeout=3.0):
+        return self.redis_db.brpoplpush(src, dst, timeout=timeout)
+
+    def get_proxy(self):
+        """
+        从Redis中获取一个代理IP
+        """
+        return self._get_proxy(self.lst_name, self.lst_name, timeout=0.5)
+
+    def get_pk1_proxy(self):
+        return self._get_proxy(self.lst_name_1, self.lst_name_1, timeout=0.5)
+
+    def get_pk2_proxy(self):
+        return self._get_proxy(self.lst_name_2, self.lst_name_2, timeout=0.5)
+
+    def proxies(self, pk=0):
+        if pk == 1:
+            host = self.get_pk1_proxy()
+        elif pk == 2:
+            host = self.get_pk2_proxy()
+        else:
+            host = self.get_proxy()
 
+        if not host:
+            return
+
+        args = (self.scheme, host)
+        r = {'http': '{}://{}'.format(*args), 'https': '{}://{}'.format(*args)}
+        return r
+
+    def _get_all_proxy(self, pk=None):
+        results = []
+        proxy_dict = self.redis_db.hgetall(self.tab_name)
+        if not proxy_dict:
+            return results
+
+        proxy_lst = [tools.json_loads(proxy) for proxy in proxy_dict.values()]
+        if pk is not None:
+            proxy_lst = list(filter(lambda p: p['pk'] == pk, proxy_lst))
         return proxy_lst
 
     def get_proxy_pool(self, **kwargs):
-        proxy_lst = []
-        for proxy in self.get_all_proxy(**kwargs):
-            last_time = proxy['last_time']
-            end_time = proxy['end_time']
-            expire = end_time - tools.now_ts()
-            proxy_lst.append({
+        results = []
+        proxy_lst = self._get_all_proxy(**kwargs)
+        for proxy in proxy_lst:
+            life_time = proxy['expire']
+            expire = life_time - tools.now_ts()
+            results.append({
                 'proxies': proxy['proxies'],
-                'start_time': tools.ts2dt(proxy['start_time']),
-                'end_time': tools.ts2dt(end_time),
-                'last_time': tools.ts2dt(last_time) if last_time != 0 else '',
+                'expire_time': tools.ts2dt(life_time),
                 'expire': expire,
-                'usage': proxy['usage'],
                 'pk': proxy.get('pk', 1),
             })
+
         # 展示时按照过期时间从大到小排列
-        return list(sorted(proxy_lst, key=lambda x: x['expire'], reverse=True))
+        return list(sorted(results, key=lambda x: x['expire'], reverse=True))
 
     def get_all_proxy_ip(self, protocol, **kwargs):
         return [
-            proxy['proxies']['http'].replace(f'{protocol}://', '')
-            for proxy in self.get_all_proxy(**kwargs)
+            proxy['proxies']['http'].replace(f'{protocol}://', '', 1)
+            for proxy in self._get_all_proxy(**kwargs)
         ]
-
-    def proxies(self, **kwargs):
-        with OptimisticLock(self.redis_db, self.lock_label):
-            proxy = {}
-            if self.proxy_total() > 0:
-                proxy_lst = self.get_all_proxy(**kwargs)
-                proxy = proxy_lst.popleft()
-                name = self.get_redis_name(proxy)
-                mapping = {
-                    'usage': proxy['usage'] + 1,
-                    'last_time': tools.now_ts()
-                }
-                self.redis_db.hset(name, None, None, mapping)
-
-            return proxy.get('proxies')