Browse Source

标讯数据oss上传功能修改;新增db判重

mxs 1 month ago
parent
commit
44dedfc769
4 changed files with 360 additions and 375 deletions
  1. 1 0
      src/config.json
  2. 83 0
      src/db.go
  3. 276 105
      src/main.go
  4. 0 270
      src/ossutil.go

+ 1 - 0
src/config.json

@@ -13,6 +13,7 @@
     "startid": "6163e98c1a75b8f446ade501",
 	"api": "http://172.17.145.179:19281/_send/_mail",
 	"to": "maxiaoshan@topnet.net.cn,zhangjinkun@topnet.net.cn",
+    "dbrepeat": "http://172.20.217.43:8291",
 	"osssite": {
 		"中国招标投标公共服务平台": 0.99
 	},

+ 83 - 0
src/db.go

@@ -0,0 +1,83 @@
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	qu "qfw/util"
+)
+
+var DbRepeatAddr string
+
+// AddStr 新增
+func AddStr(bucket, key, val string) {
+	client := &http.Client{}
+	rq := map[string]interface{}{
+		"bucket": bucket,
+		"key":    key,
+		"val":    val,
+	}
+	rb, _ := json.Marshal(rq)
+	req, err := http.NewRequest("POST", fmt.Sprintf("%s%s", DbRepeatAddr, "/addstr"), bytes.NewBuffer(rb))
+	if err != nil {
+		qu.Debug(err)
+	}
+	resp, err := client.Do(req)
+	if err != nil {
+		qu.Debug("sha db save err:", err, key)
+		return
+	}
+	defer resp.Body.Close()
+}
+
+// CheckStr 判断
+func CheckStr(bucket, key string) (repeat bool) {
+	client := &http.Client{}
+	rq := map[string]interface{}{
+		"bucket": bucket,
+		"key":    key,
+	}
+	rb, _ := json.Marshal(rq)
+	req, err := http.NewRequest("POST", fmt.Sprintf("%s%s", DbRepeatAddr, "/checkstr"), bytes.NewBuffer(rb))
+	if err != nil {
+		return
+	}
+	req.Header.Set("Content-Type", "application/json")
+	resp, err := client.Do(req)
+	if err != nil {
+		return
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode == http.StatusOK {
+		// 读取响应
+		body, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return
+		}
+		json.Unmarshal(body, &repeat)
+	}
+	return
+}
+
+// RemoveStr 删除
+func RemoveStr(bucket, key, val string) {
+	client := &http.Client{}
+	rq := map[string]interface{}{
+		"bucket": bucket,
+		"key":    key,
+		"val":    val,
+	}
+	rb, _ := json.Marshal(rq)
+	req, err := http.NewRequest("POST", fmt.Sprintf("%s%s", DbRepeatAddr, "/removestr"), bytes.NewBuffer(rb))
+	if err != nil {
+		qu.Debug(err)
+	}
+	resp, err := client.Do(req)
+	if err != nil {
+		qu.Debug("sha db delete err:", err, key)
+		return
+	}
+	defer resp.Body.Close()
+}

+ 276 - 105
src/main.go

