Browse Source

修复若干问题

dongzhaorui 3 weeks ago
parent
commit
dc84dd82d0

+ 15 - 8
FworkSpider/feapder/network/proxy_pool/proxy_pool.py

@@ -26,19 +26,23 @@ class ProxyPool(BaseProxyPool):
     def __init__(self, proxy_api=None, **kwargs):
         self.proxy_api = proxy_api or setting.PROXY_EXTRACT_API
         self.proxy_queue = Queue()
+        self.tunnel = setting.PROXY_TUNNEL or False
 
     def format_proxy(self, proxy):
         return {"http": "http://" + proxy, "https": "http://" + proxy}
 
     @tools.retry(3, interval=5)
     def pull_proxies(self):
-        resp = requests.get(self.proxy_api, timeout=10)
-        proxies = resp.text.strip()
-        resp.close()
-        if "{" in proxies or not proxies:
-            raise Exception("获取代理失败", proxies)
-        # 使用 /r/n 分隔
-        return proxies.split("\r\n")
+        if not self.tunnel:
+            resp = requests.get(self.proxy_api, timeout=10)
+            proxies = resp.text.strip()
+            resp.close()
+            if "{" in proxies or not proxies:
+                raise Exception("获取代理失败", proxies)
+            # 使用 /r/n 分隔
+            return proxies.split("\r\n")
+        else:
+            return [{"http": self.proxy_api, "https": self.proxy_api}]
 
     def get_proxy(self):
         try:
@@ -53,7 +57,10 @@ class ProxyPool(BaseProxyPool):
 
             metrics.emit_counter("used_times", 1, classify="proxy")
 
-            return self.format_proxy(proxy)
+            if not self.tunnel:
+                return self.format_proxy(proxy)
+            else:
+                return proxy
         except Exception as e:
             tools.send_msg("获取代理失败", level="error")
             raise Exception("获取代理失败", e)

+ 5 - 0
FworkSpider/feapder/setting.py

@@ -166,6 +166,7 @@ DELETE_KEYS = []
 # 设置代理
 PROXY_EXTRACT_API = None  # 代理提取API ,返回的代理分割符为\r\n 或者 json数据{"http":"http://xxx", "https":"https://xxx"}
 PROXY_ENABLE = True
+PROXY_TUNNEL = False  # 代理是否为隧道代理
 PROXY_MAX_FAILED_TIMES = 5  # 代理最大失败次数,超过则不使用,自动删除
 PROXY_AUTH = os.getenv("PROXY_AUTH")
 PROXY_POOL = "feapder.network.proxy_pool.ProxyPool"  # 代理池
@@ -263,6 +264,10 @@ CAPTCHA_URL = os.getenv("CAPTCHA_URL", "http://pycaptcha.spdata.jianyu360.com")
 # 剑鱼任务中心地址
 JY_TASK_URL = os.getenv("JY_TASK_URL", "http://pytask.spdata.jianyu360.com")
 
+# 剑鱼附件管理服务地址
+JY_OSS_URL = os.getenv("JY_OSS_URL", "http://172.17.162.27:18011")
+JY_OSS_URL_TEST = os.getenv("JY_OSS_URL_TEST", "http://172.31.31.203:1111")
+
 ############# 导入用户自定义的setting #############
 try:
     from setting import *

+ 0 - 0
FworkSpider/untils/clean_html.py → FworkSpider/feapder/utils/clean_html.py


+ 231 - 0
FworkSpider/feapder/utils/oss.py

