Parcourir la source

代理池新增代理分类属性,便于不同采集场景灵活切换

dzr il y a 1 an
Parent
commit
0644c90957
6 fichiers modifiés avec 88 ajouts et 58 suppressions
  1. 15 10
      build_tools.py
  2. 1 1
      produce_task.py
  3. 65 44
      services/proxy.py
  4. 1 1
      setting.py
  5. 3 1
      yaml/dev.yaml
  6. 3 1
      yaml/test.yaml

+ 15 - 10
build_tools.py

@@ -96,28 +96,33 @@ def show_chrome_proxy_plugin_user():
 @auth.login_required
 def get_proxy(scheme):
     # logger.info(f'[访问ip]{request.remote_addr}, class:{scheduler_class_name}')
+    pk = request.args.get('pk')  # 代理类型:1=部署lua下载器节点类 2=python专用代理
+
     result = {}
+    proxies = None
     try:
-        proxies = None
         if scheme == 'http':
-            proxies = httpProxyPool.proxies()
+            proxies = httpProxyPool.proxies(pk=pk)
         elif scheme == 'socks5':
-            proxies = socks5ProxyPool.proxies()
+            proxies = socks5ProxyPool.proxies(pk=pk)
         else:
             abort(404)
+
         logger.info(f'[调用{scheme}代理]{proxies}')
         if proxies is not None:
             result.update(proxies)
-    except KeyError:
+    except (KeyError, IndexError):
         pass
+
     return jsonify(data=result)
 
 
 @app.route('/crawl/proxy/query', methods=['GET'])
 @auth.login_required
 def show_proxy():
-    socks_pool = socks5ProxyPool.get_proxy_pool()
-    http_pool = httpProxyPool.get_proxy_pool()
+    pk = request.args.get('pk')
+    socks_pool = socks5ProxyPool.get_proxy_pool(pk=pk)
+    http_pool = httpProxyPool.get_proxy_pool(pk=pk)
     pool = [*socks_pool, *http_pool]
     return jsonify(data=pool)
 
@@ -125,10 +130,10 @@ def show_proxy():
 @app.route('/crawl/proxy/getips', methods=['GET'])
 @auth.login_required
 def show_proxy_ips():
-    socks_ips = socks5ProxyPool.get_all_proxy_ip('socks5')
-    http_ips = httpProxyPool.get_all_proxy_ip('http')
-    ip_dict = {'socks': socks_ips, 'http': http_ips}
-    return jsonify(data=ip_dict)
+    pk = request.args.get('pk')
+    socks_ips = socks5ProxyPool.get_all_proxy_ip('socks5', pk=pk)
+    http_ips = httpProxyPool.get_all_proxy_ip('http', pk=pk)
+    return jsonify(data={'socks': socks_ips, 'http': http_ips})
 
 
 @app.route('/upload/data/<scheduler_class_name>/<table>', methods=['POST'])

+ 1 - 1
produce_task.py

@@ -19,7 +19,7 @@ def create_server():
     logger.info("开启服务")
     services = [
         # NMPAServer('国家药品监督管理局', 'py_theme', 'nmpa_c', 100),
-        CompetitiveProductServer('元博网', 'ybw', 'py_spider', 'ybw_list', 1000),
+        # CompetitiveProductServer('元博网', 'ybw', 'py_spider', 'ybw_list', 1000),
         CompetitiveProductServer('中国招标与采购网', 'zbytb', 'py_spider', 'zbytb_list', 1000),
         # SiteMonitorServer('网站监控', 'monitor', 'py_spider', 'site_monitor'),
         ProxyPoolServer('代理池', 'proxy', 'socks5'),

+ 65 - 44
services/proxy.py

@@ -99,6 +99,7 @@ class BaseProxyPool(BaseServer):
             'end_time': int(items['end_time']),
             'last_time': int(items['last_time']),
             'usage': int(items['usage']),
+            'pk': int(items.get('pk', 1))
         }
         return proxy
 
@@ -146,12 +147,18 @@ class BaseProxyPool(BaseServer):
 
 class ProxyPoolServer(BaseProxyPool, threading.Thread):
 
