package main

import (
	"flag"
	"log"
	"net"
	"net/http"
	"net/rpc"
	"strings"
	"sync"
	"time"

	. "app.yhyue.com/moapp/jybase/mongodb"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/gctx"
	"github.com/gogf/gf/v2/os/gfsnotify"
	"google.golang.org/grpc"
	"gopkg.in/natefinch/lumberjack.v2"
	"jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
	"jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
	"jygit.jydev.jianyu360.cn/BaseService/ossService/config"
	ossService "jygit.jydev.jianyu360.cn/BaseService/ossService/oss"
	"jygit.jydev.jianyu360.cn/BaseService/ossService/util"
)

// main wires up the OSS service process: it configures log output, loads the
// OSS accounts, watches ./config.yaml for changes, starts the heartbeat and
// gRPC goroutines, and finally blocks serving net/rpc and the HTTP handlers.
func main() {
	ctx := gctx.New()

	// Redirect the standard logger to the rotating file described by the
	// "logger" config entry. If that entry is missing or cannot be decoded,
	// logger stays nil; installing a nil *lumberjack.Logger would make the
	// first log call dereference a nil pointer, so keep the default output.
	var logger *lumberjack.Logger
	if err := g.Config().MustGet(ctx, "logger").Struct(&logger); err != nil {
		log.Println("load logger config:", err)
	} else if logger != nil {
		log.SetOutput(logger)
	}

	// Initialise the OSS account and bucket information.
	ossService.LoadOSSAccounts()

	// Remember the pool sizes currently in effect so that the config watcher
	// below only replaces a pool channel when its configured size changed.
	downloadPoolSize := configInt("downloadPoolSize")
	getDetailFromEsPoolSize := configInt("getDetailFromEsPoolSize")
	getDetailFromMgoPoolSize := configInt("getDetailFromMgoPoolSize")

	// Register a callback that is invoked when the config file changes.
	_, err := gfsnotify.Add("./config.yaml", func(event *gfsnotify.Event) {
		log.Println(event.String())
		if !event.IsWrite() && !event.IsChmod() && !event.IsRename() {
			return
		}
		log.Println("配置文件有变化,更新内存。。。")
		ossService.LoadOSSAccounts()
		if size := configInt("downloadPoolSize"); size != downloadPoolSize {
			downloadPoolSize = size
			ossService.DownloadPool = make(chan bool, size)
		}
		if size := configInt("getDetailFromEsPoolSize"); size != getDetailFromEsPoolSize {
			getDetailFromEsPoolSize = size
			ossService.GetDetailFromEsPool = make(chan bool, size)
		}
		if size := configInt("getDetailFromMgoPoolSize"); size != getDetailFromMgoPoolSize {
			getDetailFromMgoPoolSize = size
			ossService.GetDetailFromMgoPool = make(chan bool, size)
		}
	})
	if err != nil {
		// Without the watcher the service still runs, it just will not pick
		// up config changes until restarted.
		log.Println("watch ./config.yaml:", err)
	}

	// Heartbeat and queue checks run every 5 seconds in the background.
	go runHeartbeat()

	// The gRPC server runs alongside the HTTP server.
	go serveGRPC()

	// Start the RPC service: register OSSService on the default net/rpc
	// server and expose it over HTTP.
	rpcService := new(ossService.OSSService)
	if err := rpc.Register(rpcService); err != nil {
		log.Println("rpc register error", err)
	}
	rpc.HandleHTTP()

	http.HandleFunc(constant.UploadUrl, ossService.UploadHandler)
	http.HandleFunc(constant.DownloadUrl, ossService.DownloadHandler)
	http.HandleFunc(constant.DeleteUrl, ossService.DeleteHandler)
	http.HandleFunc("/ossservice/nodes", ossService.NodesHandler)
	http.HandleFunc(constant.GetBidDetailUrl, ossService.BidDetailHandler)

	port := g.Config().MustGet(ctx, "port").String()
	log.Println("HTTP server started on " + port)
	// An endless.ListenAndServe(port, nil, func() {}) variant was previously
	// tried here and is left disabled.
	if err := http.ListenAndServe(port, nil); err != nil {
		log.Fatalln("HTTP server error", err)
	}
}

// configInt reads the integer config value stored under key.
func configInt(key string) int {
	return g.Config().MustGet(gctx.New(), key).Int()
}

