main.go 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package main
  2. import (
  3. "app.yhyue.com/moapp/jybase/endless"
  4. "github.com/gogf/gf/v2/frame/g"
  5. "github.com/gogf/gf/v2/os/gctx"
  6. "github.com/gogf/gf/v2/os/gfsnotify"
  7. "google.golang.org/grpc"
  8. "gopkg.in/natefinch/lumberjack.v2"
  9. "jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
  10. "jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
  11. ossService "jygit.jydev.jianyu360.cn/BaseService/ossService/oss"
  12. "jygit.jydev.jianyu360.cn/BaseService/ossService/util"
  13. "log"
  14. "net"
  15. "net/http"
  16. "net/rpc"
  17. "time"
  18. )
  19. func main() {
  20. ctx := gctx.New()
  21. var logger *lumberjack.Logger
  22. g.Config().MustGet(ctx, "logger").Struct(&logger)
  23. log.SetOutput(logger)
  24. // 初始化OSS帐号与bucket信息
  25. ossService.LoadOSSAccounts()
  26. // 注册一个回调函数,当配置发生变更时会被调用
  27. downLoadPoolSize := g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int()
  28. getDetailFromEsPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int()
  29. getDetailFromMgoPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int()
  30. gfsnotify.Add("./config.yaml", func(event *gfsnotify.Event) {
  31. log.Println(event.String())
  32. if event.IsWrite() || event.IsChmod() || event.IsRename() {
  33. log.Println("配置文件有变化,更新内存。。。")
  34. ossService.LoadOSSAccounts()
  35. if poolSize := g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int(); downLoadPoolSize != poolSize {
  36. downLoadPoolSize = poolSize
  37. ossService.DownLoadPool = make(chan bool, downLoadPoolSize)
  38. }
  39. if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int(); getDetailFromEsPoolSize != poolSize {
  40. getDetailFromEsPoolSize = poolSize
  41. ossService.GetDetailFromEsPool = make(chan bool, getDetailFromEsPoolSize)
  42. }
  43. if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int(); getDetailFromMgoPoolSize != poolSize {
  44. getDetailFromMgoPoolSize = poolSize
  45. ossService.GetDetailFromMgoPool = make(chan bool, getDetailFromMgoPoolSize)
  46. }
  47. }
  48. })
  49. // 启动心跳检测协程,每5秒发送一次心跳,同时检查在线节点数量
  50. go func() {
  51. ticker := time.NewTicker(5 * time.Second)
  52. var onlineNodesPrevWarn, downLoadQueuePrevWarn int64
  53. ctx := gctx.New()
  54. for range ticker.C {
  55. util.SendHeartbeat(ctx)
  56. util.CheckOnlineNodes(ctx, &onlineNodesPrevWarn)
  57. util.CheckDownLoadQueue(ctx, &downLoadQueuePrevWarn)
  58. }
  59. }()
  60. go func() {
  61. //创建一个grpc 服务器
  62. s := grpc.NewServer()
  63. //注册事件
  64. pb.RegisterServiceServer(s, &ossService.Grpc{})
  65. grpcPort := g.Config().MustGet(gctx.New(), "grpcPort").String()
  66. //处理链接
  67. listen, err := net.Listen("tcp", grpcPort)
  68. if err != nil {
  69. log.Println(err)
  70. } else {
  71. log.Println("grpc server is listening", grpcPort)
  72. }
  73. s.Serve(listen)
  74. }()
  75. // 启动RPC服务:注册OSSService,实现接口调用
  76. rpcService := new(ossService.OSSService)
  77. rpc.Register(rpcService)
  78. rpc.HandleHTTP()
  79. http.HandleFunc(constant.UploadUrl, ossService.UploadHandler)
  80. http.HandleFunc(constant.DownloadUrl, ossService.DownloadHandler)
  81. http.HandleFunc(constant.DeleteUrl, ossService.DeleteHandler)
  82. http.HandleFunc("/ossservice/nodes", ossService.NodesHandler)
  83. http.HandleFunc(constant.GetBidDetailUrl, ossService.BidDetailHandler)
  84. port := g.Config().MustGet(ctx, "port").String()
  85. log.Println("HTTP server started on " + port)
  86. if err := endless.ListenAndServe(port, nil, func() {}); err != nil {
  87. log.Fatalln("HTTP server error", err)
  88. }
  89. }