package main import ( . "app.yhyue.com/moapp/jybase/mongodb" osr "app.yhyue.com/moapp/jybase/overseer" "flag" "fmt" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/gfsnotify" "google.golang.org/grpc" "gopkg.in/natefinch/lumberjack.v2" "io" "jygit.jydev.jianyu360.cn/BaseService/ossClient/constant" "jygit.jydev.jianyu360.cn/BaseService/ossClient/pb" "jygit.jydev.jianyu360.cn/BaseService/ossService/config" ossService "jygit.jydev.jianyu360.cn/BaseService/ossService/oss" "jygit.jydev.jianyu360.cn/BaseService/ossService/util" "log" "net" "net/http" "net/rpc" "os" "strings" "sync" "time" ) const Version = "V1.3" func main() { ctx := gctx.New() osr.Run(osr.Config{ Addresses: []string{g.Config().MustGet(ctx, "port").String(), g.Config().MustGet(ctx, "grpcPort").String()}, // 多个监听地址 Program: program, }) } func program(state osr.State) { pid := os.Getpid() log.Println("ppid", os.Getppid(), "子进程", pid, "启动,监听端口", strings.Join(state.Addresses, " ")) ctx := gctx.New() var logger *lumberjack.Logger g.Config().MustGet(ctx, "logger").Struct(&logger) writers := []io.Writer{logger} if g.Config().MustGet(ctx, "logger.console").Bool() { writers = append(writers, os.Stdout) } log.SetOutput(io.MultiWriter(writers...)) config.InitDb() // 初始化OSS帐号与bucket信息 ossService.LoadOSSAccounts() // 注册一个回调函数,当配置发生变更时会被调用 downloadPoolSize := g.Config().MustGet(gctx.New(), "downloadPoolSize").Int() getDetailFromEsPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int() getDetailFromMgoPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int() gfsnotify.Add("./config.yaml", func(event *gfsnotify.Event) { log.Println(event.String()) if event.IsWrite() || event.IsChmod() || event.IsRename() { log.Println("配置文件有变化,更新内存。。。") ossService.LoadOSSAccounts() if poolSize := g.Config().MustGet(gctx.New(), "downloadPoolSize").Int(); downloadPoolSize != poolSize { downloadPoolSize = poolSize ossService.DownloadPool = make(chan bool, downloadPoolSize) } if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int(); getDetailFromEsPoolSize != poolSize { getDetailFromEsPoolSize = poolSize ossService.GetDetailFromEsPool = make(chan bool, getDetailFromEsPoolSize) } if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int(); getDetailFromMgoPoolSize != poolSize { getDetailFromMgoPoolSize = poolSize ossService.GetDetailFromMgoPool = make(chan bool, getDetailFromMgoPoolSize) } } }) // 启动心跳检测协程,每5秒发送一次心跳,同时检查在线节点数量 go func() { ticker := time.NewTicker(5 * time.Second) var onlineNodesPrevWarn, downloadQueuePrevWarn, uploadQueuePrevWarn, getDetailQueuePrevWarn int64 for range ticker.C { util.SendHeartbeat(ctx) util.CheckOnlineNodes(ctx, &onlineNodesPrevWarn) util.CheckDownloadQueue(ctx, &downloadQueuePrevWarn) util.CheckUploadQueue(ctx, &uploadQueuePrevWarn) util.CheckGetDetailQueue(ctx, &getDetailQueuePrevWarn) } }() // state.Listeners 与配置的 Addresses 顺序一致 servers := make([]osr.Server, len(state.Listeners)) // 启动RPC服务:注册OSSService,实现接口调用 rpcService := new(ossService.OSSService) rpc.Register(rpcService) rpc.HandleHTTP() mux := http.DefaultServeMux mux.HandleFunc(constant.UploadUrl, ossService.UploadHandler) mux.HandleFunc(constant.DownloadUrl, ossService.DownloadHandler) mux.HandleFunc(constant.DeleteUrl, ossService.DeleteHandler) mux.HandleFunc("/ossservice/nodes", ossService.NodesHandler) mux.HandleFunc(constant.GetBidDetailUrl, ossService.BidDetailHandler) mux.HandleFunc("/ossservice/version", func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "版本号:", Version) }) servers[0] = &http.Server{Handler: mux} //创建一个grpc 服务器 grpcServer := grpc.NewServer() defer grpcServer.GracefulStop() //注册事件 pb.RegisterServiceServer(grpcServer, &ossService.Grpc{}) servers[1] = grpcServer // 启动所有服务(每个监听对应一个服务器) for i, listener := range state.Listeners { go func(srv osr.Server, l net.Listener) { log.Println("子进程", pid, "启动端口", l.Addr().String()) if err := srv.Serve(l); err != nil { log.Println("端口", l.Addr().String(), "服务启动失败", err) } }(servers[i], listener) } // 监听关闭信号,优雅关闭所有服务器 <-state.GracefulShutdown log.Println("子进程", pid, "停止运行。。。") } // ///////////////// func main11() { start := flag.Int64("s", 0, "") end := flag.Int64("e", 0, "") poolSize := flag.Int("p", 5, "") flag.Parse() // 初始化OSS帐号与bucket信息 ossService.LoadOSSAccounts() sess := config.Mgo.GetMgoConn() defer config.Mgo.DestoryMongoConn(sess) query := map[string]interface{}{} idQuery := map[string]interface{}{} if *start != 0 && *start == *end { query["comeintime"] = *start } else { if *start != 0 { idQuery["$gte"] = *start } if *end != 0 { idQuery["$lte"] = *end } } if len(idQuery) > 0 { query["comeintime"] = idQuery } log.Println("start。。。", query) it := sess.DB(config.Mgo.DbName).C(g.Config().MustGet(gctx.New(), "mongodb.collection").String()).Find(query).Select(map[string]interface{}{ "_id": 1, "detail": 1, "contenthtml": 1, "comeintime": 1, }).Sort("-comeintime").Iter() pool := make(chan bool, *poolSize) wait := &sync.WaitGroup{} index := 0 for m := make(map[string]interface{}); it.Next(&m); { pool <- true wait.Add(1) index++ if index%5000 == 0 { log.Println(index, m["_id"], m["comeintime"]) } go func(mm map[string]interface{}) { defer func() { <-pool wait.Done() }() objectName := BsonIdToSId(mm["_id"]) + ".txt" detail, _ := mm["detail"].(string) for { if err1 := ossService.Upload("detail", objectName, strings.NewReader(detail), false); err1 != nil { log.Println("detail", objectName, err1) time.Sleep(3 * time.Second) } else { break } } contenthtml, _ := mm["contenthtml"].(string) for { if err2 := ossService.Upload("contenthtml", objectName, strings.NewReader(contenthtml), false); err2 != nil { log.Println("contenthtml", objectName, err2) time.Sleep(3 * time.Second) } else { break } } }(m) m = make(map[string]interface{}) } wait.Wait() log.Println("over。。。", index) }