main.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package main
  2. import (
  3. . "app.yhyue.com/moapp/jybase/mongodb"
  4. "flag"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/os/gctx"
  7. "github.com/gogf/gf/v2/os/gfsnotify"
  8. "google.golang.org/grpc"
  9. "gopkg.in/natefinch/lumberjack.v2"
  10. "jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
  11. "jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
  12. "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
  13. ossService "jygit.jydev.jianyu360.cn/BaseService/ossService/oss"
  14. "jygit.jydev.jianyu360.cn/BaseService/ossService/util"
  15. "log"
  16. "net"
  17. "net/http"
  18. "net/rpc"
  19. "strings"
  20. "sync"
  21. "time"
  22. )
  23. func main() {
  24. ctx := gctx.New()
  25. var logger *lumberjack.Logger
  26. g.Config().MustGet(ctx, "logger").Struct(&logger)
  27. log.SetOutput(logger)
  28. // 初始化OSS帐号与bucket信息
  29. ossService.LoadOSSAccounts()
  30. // 注册一个回调函数,当配置发生变更时会被调用
  31. downloadPoolSize := g.Config().MustGet(gctx.New(), "downloadPoolSize").Int()
  32. getDetailFromEsPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int()
  33. getDetailFromMgoPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int()
  34. gfsnotify.Add("./config.yaml", func(event *gfsnotify.Event) {
  35. log.Println(event.String())
  36. if event.IsWrite() || event.IsChmod() || event.IsRename() {
  37. log.Println("配置文件有变化,更新内存。。。")
  38. ossService.LoadOSSAccounts()
  39. if poolSize := g.Config().MustGet(gctx.New(), "downloadPoolSize").Int(); downloadPoolSize != poolSize {
  40. downloadPoolSize = poolSize
  41. ossService.DownloadPool = make(chan bool, downloadPoolSize)
  42. }
  43. if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int(); getDetailFromEsPoolSize != poolSize {
  44. getDetailFromEsPoolSize = poolSize
  45. ossService.GetDetailFromEsPool = make(chan bool, getDetailFromEsPoolSize)
  46. }
  47. if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int(); getDetailFromMgoPoolSize != poolSize {
  48. getDetailFromMgoPoolSize = poolSize
  49. ossService.GetDetailFromMgoPool = make(chan bool, getDetailFromMgoPoolSize)
  50. }
  51. }
  52. })
  53. // 启动心跳检测协程,每5秒发送一次心跳,同时检查在线节点数量
  54. go func() {
  55. ticker := time.NewTicker(5 * time.Second)
  56. var onlineNodesPrevWarn, downloadQueuePrevWarn, uploadQueuePrevWarn, getDetailQueuePrevWarn int64
  57. ctx := gctx.New()
  58. for range ticker.C {
  59. util.SendHeartbeat(ctx)
  60. util.CheckOnlineNodes(ctx, &onlineNodesPrevWarn)
  61. util.CheckDownloadQueue(ctx, &downloadQueuePrevWarn)
  62. util.CheckUploadQueue(ctx, &uploadQueuePrevWarn)
  63. util.CheckGetDetailQueue(ctx, &getDetailQueuePrevWarn)
  64. }
  65. }()
  66. go func() {
  67. //创建一个grpc 服务器
  68. s := grpc.NewServer()
  69. //注册事件
  70. pb.RegisterServiceServer(s, &ossService.Grpc{})
  71. grpcPort := g.Config().MustGet(gctx.New(), "grpcPort").String()
  72. //处理链接
  73. listen, err := net.Listen("tcp", grpcPort)
  74. if err != nil {
  75. log.Println(err)
  76. } else {
  77. log.Println("grpc server is listening", grpcPort)
  78. }
  79. s.Serve(listen)
  80. }()
  81. // 启动RPC服务:注册OSSService,实现接口调用
  82. rpcService := new(ossService.OSSService)
  83. rpc.Register(rpcService)
  84. rpc.HandleHTTP()
  85. http.HandleFunc(constant.UploadUrl, ossService.UploadHandler)
  86. http.HandleFunc(constant.DownloadUrl, ossService.DownloadHandler)
  87. http.HandleFunc(constant.DeleteUrl, ossService.DeleteHandler)
  88. http.HandleFunc("/ossservice/nodes", ossService.NodesHandler)
  89. http.HandleFunc(constant.GetBidDetailUrl, ossService.BidDetailHandler)
  90. port := g.Config().MustGet(ctx, "port").String()
  91. log.Println("HTTP server started on " + port)
  92. if err := http.ListenAndServe(port, nil); err != nil {
  93. //if err := endless.ListenAndServe(port, nil, func() {}); err != nil {
  94. log.Fatalln("HTTP server error", err)
  95. }
  96. }
  97. // /////////////////
  98. func main11() {
  99. start := flag.Int64("s", 0, "")
  100. end := flag.Int64("e", 0, "")
  101. poolSize := flag.Int("p", 5, "")
  102. flag.Parse()
  103. // 初始化OSS帐号与bucket信息
  104. ossService.LoadOSSAccounts()
  105. sess := config.Mgo.GetMgoConn()
  106. defer config.Mgo.DestoryMongoConn(sess)
  107. query := map[string]interface{}{}
  108. idQuery := map[string]interface{}{}
  109. if *start != 0 && *start == *end {
  110. query["comeintime"] = *start
  111. } else {
  112. if *start != 0 {
  113. idQuery["$gte"] = *start
  114. }
  115. if *end != 0 {
  116. idQuery["$lte"] = *end
  117. }
  118. }
  119. if len(idQuery) > 0 {
  120. query["comeintime"] = idQuery
  121. }
  122. log.Println("start。。。", query)
  123. it := sess.DB(config.Mgo.DbName).C(g.Config().MustGet(gctx.New(), "mongodb.collection").String()).Find(query).Select(map[string]interface{}{
  124. "_id": 1,
  125. "detail": 1,
  126. "contenthtml": 1,
  127. "comeintime": 1,
  128. }).Sort("-comeintime").Iter()
  129. pool := make(chan bool, *poolSize)
  130. wait := &sync.WaitGroup{}
  131. index := 0
  132. for m := make(map[string]interface{}); it.Next(&m); {
  133. pool <- true
  134. wait.Add(1)
  135. index++
  136. if index%5000 == 0 {
  137. log.Println(index, m["_id"], m["comeintime"])
  138. }
  139. go func(mm map[string]interface{}) {
  140. defer func() {
  141. <-pool
  142. wait.Done()
  143. }()
  144. objectName := BsonIdToSId(mm["_id"]) + ".txt"
  145. detail, _ := mm["detail"].(string)
  146. for {
  147. if err1 := ossService.Upload("detail", objectName, strings.NewReader(detail), false); err1 != nil {
  148. log.Println("detail", objectName, err1)
  149. time.Sleep(3 * time.Second)
  150. } else {
  151. break
  152. }
  153. }
  154. contenthtml, _ := mm["contenthtml"].(string)
  155. for {
  156. if err2 := ossService.Upload("contenthtml", objectName, strings.NewReader(contenthtml), false); err2 != nil {
  157. log.Println("contenthtml", objectName, err2)
  158. time.Sleep(3 * time.Second)
  159. } else {
  160. break
  161. }
  162. }
  163. }(m)
  164. m = make(map[string]interface{})
  165. }
  166. wait.Wait()
  167. log.Println("over。。。", index)
  168. }