-    def __init__(self, name, redis_label, scheme: str):
+    def __init__(self, name, redis_label, scheme):
+        """
+        代理池生产管理
+
+        @param str name: 服务名称
+        @param str redis_label: redis 标识前缀
+        @param str scheme: 协议类型
+        """
         threading.Thread.__init__(self)
         super(ProxyPoolServer, self).__init__(name, redis_label, scheme)
-
         self.label = f'{self.proxy_name}_{self.getName()}'
-        self.ports = ['8862', '8863'] if self.scheme == "http" else ['8860', '8861']
+        self.ports = ['8862', '8863'] if self.scheme == 'http' else ['8860', '8861']
         self.load_interval = 60  # 轮询访问vps代理服务的时间间隔
 
     def remove_failure_proxy(self, proxy_lst):
@@ -166,34 +173,35 @@ class ProxyPoolServer(BaseProxyPool, threading.Thread):
     def request_proxy(self):
         logger.info(f"[{self.label}]请求vps服务")
         proxy_lst = []
-        try:
-            url = settings.jy_proxy['socks5']['url']
-            response = requests.get(url, timeout=10)
-            for item in response.json():
-                ports = list(filter(lambda p: p in self.ports, item['ports']))
-                if not ports:
-                    continue
+        for idx, url in enumerate(settings.jy_proxy['socks5']['url']):
+            try:
+                response = requests.get(url, timeout=10)
+                for item in response.json():
+                    ports = list(filter(lambda p: p in self.ports, item['ports']))
+                    if not ports:
+                        continue
+
+                    ip = decrypt(item['ip'])
+                    port = int(ports[random.randint(0, len(ports) - 1)])
+                    start_time = tools.now_ts()
+                    end_time = item['lifetime']
+                    if end_time - start_time > 0:
+                        proxy = {
+                            'proxies': {
+                                'http': '{}://{}:{}'.format(self.scheme, ip, port),
+                                'https': '{}://{}:{}'.format(self.scheme, ip, port)
+                            },
+                            'fingerprint': self.fingerprint(ip=ip, port=port),
+                            'start_time': start_time,
+                            'end_time': end_time,
+                            'last_time': 0,
+                            'usage': 0,
+                            'pk': idx + 1
+                        }
+                        proxy_lst.append(proxy)
+            except Exception as e:
+                logger.error(f'[{self.label}]vps服务访问异常[{url}],原因:{e.args}')
 
-                ip = decrypt(item['ip'])
-                port = int(ports[random.randint(0, len(ports) - 1)])
-                start_time = tools.now_ts()
-                end_time = item['lifetime']
-                if end_time - start_time > 0:
-                    proxy = {
-                        'proxies': {
-                            'http': '{}://{}:{}'.format(self.scheme, ip, port),
-                            'https': '{}://{}:{}'.format(self.scheme, ip, port)
-                        },
-                        'fingerprint': self.fingerprint(ip=ip, port=port),
-                        'start_time': start_time,
-                        'end_time': end_time,
-                        'last_time': 0,
-                        'usage': 0,
-                    }
-                    proxy_lst.append(proxy)
-
-        except Exception as e:
-            logger.error(f"[{self.label}]vps服务访问异常,原因:{e.args}")
         return proxy_lst
 
     def manage_proxy(self, proxy_lst: list, workers=1):
@@ -225,16 +233,19 @@ class ProxyPoolServer(BaseProxyPool, threading.Thread):
 class ProxyPoolClient(BaseProxyPool):
 
     def __init__(self, name: str, redis_label: str, scheme: str):
+        """
+        调用代理池
+        """
         super(ProxyPoolClient, self).__init__(name, redis_label, scheme)
         current_process = multiprocessing.current_process()
-        sub_label = f'{tools.get_localhost_ip()}:{current_process.pid}'
-        self.lock_label = f'{redis_label}:{sub_label}'
+        sub_name = f'{tools.get_localhost_ip()}:{current_process.pid}'
+        self.lock_label = f'{redis_label}:{sub_name}'
 
     @property
     def proxy_total(self):
         return len(self.get_redis_name_lst())
 
-    def get_all_proxy(self):
+    def get_all_proxy(self, pk=None):
         proxy_lst = deque([])
         for redis_name in self.get_redis_name_lst():
             proxy = self.get_proxy(redis_name)
@@ -242,40 +253,50 @@ class ProxyPoolClient(BaseProxyPool):
                 proxy_lst.append(proxy)
 
         if len(proxy_lst) > 0:
-            '''按照使用次数大小从低到高(左小右大)排序'''
-            proxy_lst = deque(sorted(proxy_lst, key=itemgetter('usage')))
+            if pk is not None:
+                '''先按照代理类型(pk)过滤,再按照使用次数进行升序排列'''
+                special = deque(filter(lambda x: x['pk'] == int(pk), proxy_lst))
+                proxy_lst = deque(sorted(special, key=itemgetter('usage')))
+            else:
+                '''按照使用次数进行升序排列(左小右大)'''
+                proxy_lst = deque(sorted(proxy_lst, key=itemgetter('usage')))
 
         return proxy_lst
 
-    def get_proxy_pool(self):
-        _pool_proxy = []
-        for proxy in self.get_all_proxy():
+    def get_proxy_pool(self, **kwargs):
+        proxy_lst = []
+        for proxy in self.get_all_proxy(**kwargs):
             last_time = proxy['last_time']
             end_time = proxy['end_time']
             expire = end_time - tools.now_ts()
-            _pool_proxy.append({
+            proxy_lst.append({
                 'proxies': proxy['proxies'],
                 'start_time': tools.ts2dt(proxy['start_time']),
                 'end_time': tools.ts2dt(end_time),
                 'last_time': tools.ts2dt(last_time) if last_time != 0 else '',
                 'expire': expire,
                 'usage': proxy['usage'],
+                'pk': proxy.get('pk', 1),
             })
         # 展示时按照过期时间从大到小排列
-        return list(sorted(_pool_proxy, key=lambda x: x['expire'], reverse=True))
+        return list(sorted(proxy_lst, key=lambda x: x['expire'], reverse=True))
 
-    def get_all_proxy_ip(self, protocol):
-        return [proxy['proxies']['http'].replace(f'{protocol}://', '') for proxy in self.get_all_proxy()]
+    def get_all_proxy_ip(self, protocol, **kwargs):
+        return [
+            proxy['proxies']['http'].replace(f'{protocol}://', '')
+            for proxy in self.get_all_proxy(**kwargs)
+        ]
 
-    def proxies(self):
+    def proxies(self, **kwargs):
         lock = acquire_lock_with_timeout(self.redis_db, self.lock_label)
         if lock:
             proxy = {}
             if self.proxy_total > 0:
-                proxy_lst = self.get_all_proxy()
+                proxy_lst = self.get_all_proxy(**kwargs)
                 proxy = proxy_lst.popleft()
                 name = self.get_redis_name(proxy)
                 self.redis_db.hset(name, 'usage', proxy['usage'] + 1)
                 self.redis_db.hset(name, 'last_time', tools.now_ts())
+
             release_lock(self.redis_db, self.lock_label, lock)
             return proxy.get('proxies')

+ 1 - 1
setting.py

@@ -23,7 +23,7 @@ else:
     ENV = 'test.yaml'
 
 _base_path = Path(__file__).parent
-_yaml_conf = (_base_path / 'yml' / ENV).resolve()
+_yaml_conf = (_base_path / 'yaml' / ENV).resolve()
 
 with open(_yaml_conf, encoding="utf-8") as f:
     _conf = yaml.safe_load(f)

+ 3 - 1
yml/dev.yaml → yaml/dev.yaml

@@ -1,6 +1,8 @@
 proxy:
   socks5:
-    url: http://proxy.spdata.jianyu360.com/proxy/getallip
+    url:
+      - http://proxy.spdata.jianyu360.com/proxy/getallip
+      - http://proxy2.spdata.jianyu360.com/proxy/getallip
     decrypt: ABNOPqrceQRSTklmUDEFGXYZabnopfghHVWdijstuvwCIJKLMxyz0123456789+/
     base_url: http://cc.spdata.jianyu360.com
 

+ 3 - 1
yml/test.yaml → yaml/test.yaml

@@ -1,6 +1,8 @@
 proxy:
   socks5:
-    url: http://proxy.spdata.jianyu360.com/proxy/getallip
+    url:
+      - http://proxy.spdata.jianyu360.com/proxy/getallip
+      - http://proxy2.spdata.jianyu360.com/proxy/getallip
     decrypt: ABNOPqrceQRSTklmUDEFGXYZabnopfghHVWdijstuvwCIJKLMxyz0123456789+/
     base_url: http://127.0.0.1:1405