main.go 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package main
  2. import (
  3. "app.yhyue.com/moapp/jybase/endless"
  4. "github.com/gogf/gf/v2/frame/g"
  5. "github.com/gogf/gf/v2/os/gctx"
  6. "github.com/gogf/gf/v2/os/gfsnotify"
  7. "gopkg.in/natefinch/lumberjack.v2"
  8. "jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
  9. ossService "jygit.jydev.jianyu360.cn/BaseService/ossService/oss"
  10. "jygit.jydev.jianyu360.cn/BaseService/ossService/util"
  11. "log"
  12. "net/http"
  13. "net/rpc"
  14. "time"
  15. )
  16. func main() {
  17. ctx := gctx.New()
  18. var logger *lumberjack.Logger
  19. g.Config().MustGet(ctx, "logger").Struct(&logger)
  20. log.SetOutput(logger)
  21. // 初始化OSS帐号与bucket信息
  22. ossService.LoadOSSAccounts()
  23. // 注册一个回调函数,当配置发生变更时会被调用
  24. downLoadPoolSize := g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int()
  25. getDetailFromEsPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int()
  26. getDetailFromMgoPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int()
  27. gfsnotify.Add("./config.json", func(event *gfsnotify.Event) {
  28. log.Println(event.String())
  29. if event.IsWrite() || event.IsChmod() || event.IsRename() {
  30. log.Println("配置文件有变化,更新内存。。。")
  31. ossService.LoadOSSAccounts()
  32. if poolSize := g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int(); downLoadPoolSize != poolSize {
  33. downLoadPoolSize = poolSize
  34. ossService.DownLoadPool = make(chan bool, downLoadPoolSize)
  35. }
  36. if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int(); getDetailFromEsPoolSize != poolSize {
  37. getDetailFromEsPoolSize = poolSize
  38. ossService.GetDetailFromEsPool = make(chan bool, getDetailFromEsPoolSize)
  39. }
  40. if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int(); getDetailFromMgoPoolSize != poolSize {
  41. getDetailFromMgoPoolSize = poolSize
  42. ossService.GetDetailFromMgoPool = make(chan bool, getDetailFromMgoPoolSize)
  43. }
  44. }
  45. })
  46. // 启动心跳检测协程,每5秒发送一次心跳,同时检查在线节点数量
  47. go func() {
  48. ticker := time.NewTicker(5 * time.Second)
  49. var onlineNodesPrevWarn, downLoadQueuePrevWarn int64
  50. ctx := gctx.New()
  51. for range ticker.C {
  52. util.SendHeartbeat(ctx)
  53. util.CheckOnlineNodes(ctx, &onlineNodesPrevWarn)
  54. util.CheckDownLoadQueue(ctx, &downLoadQueuePrevWarn)
  55. }
  56. }()
  57. // 启动RPC服务:注册OSSService,实现接口调用
  58. rpcService := new(ossService.OSSService)
  59. rpc.Register(rpcService)
  60. rpc.HandleHTTP()
  61. http.HandleFunc(constant.UploadUrl, ossService.UploadHandler)
  62. http.HandleFunc(constant.DownloadUrl, ossService.DownloadHandler)
  63. http.HandleFunc(constant.DeleteUrl, ossService.DeleteHandler)
  64. http.HandleFunc("/ossservice/nodes", ossService.NodesHandler)
  65. http.HandleFunc(constant.GetBidDetailUrl, ossService.BidDetailHandler)
  66. port := g.Config().MustGet(ctx, "port").String()
  67. log.Println("HTTP server started on " + port)
  68. if err := endless.ListenAndServe(port, nil, func() {}); err != nil {
  69. log.Fatalln("HTTP server error", err)
  70. }
  71. }