// runHeartbeat sends a heartbeat every 5 seconds and, on the same tick, runs
// the online-node, download-queue, upload-queue and get-detail-queue checks.
// The int64 values are per-check state owned by this loop and handed to the
// util check functions by pointer. It never returns under normal operation.
func runHeartbeat() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	var onlineNodesPrevWarn, downloadQueuePrevWarn, uploadQueuePrevWarn, getDetailQueuePrevWarn int64
	ctx := gctx.New()
	for range ticker.C {
		util.SendHeartbeat(ctx)
		util.CheckOnlineNodes(ctx, &onlineNodesPrevWarn)
		util.CheckDownloadQueue(ctx, &downloadQueuePrevWarn)
		util.CheckUploadQueue(ctx, &uploadQueuePrevWarn)
		util.CheckGetDetailQueue(ctx, &getDetailQueuePrevWarn)
	}
}

// serveGRPC creates a gRPC server, registers the OSS service on it and serves
// on the address configured under "grpcPort". It blocks until Serve returns.
func serveGRPC() {
	// Create the gRPC server and register the service implementation.
	s := grpc.NewServer()
	pb.RegisterServiceServer(s, &ossService.Grpc{})

	grpcPort := g.Config().MustGet(gctx.New(), "grpcPort").String()
	listen, err := net.Listen("tcp", grpcPort)
	if err != nil {
		// Serving on a nil listener would panic, so give up on gRPC here and
		// leave the HTTP side of the process running.
		log.Println(err)
		return
	}
	log.Println("grpc server is listening", grpcPort)
	if err := s.Serve(listen); err != nil {
		log.Println("grpc server error", err)
	}
}

// main11 is a batch tool that copies the "detail" and "contenthtml" fields of
// MongoDB documents to OSS as "<id>.txt" objects. It is not called from main
// in this file (presumably a disabled alternate entry point — confirm before
// removing).
//
// Flags:
//
//	-s  lower bound for comeintime (0 = unbounded)
//	-e  upper bound for comeintime (0 = unbounded)
//	-p  number of concurrent upload goroutines (default 5)
//
// When -s and -e are equal and non-zero, only documents whose comeintime
// equals that value are selected.
func main11() {
	start := flag.Int64("s", 0, "")
	end := flag.Int64("e", 0, "")
	poolSize := flag.Int("p", 5, "")
	flag.Parse()

	// Initialise the OSS account and bucket information.
	ossService.LoadOSSAccounts()

	sess := config.Mgo.GetMgoConn()
	defer config.Mgo.DestoryMongoConn(sess)

	query := map[string]interface{}{}
	timeRange := map[string]interface{}{}
	if *start != 0 && *start == *end {
		query["comeintime"] = *start
	} else {
		if *start != 0 {
			timeRange["$gte"] = *start
		}
		if *end != 0 {
			timeRange["$lte"] = *end
		}
	}
	if len(timeRange) > 0 {
		query["comeintime"] = timeRange
	}
	log.Println("start。。。", query)

	collection := g.Config().MustGet(gctx.New(), "mongodb.collection").String()
	it := sess.DB(config.Mgo.DbName).C(collection).Find(query).Select(map[string]interface{}{
		"_id":         1,
		"detail":      1,
		"contenthtml": 1,
		"comeintime":  1,
	}).Sort("-comeintime").Iter()

	// pool is a counting semaphore bounding concurrent uploads. A size below
	// 1 would either panic in make (negative) or deadlock on the first send
	// (zero, unbuffered), so fall back to a single worker.
	workers := *poolSize
	if workers < 1 {
		workers = 1
	}
	pool := make(chan bool, workers)
	wait := &sync.WaitGroup{}
	index := 0
	for m := make(map[string]interface{}); it.Next(&m); {
		pool <- true
		wait.Add(1)
		index++
		if index%5000 == 0 {
			log.Println(index, m["_id"], m["comeintime"])
		}
		go func(mm map[string]interface{}) {
			defer func() {
				<-pool
				wait.Done()
			}()
			objectName := BsonIdToSId(mm["_id"]) + ".txt"
			// A missing or non-string field is uploaded as an empty object.
			detail, _ := mm["detail"].(string)
			uploadWithRetry("detail", objectName, detail)
			contenthtml, _ := mm["contenthtml"].(string)
			uploadWithRetry("contenthtml", objectName, contenthtml)
		}(m)
		// The goroutine now owns m; decode the next document into a fresh map.
		m = make(map[string]interface{})
	}
	wait.Wait()
	log.Println("over。。。", index)
}

// uploadWithRetry uploads content as objectName via ossService.Upload, using
// name as its first argument. On failure it logs the error, sleeps 3 seconds
// and tries again, without any retry limit, until the upload succeeds.
func uploadWithRetry(name, objectName, content string) {
	for {
		err := ossService.Upload(name, objectName, strings.NewReader(content), false)
		if err == nil {
			return
		}
		log.Println(name, objectName, err)
		time.Sleep(3 * time.Second)
	}
}