main.go 3.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package main
  2. import (
  3. "github.com/gogf/gf/v2/frame/g"
  4. "github.com/gogf/gf/v2/os/gctx"
  5. "github.com/gogf/gf/v2/os/gfsnotify"
  6. "google.golang.org/grpc"
  7. "gopkg.in/natefinch/lumberjack.v2"
  8. "jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
  9. "jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
  10. ossService "jygit.jydev.jianyu360.cn/BaseService/ossService/oss"
  11. "jygit.jydev.jianyu360.cn/BaseService/ossService/util"
  12. "log"
  13. "net"
  14. "net/http"
  15. "net/rpc"
  16. "time"
  17. )
  18. func main() {
  19. ctx := gctx.New()
  20. var logger *lumberjack.Logger
  21. g.Config().MustGet(ctx, "logger").Struct(&logger)
  22. log.SetOutput(logger)
  23. // 初始化OSS帐号与bucket信息
  24. ossService.LoadOSSAccounts()
  25. // 注册一个回调函数,当配置发生变更时会被调用
  26. downloadPoolSize := g.Config().MustGet(gctx.New(), "downloadPoolSize").Int()
  27. getDetailFromEsPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int()
  28. getDetailFromMgoPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int()
  29. gfsnotify.Add("./config.yaml", func(event *gfsnotify.Event) {
  30. log.Println(event.String())
  31. if event.IsWrite() || event.IsChmod() || event.IsRename() {
  32. log.Println("配置文件有变化,更新内存。。。")
  33. ossService.LoadOSSAccounts()
  34. if poolSize := g.Config().MustGet(gctx.New(), "downloadPoolSize").Int(); downloadPoolSize != poolSize {
  35. downloadPoolSize = poolSize
  36. ossService.DownloadPool = make(chan bool, downloadPoolSize)
  37. }
  38. if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int(); getDetailFromEsPoolSize != poolSize {
  39. getDetailFromEsPoolSize = poolSize
  40. ossService.GetDetailFromEsPool = make(chan bool, getDetailFromEsPoolSize)
  41. }
  42. if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int(); getDetailFromMgoPoolSize != poolSize {
  43. getDetailFromMgoPoolSize = poolSize
  44. ossService.GetDetailFromMgoPool = make(chan bool, getDetailFromMgoPoolSize)
  45. }
  46. }
  47. })
  48. // 启动心跳检测协程,每5秒发送一次心跳,同时检查在线节点数量
  49. go func() {
  50. ticker := time.NewTicker(5 * time.Second)
  51. var onlineNodesPrevWarn, downloadQueuePrevWarn, uploadQueuePrevWarn, getDetailQueuePrevWarn int64
  52. ctx := gctx.New()
  53. for range ticker.C {
  54. util.SendHeartbeat(ctx)
  55. util.CheckOnlineNodes(ctx, &onlineNodesPrevWarn)
  56. util.CheckDownloadQueue(ctx, &downloadQueuePrevWarn)
  57. util.CheckUploadQueue(ctx, &uploadQueuePrevWarn)
  58. util.CheckGetDetailQueue(ctx, &getDetailQueuePrevWarn)
  59. }
  60. }()
  61. go func() {
  62. //创建一个grpc 服务器
  63. s := grpc.NewServer()
  64. //注册事件
  65. pb.RegisterServiceServer(s, &ossService.Grpc{})
  66. grpcPort := g.Config().MustGet(gctx.New(), "grpcPort").String()
  67. //处理链接
  68. listen, err := net.Listen("tcp", grpcPort)
  69. if err != nil {
  70. log.Println(err)
  71. } else {
  72. log.Println("grpc server is listening", grpcPort)
  73. }
  74. s.Serve(listen)
  75. }()
  76. // 启动RPC服务:注册OSSService,实现接口调用
  77. rpcService := new(ossService.OSSService)
  78. rpc.Register(rpcService)
  79. rpc.HandleHTTP()
  80. http.HandleFunc(constant.UploadUrl, ossService.UploadHandler)
  81. http.HandleFunc(constant.DownloadUrl, ossService.DownloadHandler)
  82. http.HandleFunc(constant.DeleteUrl, ossService.DeleteHandler)
  83. http.HandleFunc("/ossservice/nodes", ossService.NodesHandler)
  84. http.HandleFunc(constant.GetBidDetailUrl, ossService.BidDetailHandler)
  85. port := g.Config().MustGet(ctx, "port").String()
  86. log.Println("HTTP server started on " + port)
  87. if err := http.ListenAndServe(port, nil); err != nil {
  88. //if err := endless.ListenAndServe(port, nil, func() {}); err != nil {
  89. log.Fatalln("HTTP server error", err)
  90. }
  91. }