package main import ( "app.yhyue.com/moapp/jybase/endless" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/gfsnotify" "google.golang.org/grpc" "gopkg.in/natefinch/lumberjack.v2" "jygit.jydev.jianyu360.cn/BaseService/ossClient/constant" "jygit.jydev.jianyu360.cn/BaseService/ossClient/pb" ossService "jygit.jydev.jianyu360.cn/BaseService/ossService/oss" "jygit.jydev.jianyu360.cn/BaseService/ossService/util" "log" "net" "net/http" "net/rpc" "time" ) func main() { ctx := gctx.New() var logger *lumberjack.Logger g.Config().MustGet(ctx, "logger").Struct(&logger) log.SetOutput(logger) // 初始化OSS帐号与bucket信息 ossService.LoadOSSAccounts() // 注册一个回调函数,当配置发生变更时会被调用 downLoadPoolSize := g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int() getDetailFromEsPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int() getDetailFromMgoPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int() gfsnotify.Add("./config.yaml", func(event *gfsnotify.Event) { log.Println(event.String()) if event.IsWrite() || event.IsChmod() || event.IsRename() { log.Println("配置文件有变化,更新内存。。。") ossService.LoadOSSAccounts() if poolSize := g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int(); downLoadPoolSize != poolSize { downLoadPoolSize = poolSize ossService.DownLoadPool = make(chan bool, downLoadPoolSize) } if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int(); getDetailFromEsPoolSize != poolSize { getDetailFromEsPoolSize = poolSize ossService.GetDetailFromEsPool = make(chan bool, getDetailFromEsPoolSize) } if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int(); getDetailFromMgoPoolSize != poolSize { getDetailFromMgoPoolSize = poolSize ossService.GetDetailFromMgoPool = make(chan bool, getDetailFromMgoPoolSize) } } }) // 启动心跳检测协程,每5秒发送一次心跳,同时检查在线节点数量 go func() { ticker := time.NewTicker(5 * time.Second) var onlineNodesPrevWarn, downLoadQueuePrevWarn int64 ctx := gctx.New() for range ticker.C { util.SendHeartbeat(ctx) util.CheckOnlineNodes(ctx, &onlineNodesPrevWarn) util.CheckDownLoadQueue(ctx, &downLoadQueuePrevWarn) } }() go func() { //创建一个grpc 服务器 s := grpc.NewServer() //注册事件 pb.RegisterServiceServer(s, &ossService.Grpc{}) grpcPort := g.Config().MustGet(gctx.New(), "grpcPort").String() //处理链接 listen, err := net.Listen("tcp", grpcPort) if err != nil { log.Println(err) } else { log.Println("grpc server is listening", grpcPort) } s.Serve(listen) }() // 启动RPC服务:注册OSSService,实现接口调用 rpcService := new(ossService.OSSService) rpc.Register(rpcService) rpc.HandleHTTP() http.HandleFunc(constant.UploadUrl, ossService.UploadHandler) http.HandleFunc(constant.DownloadUrl, ossService.DownloadHandler) http.HandleFunc(constant.DeleteUrl, ossService.DeleteHandler) http.HandleFunc("/ossservice/nodes", ossService.NodesHandler) http.HandleFunc(constant.GetBidDetailUrl, ossService.BidDetailHandler) port := g.Config().MustGet(ctx, "port").String() log.Println("HTTP server started on " + port) if err := endless.ListenAndServe(port, nil, func() {}); err != nil { log.Fatalln("HTTP server error", err) } }