소스 검색

oss正文附件上传方式迁移

dzr 1 개월 전
부모
커밋
797e35aff7
4개의 변경된 파일162개의 추가작업 그리고 75개의 파일을 삭제
  1. 3 11
      zbytb/config/conf.yaml
  2. 2 3
      zbytb/config/load.py
  3. 0 23
      zbytb/utils/aliyun.py
  4. 157 38
      zbytb/utils/attachment.py

+ 3 - 11
zbytb/config/conf.yaml

@@ -5,16 +5,6 @@ mongo:
 #  host: 127.0.0.1
 #  port: !!int 27017
 
-
-# 阿里oss
-ali_oss:
-  key_id: LTAI4G5x9aoZx8dDamQ7vfZi
-  key_secret: Bk98FsbPYXcJe72n1bG3Ssf73acuNh
-#  endpoint: oss-cn-beijing.aliyuncs.com    # 公网使用
-  endpoint: oss-cn-beijing-internal.aliyuncs.com    # 内网使用
-  bucket_name: jy-datafile
-
-
 es:
   host: 172.17.4.184
 #  host: 127.0.0.1
@@ -23,10 +13,12 @@ es:
   port: !!int 19905
   db: biddingall # es库别名
 
-
 # 代理
 proxy:
   socks5:
     url: http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch
     auth:
       Authorization: Basic amlhbnl1MDAxOjEyM3F3ZSFB
+
+oss:
+  address: http://172.17.162.27:18011

+ 2 - 3
zbytb/config/load.py

