Răsfoiți Sursa

feat:xiugai

wangchuanjin 1 lună în urmă
părinte
comite
6679df3cc7
6 a modificat fișierele cu 105 adăugiri și 4 ștergeri
  1. 7 0
      README.md
  2. 3 1
      config.yaml
  3. 1 1
      go.mod
  4. 2 2
      go.sum
  5. 78 0
      main.go
  6. 14 0
      oss/grpc.go

+ 7 - 0
README.md

@@ -0,0 +1,7 @@
+restful请求地址:172.17.162.27:18011
+rpc请求地址:172.17.162.27:18012
+grpc请求地址:172.17.162.27:18112
+
+
+172.17.162.29:8012
+172.17.162.37:8012

+ 3 - 1
config.yaml

@@ -70,4 +70,6 @@ getDetailLineUpWarnSize: 10
 getDetailOrder:  #mgo es oss
   - es
   - mgo
-  - oss
+  - oss
+retry: 20
+retrySleep: 3

+ 1 - 1
go.mod

@@ -12,7 +12,7 @@ require (
 	github.com/gogf/gf/v2 v2.7.0
 	google.golang.org/grpc v1.64.0
 	gopkg.in/natefinch/lumberjack.v2 v2.2.1
-	jygit.jydev.jianyu360.cn/BaseService/ossClient v0.0.0-20250617032421-365fe18d2c8c
+	jygit.jydev.jianyu360.cn/BaseService/ossClient v0.0.0-20250620021406-873850cb5724
 )
 
 require (

+ 2 - 2
go.sum

@@ -731,8 +731,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
 honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
 honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
-jygit.jydev.jianyu360.cn/BaseService/ossClient v0.0.0-20250617032421-365fe18d2c8c h1:0gJtsJipNfH6FvR4LoblL2tfjDjo2YuKYsaS/ZPe7YY=
-jygit.jydev.jianyu360.cn/BaseService/ossClient v0.0.0-20250617032421-365fe18d2c8c/go.mod h1:GamMXV9LlDWGyQQrHVojZUOJqZpxa5cuEA9W5D+PGG8=
+jygit.jydev.jianyu360.cn/BaseService/ossClient v0.0.0-20250620021406-873850cb5724 h1:CIHZZjixY1XV0J+SE2AhiaGrbb/aauO/4EWogAQTSPU=
+jygit.jydev.jianyu360.cn/BaseService/ossClient v0.0.0-20250620021406-873850cb5724/go.mod h1:GamMXV9LlDWGyQQrHVojZUOJqZpxa5cuEA9W5D+PGG8=
 k8s.io/api v0.26.3 h1:emf74GIQMTik01Aum9dPP0gAypL8JTLl/lHa4V9RFSU=
 k8s.io/api v0.26.3/go.mod h1:PXsqwPMXBSBcL1lJ9CYDKy7kIReUydukS5JiRlxC3qE=
 k8s.io/apimachinery v0.27.0-alpha.3 h1:uujqsdFrbqF+cEbqFHrkLKp+s3XxRgphTpc6Yg84qLo=

+ 78 - 0
main.go

@@ -1,6 +1,8 @@
 package main
 
 import (
+	. "app.yhyue.com/moapp/jybase/mongodb"
+	"flag"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gctx"
 	"github.com/gogf/gf/v2/os/gfsnotify"
@@ -8,12 +10,15 @@ import (
 	"gopkg.in/natefinch/lumberjack.v2"
 	"jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
 	"jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
+	"jygit.jydev.jianyu360.cn/BaseService/ossService/config"
 	ossService "jygit.jydev.jianyu360.cn/BaseService/ossService/oss"
 	"jygit.jydev.jianyu360.cn/BaseService/ossService/util"
 	"log"
 	"net"
 	"net/http"
 	"net/rpc"
+	"strings"
+	"sync"
 	"time"
 )
 
@@ -92,3 +97,76 @@ func main() {
 		log.Fatalln("HTTP server error", err)
 	}
 }
+
+// /////////////////
+func main11() {
+	start := flag.Int64("s", 0, "")
+	end := flag.Int64("e", 0, "")
+	poolSize := flag.Int("p", 5, "")
+	flag.Parse()
+	// 初始化OSS帐号与bucket信息
+	ossService.LoadOSSAccounts()
+	sess := config.Mgo.GetMgoConn()
+	defer config.Mgo.DestoryMongoConn(sess)
+	query := map[string]interface{}{}
+	idQuery := map[string]interface{}{}
+	if *start != 0 && *start == *end {
+		query["comeintime"] = *start
+	} else {
+		if *start != 0 {
+			idQuery["$gte"] = *start
+		}
+		if *end != 0 {
+			idQuery["$lte"] = *end
+		}
+	}
+	if len(idQuery) > 0 {
+		query["comeintime"] = idQuery
+	}
+	log.Println("start。。。", query)
+	it := sess.DB(config.Mgo.DbName).C(g.Config().MustGet(gctx.New(), "mongodb.collection").String()).Find(query).Select(map[string]interface{}{
+		"_id":         1,
+		"detail":      1,
+		"contenthtml": 1,
+		"comeintime":  1,
+	}).Sort("-comeintime").Iter()
+	pool := make(chan bool, *poolSize)
+	wait := &sync.WaitGroup{}
+	index := 0
+	for m := make(map[string]interface{}); it.Next(&m); {
+		pool <- true
+		wait.Add(1)
+		index++
+		if index%5000 == 0 {
+			log.Println(index, m["_id"], m["comeintime"])
+		}
+		go func(mm map[string]interface{}) {
+			defer func() {
+				<-pool
+				wait.Done()
+			}()
+			objectName := BsonIdToSId(mm["_id"]) + ".txt"
+			detail, _ := mm["detail"].(string)
+			for {
+				if err1 := ossService.Upload("detail", objectName, strings.NewReader(detail), false); err1 != nil {
+					log.Println("detail", objectName, err1)
+					time.Sleep(3 * time.Second)
+				} else {
+					break
+				}
+			}
+			contenthtml, _ := mm["contenthtml"].(string)
+			for {
+				if err2 := ossService.Upload("contenthtml", objectName, strings.NewReader(contenthtml), false); err2 != nil {
+					log.Println("contenthtml", objectName, err2)
+					time.Sleep(3 * time.Second)
+				} else {
+					break
+				}
+			}
+		}(m)
+		m = make(map[string]interface{})
+	}
+	wait.Wait()
+	log.Println("over。。。", index)
+}

+ 14 - 0
oss/grpc.go

@@ -1,6 +1,8 @@
 package oss
 
 import (
+	"bytes"
+	"context"
 	"fmt"
 	"jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
 	"jygit.jydev.jianyu360.cn/BaseService/ossClient/entity"
@@ -40,3 +42,15 @@ func (g *Grpc) GetBidDetail(req *pb.DownloadRequest, resp pb.Service_DownloadSer
 	GetBidDetail(&myWriter{resp: resp}, req.BucketID, req.ObjectName)
 	return nil
 }
+
+func (g *Grpc) Upload(ctx context.Context, req *pb.UploadRequest) (*pb.Response, error) {
+	if err := checkArgs(&entity.Args{BucketID: req.BucketID, ObjectName: req.ObjectName}); err != nil {
+		log.Println("grpc方式", fmt.Sprintf(constant.UploadFail, err))
+		return nil, err
+	}
+	err := Upload(req.BucketID, req.ObjectName, bytes.NewReader(req.Stream), req.Gzip)
+	if err != nil {
+		return nil, err
+	}
+	return &pb.Response{ErrorMsg: constant.UploadSuccess}, nil
+}