@@ -0,0 +1,231 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2024-02-26
+---------
+@summary: oss附件服务
+---------
+"""
+from io import BytesIO
+
+import oss2
+import requests
+
+import feapder.setting as setting
+
+
+class AttachmentError(Exception):
+
+    def __init__(self, *args, **kwargs):
+        if 'code' not in kwargs and 'reason' not in kwargs:
+            kwargs['code'] = 0
+            kwargs['reason'] = '附件错误'
+
+        if 'reason' in kwargs and kwargs['reason'] is None:
+            kwargs['reason'] = '附件错误'
+
+        for key, val in kwargs.items():
+            setattr(self, key, val)
+
+        super(AttachmentError, self).__init__(*args, kwargs)
+
+
+class OssClient(object):
+    def __init__(self, domain: str):
+        # 初始化函数,用于创建类的实例
+        self.domain = domain
+
+    def upload(self, args: dict) -> dict:
+        reply = {"error_code": -1}
+        try:
+            files = {
+                'file': (args['object_name'], BytesIO(args['stream'])),
+            }
+            data = {
+                'bucket_id': args['bucket_id'],
+                'object_name': args['object_name'],
+                'gzip': args.get('gzip', False),
+            }
+
+            response = requests.post(
+                f"{self.domain}/ossservice/upload",
+                files=files,
+                data=data,
+                timeout=300,
+            )
+            if response.status_code == 200:
+                reply.update(response.json())
+            else:
+                reply['error_msg'] = f"HTTP error: {response.status_code}"
+
+        except Exception as e:
+            reply['error_msg'] = str(e)
+
+        return reply
+
+    def download(self, args: dict):
+        reply = {}
+        try:
+            data = {
+                "bucket_id": args["bucket_id"],
+                "object_name": args["object_name"]
+            }
+
+            url = f"{self.domain}/ossservice/download"
+            response = requests.post(url, data=data, timeout=300)
+            response.raise_for_status()
+
+            reply["error_code"] = 0
+            reply["error_msg"] = "下载成功"
+            reply["data"] = response.content
+
+        except Exception as e:
+            reply["error_code"] = -1
+            reply["error_msg"] = str(e)
+
+        return reply
+
+    def delete(self, args: dict):
+        reply = {}
+        try:
+            data = {
+                "bucket_id": args["bucket_id"],
+                "object_name": args["object_name"]
+            }
+
+            url = f"{self.domain}/ossservice/delete"
+            response = requests.post(url, data=data, timeout=10)
+            response.raise_for_status()
+
+            reply = response.json()
+            reply["error_code"] = 0
+        except Exception as e:
+            reply["error_code"] = -1
+            reply["error_msg"] = str(e)
+        return reply
+
+
+class JyOssClient:
+
+    def __init__(self, domain=None, mode=None):
+        if domain is None:
+            domain = setting.JY_OSS_URL
+
+        if mode == "test":
+            domain = setting.JY_OSS_URL_TEST
+
+        self._oss_client = OssClient(domain=domain)
+
+    def upload(self, bucket_id, object_name, stream, gzip=False, retries=3, err_show=True):
+        """
+        上传附件
+
+        :param str bucket_id: 文件名
+        :param str object_name: 对象名称
+        :param bytes stream: 文件流
+        :param bool gzip: 是否压缩
+        :param int retries: 上传最大重试次数
+        :param bool err_show: 是否展示错误
+
+        """
+        args = {
+            "bucket_id": bucket_id,
+            "object_name": object_name,
+            "gzip": gzip,
+            "stream": stream
+        }
+
+        ret = {"error_msg": "附件上传错误", "error_code": -1}
+        for _ in range(retries):
+            ret = self._oss_client.upload(args)
+            if ret["error_code"] == 0:
+                return ret
+
+        if err_show:
+            raise AttachmentError(reason=ret.get("error_msg") or "附件上传错误")
+
+        return ret
+
+    def download(self, bucket_id, object_name, retries=3, err_show=False):
+        """
+        下载附件
+
+        :param str bucket_id: 文件名
+        :param str object_name: 对象名称
+        :param int retries: 下载最大重试次数
+        :param bool err_show: 是否展示错误
+
+        """
+        args = {
+            "bucket_id": bucket_id,
+            "object_name": object_name,
+        }
+
+        ret = {"error_msg": "附件下载失败", "error_code": -1}
+        for _ in range(retries):
+            ret = self._oss_client.download(args)
+            if ret["error_code"] == 0 or ret["error_code"] == -1:
+                return ret
+
+        if err_show:
+            raise AttachmentError(reason=ret.get("error_msg") or "附件下载失败")
+
+        return ret
+
+    def delete(self, bucket_id, object_name, retries=3, err_show=False):
+        """
+        删除附件
+
+        :param str bucket_id: 文件名
+        :param str object_name: 对象名称
+        :param int retries: 删除最大重试次数
+        :param bool err_show: 是否展示错误
+
+        """
+        args = {
+            "bucket_id": bucket_id,
+            "object_name": object_name,
+        }
+
+        ret = {"error_msg": "附件删除失败", "error_code": -1}
+        for _ in range(retries):
+            ret = self._oss_client.delete(args)
+            if ret["error_code"] == 0:
+                return ret
+
+        if err_show:
+            raise AttachmentError(reason=ret.get("error_msg") or "附件删除失败")
+
+        return ret
+
+
+class OssBucketClient:
+
+    def __init__(self):
+        key_id = setting.ALI_BUCKET_CONFIG['key_id']
+        key_secret = setting.ALI_BUCKET_CONFIG['key_secret']
+        endpoint = setting.ALI_BUCKET_CONFIG['endpoint']
+        bucket_name = setting.ALI_BUCKET_CONFIG['bucket_name']
+        auth = oss2.Auth(key_id, key_secret)
+        self._bucket = oss2.Bucket(auth, endpoint, bucket_name)
+
+    def put_object_from_file(self, key, filename):
+        """
+        上传一个本地文件到OSS的普通文件
+
+        :param str key: 上传到OSS的文件名
+        :param str filename: 本地文件名,需要有可读权限
+        """
+        self._bucket.put_object_from_file(key, filename)
+
+    def put_object(self, key, data):
+        """
+        流式上传oss
+
+        :param str key: 上传到OSS的文件名
+        :param data: 待上传的内容。
+        :type data: bytes,str或file-like object
+        """
+        self._bucket.put_object(key, data)
+
+
+AliYunService = OssBucketClient

+ 4 - 1
FworkSpider/setting.py

@@ -129,9 +129,12 @@ LOG_MAX_BYTES = 10 * 1024 * 1024  # 每个日志文件的最大字节数
 LOG_BACKUP_COUNT = 20  # 日志文件保留数量
 LOG_ENCODING = "utf8"  # 日志文件编码
 
-# 详情采集任务领取接口
+# 剑鱼采集任务服务地址[py]
 JY_TASK_URL = "http://pytask.spdata.jianyu360.com"
 
+# 剑鱼附件管理服务地址
+JY_OSS_URL = "http://172.17.162.27:18011"
+
 # bucket配置
 ALI_BUCKET_CONFIG = {
     "key_id": "LTAI4G5x9aoZx8dDamQ7vfZi",