@@ -4,7 +4,7 @@ import yaml
 
 __all__ = [
     'mongo_conf',
-    'oss_conf',
+    'oss_address',
     'jy_proxy',
     'es_conf',
     'headers',
@@ -19,10 +19,9 @@ _node_modules = (_base_path.parent / 'node_modules').resolve()
 with open(_yaml_conf, encoding="utf-8") as f:
     conf = yaml.safe_load(f)
     mongo_conf = conf['mongo']
-    oss_conf: dict = conf['ali_oss']
     es_conf: dict = conf['es']
     jy_proxy: dict = conf['proxy']
-
+    oss_address = conf['oss']["address"]
 
 with open(_yaml_constants, encoding="utf-8") as fp:
     constants = yaml.safe_load(fp)

+ 0 - 23
zbytb/utils/aliyun.py

@@ -1,23 +0,0 @@
-import oss2
-
-from config.load import oss_conf
-
-
-class AliYunService:
-
-    def __init__(self):
-        self.__acc_key_id = oss_conf['key_id']
-        self.__acc_key_secret = oss_conf['key_secret']
-        self.__endpoint = oss_conf['endpoint']
-        self.__bucket_name = oss_conf['bucket_name']
-
-    def _push_oss_from_local(self, key, filename):
-        """
-        上传一个本地文件到OSS的普通文件
-
-        :param str key: 上传到OSS的文件名
-        :param str filename: 本地文件名,需要有可读权限
-        """
-        auth = oss2.Auth(self.__acc_key_id, self.__acc_key_secret)
-        bucket = oss2.Bucket(auth, self.__endpoint, self.__bucket_name)
-        bucket.put_object_from_file(key, filename)

+ 157 - 38
zbytb/utils/attachment.py

@@ -1,23 +1,116 @@
 import io
+import os
 import traceback
 import uuid
+from io import BytesIO
 
+import requests
 import urllib3
 
-from config.load import headers
-from utils.aliyun import AliYunService
-from utils.clean_file import *
-from utils.execptions import AttachmentError
+from config.load import headers, oss_address
+from utils.clean_file import (
+    clean_file_name,
+    judge_file_url,
+    modify_file_url_list,
+    remove,
+    limit_file_size
+)
+from utils.clean_file import sha1, getsize
+from utils.log import logger
 from utils.socks5 import Proxy
 
 urllib3.disable_warnings()
 
 
-class AttachmentDownloader(AliYunService):
+class OssClient(object):
+    def __init__(self, domain: str):
+        # 初始化函数,用于创建类的实例
+        self.domain = domain
+
+    def upload(self, args: dict, retries=3, err_show=False) -> dict:
+        reply = {"error_code": -1}
+
+        raise_err = None
+        for _ in range(retries):
+            try:
+                files = {
+                    'file': (args['object_name'], BytesIO(args['stream'])),
+                }
+                data = {
+                    'bucket_id': args['bucket_id'],
+                    'object_name': args['object_name'],
+                    'gzip': args.get('gzip', False),
+                }
+
+                response = requests.post(
+                    f"{self.domain}/ossservice/upload",
+                    files=files,
+                    data=data,
+                    timeout=300,
+                )
+                if response.status_code == 200:
+                    reply.update(response.json())
+                else:
+                    reply['error_msg'] = f"HTTP error: {response.status_code}"
+
+                raise_err = None
+                break
+            except Exception as e:
+                reply['error_msg'] = str(e)
+                raise_err = e
+
+        if err_show and raise_err is not None:
+            raise raise_err
+
+        return reply
+
+    def download(self, args: dict):
+        reply = {}
+        try:
+            data = {
+                "bucket_id": args["bucket_id"],
+                "object_name": args["object_name"]
+            }
+
+            url = f"{self.domain}/ossservice/download"
+            response = requests.post(url, data=data, timeout=300)
+            response.raise_for_status()
+
+            reply["error_code"] = 0
+            reply["error_msg"] = "下载成功"
+            reply["data"] = response.content
+
+        except Exception as e:
+            reply["error_code"] = -1
+            reply["error_msg"] = str(e)
+
+        return reply
+
+    def delete(self, args: dict):
+        reply = {}
+        try:
+            data = {
+                "bucket_id": args["bucket_id"],
+                "object_name": args["object_name"]
+            }
+
+            url = f"{self.domain}/ossservice/delete"
+            response = requests.post(url, data=data, timeout=10)
+            response.raise_for_status()
+
+            reply = response.json()
+            reply["error_code"] = 0
+        except Exception as e:
+            reply["error_code"] = -1
+            reply["error_msg"] = str(e)
+        return reply
+
+
+class AttachmentDownloader:
 
     def __init__(self):
-        super(AttachmentDownloader, self).__init__()
         self.dir_name = 'file'
+        self._oss = OssClient(domain=oss_address)
 
     def _create_file(self, filename, filetype):
         os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
@@ -52,34 +145,55 @@ class AttachmentDownloader(AliYunService):
         return self._calc_size(_kb)
 
     def stream_download(self, file_path, response):
+        max_size = 1024 * 1024  # 1M
         stream = io.BytesIO()
-        if 'content-length' in response.headers:
-            content_size = int(response.headers['content-length'])  # 内容体总大小
+        h = {k.lower(): v for k, v in response.headers.items()}
+        content_length = h.get('content-length')
+        if content_length is not None:
+            content_size = int(content_length)
+            content_length = float(content_size) / max_size  # 内容体总大小,单位:M
+            if content_length > 50:
+                # 丢弃超过50Mb内容长度的文件
+                return stream.getvalue()
         else:
-            content_size = len(response.content)
-        size = self._calc_size(float(content_size) / 1024)
-        logger.info(f'>>> 下载附件:{file_path} - 大小:{size}')
+            content_size = None
 
         chunk_size = 1024  # 单次请求数据块最大值
         data_chunk_size = 0  # 下载数据块
         with open(file_path, 'wb') as f:
             for data in response.iter_content(chunk_size=chunk_size):
-                stream.write(data)
+                size = stream.write(data)
                 f.write(data)
-                data_chunk_size += len(data)
-                percent = (data_chunk_size / content_size) * 100
-                print("\r文件下载进度:%d%%(%d/%d) - %s" % (
-                    percent, data_chunk_size, content_size, file_path), end=" ")
+                data_chunk_size += size
+                if data_chunk_size / max_size > 50:
+                    stream.truncate(0)  # 清空流
+                    stream.seek(0)  # 将位置指针移回流的开始处
+                    logger.warning("超过50M的附件|丢弃下载")
+                    break
+
+                # 如果content_size未设置,进行估算
+                if content_size is None:
+                    if size == chunk_size:
+                        # 接收到完整块,假设总大小是当前的2倍
+                        content_size = data_chunk_size * 2
+                    else:
+                        # 接收到最后一个块(通常小于chunk_size)
+                        content_size = data_chunk_size
+
+                # 计算并显示进度
+                percent = min(100, int((data_chunk_size / content_size) * 100)) if content_size else 0
+                print("\r文件下载进度:%d%%(%d/%d) - %s" % (percent, data_chunk_size, content_size, file_path), end=" ")
+
         return stream.getvalue()
 
     def _fetch_file(
-            self,
-            method: str,
-            url: str,
-            file: str,
-            enable_proxy: bool,
-            allow_show_exception: bool,
-            **kwargs
+        self,
+        method: str,
+        url: str,
+        file: str,
+        enable_proxy: bool,
+        allow_show_exception: bool,
+        **kwargs
     ):
         request_params = {}
         request_params.setdefault('headers', kwargs.get('headers') or headers)
@@ -92,10 +206,8 @@ class AttachmentDownloader(AliYunService):
         while retries < 3:
             try:
                 with requests.request(method, url, **request_params) as req:
-                    if req.status_code == 200:
-                        return self.stream_download(file, req)
-                    else:
-                        retries += 1
+                    req.raise_for_status()
+                    return self.stream_download(file, req)
             except requests.RequestException:
                 if allow_show_exception:
                     traceback.print_exc()
@@ -106,16 +218,14 @@ class AttachmentDownloader(AliYunService):
         return b''
 
     def download(
-            self,
-            file_name: str,
-            file_type: str,
-            download_url: str,
-            enable_proxy=False,
-            allow_request_exception=False,
-            **kwargs
+        self,
+        file_name: str,
+        file_type: str,
+        download_url: str,
+        enable_proxy=False,
+        allow_request_exception=False,
+        **kwargs
     ):
-        if not file_name or not file_type or not download_url:
-            raise AttachmentError
         request_method = kwargs.pop('method', 'get')
         file_type = file_type.strip()
         file_name = clean_file_name(file_name, file_type)
@@ -145,12 +255,21 @@ class AttachmentDownloader(AliYunService):
                 result.setdefault('ftype', file_type)
                 result.setdefault('size', self._file_size(local_tmp_file))
                 result.setdefault('url', 'oss')
-                super()._push_oss_from_local(key, local_tmp_file)
+
+                args = {
+                    "bucket_id": "file",
+                    "object_name": key,
+                    "gzip": False,
+                    "stream": file_stream
+                }
+                self._oss.upload(args, err_show=True)
             except Exception as e:
                 logger.warning(
-                    "[{}]下载异常,原因:{}".format(file_name, e.__class__.__name__)
+                    "[{}]下载异常,原因:{}".format(file_name, type(e).__name__)
                 )
+
         remove(local_tmp_file)
+
         '''上传/下载,无论失败/成功必须返回附件信息'''
         if "size" not in result:
             return result