@@ -15,17 +15,19 @@ import (
 )
 
 var (
-	Webport       string
-	Config        map[string]interface{}
-	Mgo           *mongodb.MongodbSim
-	Coll          string //ocr_flie_over
-	MoveCollFile  string //bidding_file
-	MoveCollNomal string //bidding_nomal
-	Api           string
-	To            string
-	TaskTime      int
-	SaveMgoCache  = make(chan map[string]interface{}, 1000) //
-	SP            = make(chan bool, 5)
+	Webport           string
+	Config            map[string]interface{}
+	Mgo               *mongodb.MongodbSim
+	Coll              string //ocr_flie_over
+	MoveCollFile      string //bidding_file
+	MoveCollNomal     string //bidding_nomal
+	Api               string
+	To                string
+	TaskTime          int
+	SaveMgoCache      = make(chan map[string]interface{}, 1000) //
+	SaveRepeatDbCache = make(chan map[string]interface{}, 1000)
+	SP                = make(chan bool, 5)
+	SP_Repeat         = make(chan bool, 5)
 	//StartId  string
 	Stop      bool
 	NewIdLock = &sync.Mutex{}
@@ -51,6 +53,8 @@ func init() {
 		Password:    qu.ObjToString(Config["password"]),
 	}
 	Mgo.InitPool()
+	//DbRepeatAddr
+	DbRepeatAddr = qu.ObjToString(Config["dbrepeat"])
 }
 func main() {
 	qu.Debug("start...")
@@ -59,12 +63,14 @@ func main() {
 	c.AddFunc("0 0 0 ? * *", DeleteData)
 	go monitor()
 	go SaveData()                //数据保存
+	go SaveRepeatData()          //重复数据保存
 	go FileDataMoveToBidding()   //附件数据迁移至bidding
 	go NormalDataMoveToBidding() //正常数据迁移至bidding
 	//xweb.Run(":" + Webport)
 	ch := make(chan bool, 1)
 	<-ch
 }
+
 func monitor() {
 	//最好是单实例调用
 	http.HandleFunc("/movebidding/stop", func(w http.ResponseWriter, r *http.Request) {
@@ -135,40 +141,68 @@ func NormalDataMove(query map[string]interface{}) {
 			_id := tmp["_id"]
 			update := []map[string]interface{}{}
 			update = append(update, map[string]interface{}{"_id": _id})
-			NewIdLock.Lock()
-			newId := primitive.NewObjectID()
-			NewIdLock.Unlock()
-			tmp["_id"] = newId
-			tmp["pre_id"] = mongodb.BsonIdToSId(_id)
-			strNewId := newId.Hex()
-			if competehref := qu.ObjToString(tmp["competehref"]); competehref != "" && competehref != "#" { //根据id重新生成href
-				tmp["href"] = `https://www.jianyu360.cn/article/content/` + qu.CommonEncodeArticle("content", strNewId) + `.html`
-			}
-			detail := qu.ObjToString(tmp["detail"])
-			//补充html标签,更新记录
-			result, ok, _ := repairHtml(detail)
-			//更新
-			set := map[string]interface{}{"moveok": true}
-			set["biddingid"] = strNewId
-			if ok && result != "" {
-				set["detail_update"] = result
-				tmp["detail"] = result
+			site := qu.ObjToString(tmp["site"])
+			s_sha := qu.ObjToString(tmp["s_sha"])
+			var repeat bool
+			if qu.IntAll(tmp["extracttype"]) == -1 { //站内判重
+				repeat = CheckStr("sha", site+s_sha)
 			}
-			//oss存储
-			objectName := fmt.Sprintf("%s%s", strNewId, ".txt")
-			detailStream := []byte(qu.ObjToString(tmp["detail"]))
-			rep := ossutil.UpLoadByRestful(Oss_Address, Detail_BucketId, objectName, detailStream, true)
-			if rep.Error_msg != UploadSuccess {
-				qu.Debug("正文上传失败:", _id, strNewId)
-			}
-			htmlStream := []byte(qu.ObjToString(tmp["contenthtml"]))
-			rep = ossutil.UpLoadByRestful(Oss_Address, Html_BucketId, objectName, htmlStream, true)
-			if rep.Error_msg != UploadSuccess {
-				qu.Debug("源码上传失败:", _id, strNewId)
+			if repeat {
+				update = append(update, map[string]interface{}{
+					"$set": map[string]interface{}{
+						"moveok": false,
+					},
+				})
+				SaveRepeatDbCache <- tmp
+			} else {
+				NewIdLock.Lock()
+				newId := primitive.NewObjectID()
+				NewIdLock.Unlock()
+				tmp["_id"] = newId
+				tmp["pre_id"] = mongodb.BsonIdToSId(_id)
+				strNewId := newId.Hex()
+				AddStr("sha", site+s_sha, strNewId)
+				//更新
+				set := map[string]interface{}{
+					"moveok":    true,
+					"biddingid": strNewId,
+				}
+				if competehref := qu.ObjToString(tmp["competehref"]); competehref != "" && competehref != "#" { //根据id重新生成href
+					tmp["href"] = `https://www.jianyu360.cn/article/content/` + qu.CommonEncodeArticle("content", strNewId) + `.html`
+				}
+				detail := qu.ObjToString(tmp["detail"])
+				//补充html标签,更新记录
+				result, ok, _ := repairHtml(detail)
+				if ok && result != "" {
+					set["detail_update"] = result
+					tmp["detail"] = result
+				}
+				//oss存储
+				objectName := fmt.Sprintf("%s%s", strNewId, ".txt")
+				detailStream := []byte(qu.ObjToString(tmp["detail"]))
+				detailArgs := &ossutil.UploadArgs{
+					BucketID:   Detail_BucketId,
+					ObjectName: objectName,
+					Stream:     detailStream,
+				}
+				rep := ossutil.UpLoadByRestful(Oss_Address, detailArgs)
+				if rep.Error_msg != ossutil.UploadSuccess {
+					qu.Debug("正文上传失败:", _id, strNewId)
+				}
+				htmlStream := []byte(qu.ObjToString(tmp["contenthtml"]))
+				contenthtmlArgs := &ossutil.UploadArgs{
+					BucketID:   Html_BucketId,
+					ObjectName: objectName,
+					Stream:     htmlStream,
+				}
+				rep = ossutil.UpLoadByRestful(Oss_Address, contenthtmlArgs)
+				if rep.Error_msg != ossutil.UploadSuccess {
+					qu.Debug("源码上传失败:", _id, strNewId)
+				}
+				update = append(update, map[string]interface{}{
+					"$set": set,
+				})
 			}
-			update = append(update, map[string]interface{}{
-				"$set": set,
-			})
 			//save
 			SaveMgoCache <- tmp
 			lock.Lock()
@@ -255,74 +289,102 @@ func FileDataMove(query map[string]interface{}) {
 			_id := tmp["_id"]
 			update := []map[string]interface{}{}
 			update = append(update, map[string]interface{}{"_id": _id})
-			NewIdLock.Lock()
-			newId := primitive.NewObjectID()
-			NewIdLock.Unlock()
-			tmp["_id"] = newId //新id
-			tmp["pre_id"] = mongodb.BsonIdToSId(_id)
-			strNewId := newId.Hex()
-			if competehref := qu.ObjToString(tmp["competehref"]); competehref != "" && competehref != "#" { //根据id重新生成href
-				tmp["href"] = `https://www.jianyu360.cn/article/content/` + qu.CommonEncodeArticle("content", strNewId) + `.html`
+			site := qu.ObjToString(tmp["site"])
+			s_sha := qu.ObjToString(tmp["s_sha"])
+			var repeat bool
+			if qu.IntAll(tmp["extracttype"]) == -1 { //站内判重
+				repeat = CheckStr("sha", site+s_sha)
 			}
-			//更新
-			set := map[string]interface{}{"moveok": true}
-			set["biddingid"] = strNewId
-			site := qu.ObjToString(tmp["site"]) //解析附件站点
-			var replace bool
-			var filetext string
-			if limitRatio := OssSite[site]; limitRatio > 0 { //配置站点解析附件,根据准确率情况替换正文
-				replace, filetext = AnalysisFile(true, limitRatio, tmp)
-				if replace { //替换正文
-					tmp["detail"] = filetext
-					set["filetext"] = true
+			if repeat { //重复数据不进行迁移
+				update = append(update, map[string]interface{}{
+					"$set": map[string]interface{}{
+						"moveok": false,
+					},
+				})
+				SaveRepeatDbCache <- tmp
+			} else {
+				NewIdLock.Lock()
+				newId := primitive.NewObjectID()
+				NewIdLock.Unlock()
+				tmp["_id"] = newId //新id
+				tmp["pre_id"] = mongodb.BsonIdToSId(_id)
+				strNewId := newId.Hex()
+				AddStr("sha", site+s_sha, strNewId)                                                             //db存储
+				if competehref := qu.ObjToString(tmp["competehref"]); competehref != "" && competehref != "#" { //根据id重新生成href
+					tmp["href"] = `https://www.jianyu360.cn/article/content/` + qu.CommonEncodeArticle("content", strNewId) + `.html`
 				}
-			} else { //其它网站附件信息,detail无效,只有一个附件且不是ocr识别的,替换正文
-				//判断detail是否有效
-				detail := qu.ObjToString(tmp["detail"])
-				detail = sp.FilterDetail(detail) //只保留文本内容
-				if len([]rune(detail)) <= 5 || (len([]rune(detail)) <= 50 && SpecialTextReg.MatchString(detail)) {
-					replace, filetext = AnalysisFile(false, 0, tmp)
+				//更新
+				set := map[string]interface{}{
+					"moveok":    true,
+					"biddingid": strNewId,
+				}
+				var replace bool
+				var filetext string
+				if limitRatio := OssSite[site]; limitRatio > 0 { //配置站点解析附件,根据准确率情况替换正文
+					replace, filetext = AnalysisFile(true, limitRatio, tmp)
 					if replace { //替换正文
 						tmp["detail"] = filetext
 						set["filetext"] = true
 					}
+				} else { //其它网站附件信息,detail无效,只有一个附件且不是ocr识别的,替换正文
+					//判断detail是否有效
+					detail := qu.ObjToString(tmp["detail"])
+					detail = sp.FilterDetail(detail) //只保留文本内容
+					if len([]rune(detail)) <= 5 || (len([]rune(detail)) <= 50 && SpecialTextReg.MatchString(detail)) {
+						replace, filetext = AnalysisFile(false, 0, tmp)
+						if replace { //替换正文
+							tmp["detail"] = filetext
+							set["filetext"] = true
+						}
+					}
 				}
-			}
-			//未用附件内容替换detail的数据,补充html标签,更新记录
-			if !replace {
-				detail := qu.ObjToString(tmp["detail"])
-				result, ok, _ := repairHtml(detail)
-				if ok && result != "" {
-					set["detail_update"] = result
-					tmp["detail"] = result
+				//未用附件内容替换detail的数据,补充html标签,更新记录
+				if !replace {
+					detail := qu.ObjToString(tmp["detail"])
+					result, ok, _ := repairHtml(detail)
+					if ok && result != "" {
+						set["detail_update"] = result
+						tmp["detail"] = result
+					}
 				}
+				//oss存储
+				objectName := fmt.Sprintf("%s%s", strNewId, ".txt")
+				detailStream := []byte(qu.ObjToString(tmp["detail"]))
+				detailArgs := &ossutil.UploadArgs{
+					BucketID:   Detail_BucketId,
+					ObjectName: objectName,
+					Stream:     detailStream,
+				}
+				rep := ossutil.UpLoadByRestful(Oss_Address, detailArgs)
+				if rep.Error_msg != ossutil.UploadSuccess {
+					qu.Debug("正文上传失败:", _id, strNewId)
+				}
+
+				htmlStream := []byte(qu.ObjToString(tmp["contenthtml"]))
+				contenthtmlArgs := &ossutil.UploadArgs{
+					BucketID:   Html_BucketId,
+					ObjectName: objectName,
+					Stream:     htmlStream,
+				}
+				rep = ossutil.UpLoadByRestful(Oss_Address, contenthtmlArgs)
+				if rep.Error_msg != ossutil.UploadSuccess {
+					qu.Debug("源码上传失败:", _id, strNewId)
+				}
+				//IsReplaceDetailSite := OssSite[site]
+				//if IsReplaceDetailSite {
+				//	replace, fileOk, filetext := AnalysisFile_back(IsReplaceDetailSite, tmp) //解析附件是否替换到detail
+				//	if replace {
+				//		tmp["detail"] = filetext //替换正文
+				//	}
+				//	if !fileOk { //附件异常
+				//		set["filerr"] = true
+				//		//set["filetext"] = filetext//文本过大,导致更新mongo失败
+				//	}
+				//}
+				update = append(update, map[string]interface{}{
+					"$set": set,
+				})
 			}
-			//oss存储
-			objectName := fmt.Sprintf("%s%s", strNewId, ".txt")
-			detailStream := []byte(qu.ObjToString(tmp["detail"]))
-			rep := ossutil.UpLoadByRestful(Oss_Address, Detail_BucketId, objectName, detailStream, true)
-			if rep.Error_msg != UploadSuccess {
-				qu.Debug("正文上传失败:", _id, strNewId)
-			}
-			htmlStream := []byte(qu.ObjToString(tmp["contenthtml"]))
-			rep = ossutil.UpLoadByRestful(Oss_Address, Html_BucketId, objectName, htmlStream, true)
-			if rep.Error_msg != UploadSuccess {
-				qu.Debug("源码上传失败:", _id, strNewId)
-			}
-			//IsReplaceDetailSite := OssSite[site]
-			//if IsReplaceDetailSite {
-			//	replace, fileOk, filetext := AnalysisFile_back(IsReplaceDetailSite, tmp) //解析附件是否替换到detail
-			//	if replace {
-			//		tmp["detail"] = filetext //替换正文
-			//	}
-			//	if !fileOk { //附件异常
-			//		set["filerr"] = true
-			//		//set["filetext"] = filetext//文本过大,导致更新mongo失败
-			//	}
-			//}
-			update = append(update, map[string]interface{}{
-				"$set": set,
-			})
 			//save
 			SaveMgoCache <- tmp
 			lock.Lock()
@@ -369,7 +431,7 @@ func SaveData() {
 				savearr = make([]map[string]interface{}, 20)
 				indexh = 0
 			}
-		case <-time.After(5 * time.Second): //超过10秒开始保存
+		case <-time.After(5 * time.Second): //超过5秒开始保存
 			qu.Debug("定时保存数据...", indexh)
 			if indexh > 0 {
 				SP <- true
@@ -386,17 +448,126 @@ func SaveData() {
 	}
 }
 
+func SaveRepeatData() {
+	fmt.Println("Save Repeat Data...")
+	repeatArr := make([]map[string]interface{}, 20)
+	repeatIndexh := 0
+	for {
+		select {
+		case v := <-SaveRepeatDbCache:
+			repeatArr[repeatIndexh] = v
+			repeatIndexh++
+			if repeatIndexh == 20 { //超过20条开始保存
+				SP_Repeat <- true
+				go func(repeatArr []map[string]interface{}) {
+					defer func() {
+						<-SP_Repeat
+					}()
+					Mgo.SaveBulk("bidding_db_repeat", repeatArr...)
+				}(repeatArr)
+				repeatArr = make([]map[string]interface{}, 20)
+				repeatIndexh = 0
+			}
+		case <-time.After(30 * time.Second): //超过30秒开始保存
+			qu.Debug("定时保存数据...", repeatIndexh)
+			if repeatIndexh > 0 {
+				SP_Repeat <- true
+				go func(repeatArr []map[string]interface{}) {
+					defer func() {
+						<-SP_Repeat
+					}()
+					Mgo.SaveBulk("bidding_db_repeat", repeatArr...)
+				}(repeatArr[:repeatIndexh])
+				repeatArr = make([]map[string]interface{}, 20)
+				repeatIndexh = 0
+			}
+		}
+	}
+}
 func DeleteData() {
+	endTime := time.Now().AddDate(0, -1, 0).Unix()
 	query := map[string]interface{}{
 		"comeintime": map[string]interface{}{
-			"$lt": time.Now().AddDate(0, -1, 0).Unix(),
+			"$lt": endTime,
+		},
+		"moveok": map[string]interface{}{
+			"$exists": true,
 		},
-		"moveok": true,
 	}
+	//删除db
+	DeleteBiddingNomalDB(query)
+	DeleteBiddingFileDB(query)
+	//删除表数据
 	n := Mgo.Delete(MoveCollFile, query)
 	n1 := Mgo.Delete(MoveCollNomal, query)
 	qu.Debug("bidding_file删除数据量:", n, "	bidding_nomal删除数据量:", n1)
 }
+func DeleteBiddingNomalDB(query map[string]interface{}) {
+	sess := Mgo.GetMgoConn()
+	defer Mgo.DestoryMongoConn(sess)
+	fields := map[string]interface{}{
+		"site":      1,
+		"s_sha":     1,
+		"biddingid": 1,
+	}
+	ch := make(chan bool, 10)
+	wg := sync.WaitGroup{}
+	it := sess.DB("qfw").C(MoveCollNomal).Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			site := qu.ObjToString(tmp["site"])
+			s_sha := qu.ObjToString(tmp["s_sha"])
+			biddingid := qu.ObjToString(tmp["biddingid"])
+			RemoveStr("sha", site+s_sha, biddingid)
+		}(tmp)
+		if n%100 == 0 {
+			qu.Debug("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	qu.Debug("bidding_nomal DB 已删除")
+}
+func DeleteBiddingFileDB(query map[string]interface{}) {
+	sess := Mgo.GetMgoConn()
+	defer Mgo.DestoryMongoConn(sess)
+	fields := map[string]interface{}{
+		"site":      1,
+		"s_sha":     1,
+		"biddingid": 1,
+	}
+	ch := make(chan bool, 10)
+	wg := sync.WaitGroup{}
+	it := sess.DB("qfw").C(MoveCollFile).Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			site := qu.ObjToString(tmp["site"])
+			s_sha := qu.ObjToString(tmp["s_sha"])
+			biddingid := qu.ObjToString(tmp["biddingid"])
+			RemoveStr("sha", site+s_sha, biddingid)
+		}(tmp)
+		if n%100 == 0 {
+			qu.Debug("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	qu.Debug("bidding_file DB 已删除")
+}
 
 /*func DataMoveToBidding() {
 	defer qu.Catch()

+ 0 - 270
src/ossutil.go

@@ -1,270 +0,0 @@
-package main
-
-import (
-	"bytes"
-	"encoding/json"
-	"errors"
-	"io"
-	"log"
-	"mime/multipart"
-	"net/http"
-	"net/rpc"
-	"strconv"
-)
-
-/*
-	http请求地址:172.17.162.27:18011
-	rpc请求地址:172.17.162.27:18012
-*/
-
-// RPC相关结构体
-type UploadArgs struct {
-	Stream     []byte // 客户端将文件数据传递过来
-	Gzip       bool   //是否压缩
-	BucketID   string //桶id
-	ObjectName string //对象名称
-}
-
-type Args struct {
-	BucketID   string //桶id
-	ObjectName string //对象名称
-}
-
-// 接口统一返回值
-type Result struct {
-	Error_code int         `json:"error_code"`
-	Error_msg  string      `json:"error_msg"`
-	Data       interface{} `json:"data"`
-}
-
-const (
-	UploadUrl           = "/ossservice/upload"
-	DownloadUrl         = "/ossservice/download"
-	DeleteUrl           = "/ossservice/delete"
-	GetBidDetailUrl     = "/ossservice/biddetail"
-	UploadSuccess       = "上传成功"
-	DownloadSuccess     = "下载成功"
-	DeleteSuccess       = "删除成功"
-	GetBidDetailSuccess = "获取正文成功"
-	UploadFail          = "上传失败:%v"
-	DownloadFail        = "下载失败:%v"
-	DeleteFail          = "删除失败:%v"
-	BidDetailFail       = "获取正文失败:%v"
-)
-
-/* restful方式上传
- * @param domain 域名,例如:https://ossservice.jianyu360.cn
- * @param bucketId 桶id
- * @param objectName 对象名称
- * @param stream 文件流
- * @param gzip 是否压缩
- * @return {"error_code":0,"error_msg":"上传成功"}
- */
-func UpLoadByRestful(domain, bucketId, objectName string, stream []byte, gzip bool) (reply *Result) {
-	reply = &Result{Error_code: -1}
-
-	// 创建一个缓冲区来存储表单数据
-	body := &bytes.Buffer{}
-	writer := multipart.NewWriter(body)
-	writer.WriteField("bucket_id", bucketId)
-	writer.WriteField("object_name", objectName)
-	writer.WriteField("gzip", strconv.FormatBool(gzip))
-
-	// 创建表单字段
-	part, err := writer.CreateFormFile("file", objectName)
-	if err != nil {
-		reply.Error_msg = err.Error()
-		return
-	}
-
-	// 模拟文件流
-	fileStream := bytes.NewReader(stream)
-
-	// 将文件流复制到表单字段
-	_, err = io.Copy(part, fileStream)
-	if err != nil {
-		reply.Error_msg = err.Error()
-		return
-	}
-
-	// 创建 HTTP 请求
-	if respBody, err := post(domain+UploadUrl, writer, body); err != nil {
-		reply.Error_msg = err.Error()
-	} else {
-		json.Unmarshal(respBody, &reply)
-	}
-	return
-}
-
-/* restful方式下载
- * @param domain 域名,例如:https://ossservice.jianyu360.cn
- * @param bucketId 桶id
- * @param objectName 对象名称
- * @return {"error_code":0,"error_msg":"下载成功"}
- */
-func DownloadByRestful(domain, bucketId, objectName string) (reply *Result) {
-	reply = &Result{}
-	// 创建一个缓冲区来存储表单数据
-	body := &bytes.Buffer{}
-	writer := multipart.NewWriter(body)
-	writer.WriteField("bucket_id", bucketId)
-	writer.WriteField("object_name", objectName)
-	if respBody, err := post(domain+DownloadUrl, writer, body); err != nil {
-		reply.Error_msg = err.Error()
-	} else {
-		reply.Error_msg = DownloadSuccess
-		reply.Data = respBody
-	}
-	return
-}
-
-/* restful方式删除
- * @param domain 域名,例如:https://ossservice.jianyu360.cn
- * @param bucketId 桶id
- * @param objectName 对象名称
- * @return {"error_code":0,"error_msg":"上传成功"}
- */
-func DeleteByRestful(domain, bucketId, objectName string) (reply *Result) {
-	reply = &Result{}
-	// 创建一个缓冲区来存储表单数据
-	body := &bytes.Buffer{}
-	writer := multipart.NewWriter(body)
-	writer.WriteField("bucket_id", bucketId)
-	writer.WriteField("object_name", objectName)
-	if respBody, err := post(domain+DeleteUrl, writer, body); err != nil {
-		reply.Error_msg = err.Error()
-	} else {
-		json.Unmarshal(respBody, &reply)
-	}
-	return
-}
-
-/* restful方式获取标讯正文
- * @param domain 域名,例如:https://ossservice.jianyu360.cn
- * @param bucketId 桶id
- * @param objectName 对象名称
- * @return {"error_code":0,"error_msg":"获取正文成功","data":"正文内容"}
- */
-func GetBidDetailByRestful(domain, bucketId, objectName string) (reply *Result) {
-	reply = &Result{}
-	// 创建一个缓冲区来存储表单数据
-	body := &bytes.Buffer{}
-	writer := multipart.NewWriter(body)
-	writer.WriteField("bucket_id", bucketId)
-	writer.WriteField("object_name", objectName)
-	if respBody, err := post(domain+GetBidDetailUrl, writer, body); err != nil {
-		reply.Error_msg = err.Error()
-	} else {
-		reply.Error_msg = GetBidDetailSuccess
-		reply.Data = string(respBody)
-	}
-	return
-}
-
-func post(url string, writer *multipart.Writer, body *bytes.Buffer) ([]byte, error) {
-	// 关闭表单写入器
-	if err := writer.Close(); err != nil {
-		return nil, err
-	}
-	// 创建 HTTP 请求
-	req, err := http.NewRequest("POST", url, body)
-	if err != nil {
-		log.Println("Error creating request:", err)
-		return nil, err
-	}
-
-	// 设置请求头
-	req.Header.Set("Content-Type", writer.FormDataContentType())
-
-	// 发送请求
-	client := &http.Client{}
-	resp, err := client.Do(req)
-	if err != nil {
-		return nil, err
-	}
-	defer resp.Body.Close()
-
-	// 读取响应
-	respBody, err := io.ReadAll(resp.Body)
-	if err != nil {
-		return nil, err
-	}
-	if resp.StatusCode != http.StatusOK {
-		return nil, errors.New(string(respBody))
-	}
-	return respBody, nil
-}
-
-/* rpc方式上传
- * @param address 域名,例如:192.168.3.206:8110
- * @param args 参数
- * @param args.BucketID 文件名
- * @param args.objectName 对象名称
- * @param args.Stream 文件流
- * @param args.Gzip 是否压缩
- * @return {"error_code":0,"error_msg":"上传成功"}
- * @return error 错误信息
- */
-func UpLoadByRpc(address string, args *UploadArgs) (Result, error) {
-	var reply Result
-	err := rpcCall(address, "OSSService.Upload", args, &reply)
-	return reply, err
-}
-
-/*
- *rpc方式下载
- * @param address 域名,例如:192.168.3.206:8110
- * @param args 参数
- * @param args.BucketID 文件名
- * @param args.objectName 对象名称
- * @return {"error_code":0,"error_msg":"下载成功","data":"文件流"}
- * @return error 错误信息
- */
-func DownloadByRpc(address string, args *Args) (Result, error) {
-	var reply Result
-	err := rpcCall(address, "OSSService.Download", args, &reply)
-	return reply, err
-}
-
-/* rpc方式删除
- * @param address 域名,例如:192.168.3.206:8110
- * @param args 参数
- * @param args.BucketID 文件名
- * @param args.objectName 对象名称
- * @return {"error_code":0,"error_msg":"删除成功"}
- * @return error 错误信息
- */
-func DeleteByRpc(address string, args *Args) (Result, error) {
-	var reply Result
-	err := rpcCall(address, "OSSService.Delete", args, &reply)
-	return reply, err
-}
-
-/*
- *rpc方式获取标讯正文
- * @param address 域名,例如:192.168.3.206:8110
- * @param args 参数
- * @param args.BucketID 文件名
- * @param args.objectName 对象名称
- * @return {"error_code":0,"error_msg":"下载成功","data":"正文内容"}
- * @return error 错误信息
- */
-func GetBidDetailByRpc(address string, args *Args) (Result, error) {
-	var reply Result
-	err := rpcCall(address, "OSSService.GetBidDetail", args, &reply)
-	return reply, err
-}
-func rpcCall(address, serviceMethod string, args any, reply any) error {
-	client, err := rpc.DialHTTP("tcp", address)
-	if err != nil {
-		log.Println(err)
-		return err
-	}
-	defer client.Close()
-	err = client.Call(serviceMethod, args, reply)
-	if err != nil {
-		log.Println(err)
-		return err
-	}
-	return nil
-}