main.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package main
  2. import (
  3. . "app.yhyue.com/moapp/jybase/mongodb"
  4. osr "app.yhyue.com/moapp/jybase/overseer"
  5. "flag"
  6. "fmt"
  7. "github.com/gogf/gf/v2/frame/g"
  8. "github.com/gogf/gf/v2/os/gctx"
  9. "github.com/gogf/gf/v2/os/gfsnotify"
  10. "google.golang.org/grpc"
  11. "gopkg.in/natefinch/lumberjack.v2"
  12. "io"
  13. "jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
  14. "jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
  15. "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
  16. ossService "jygit.jydev.jianyu360.cn/BaseService/ossService/oss"
  17. "jygit.jydev.jianyu360.cn/BaseService/ossService/util"
  18. "log"
  19. "net"
  20. "net/http"
  21. "net/rpc"
  22. "os"
  23. "strings"
  24. "sync"
  25. "time"
  26. )
  // Version is the version string of this service; it is written verbatim by
  // the /ossservice/version HTTP endpoint.
  const (
  	Version = "V1.3"
  )
  28. func main() {
  29. ctx := gctx.New()
  30. osr.Run(osr.Config{
  31. Addresses: []string{g.Config().MustGet(ctx, "port").String(), g.Config().MustGet(ctx, "grpcPort").String()}, // 多个监听地址
  32. Program: program,
  33. })
  34. }
  35. func program(state osr.State) {
  36. pid := os.Getpid()
  37. log.Println("ppid", os.Getppid(), "子进程", pid, "启动,监听端口", strings.Join(state.Addresses, " "))
  38. ctx := gctx.New()
  39. var logger *lumberjack.Logger
  40. g.Config().MustGet(ctx, "logger").Struct(&logger)
  41. writers := []io.Writer{logger}
  42. if g.Config().MustGet(ctx, "logger.console").Bool() {
  43. writers = append(writers, os.Stdout)
  44. }
  45. log.SetOutput(io.MultiWriter(writers...))
  46. config.InitDb()
  47. // 初始化OSS帐号与bucket信息
  48. ossService.LoadOSSAccounts()
  49. // 注册一个回调函数,当配置发生变更时会被调用
  50. downloadPoolSize := g.Config().MustGet(gctx.New(), "downloadPoolSize").Int()
  51. getDetailFromEsPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int()
  52. getDetailFromMgoPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int()
  53. gfsnotify.Add("./config.yaml", func(event *gfsnotify.Event) {
  54. log.Println(event.String())
  55. if event.IsWrite() || event.IsChmod() || event.IsRename() {
  56. log.Println("配置文件有变化,更新内存。。。")
  57. ossService.LoadOSSAccounts()
  58. if poolSize := g.Config().MustGet(gctx.New(), "downloadPoolSize").Int(); downloadPoolSize != poolSize {
  59. downloadPoolSize = poolSize
  60. ossService.DownloadPool = make(chan bool, downloadPoolSize)
  61. }
  62. if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int(); getDetailFromEsPoolSize != poolSize {
  63. getDetailFromEsPoolSize = poolSize
  64. ossService.GetDetailFromEsPool = make(chan bool, getDetailFromEsPoolSize)
  65. }
  66. if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int(); getDetailFromMgoPoolSize != poolSize {
  67. getDetailFromMgoPoolSize = poolSize
  68. ossService.GetDetailFromMgoPool = make(chan bool, getDetailFromMgoPoolSize)
  69. }
  70. }
  71. })
  72. // 启动心跳检测协程,每5秒发送一次心跳,同时检查在线节点数量
  73. go func() {
  74. ticker := time.NewTicker(5 * time.Second)
  75. var onlineNodesPrevWarn, downloadQueuePrevWarn, uploadQueuePrevWarn, getDetailQueuePrevWarn int64
  76. for range ticker.C {
  77. util.SendHeartbeat(ctx)
  78. util.CheckOnlineNodes(ctx, &onlineNodesPrevWarn)
  79. util.CheckDownloadQueue(ctx, &downloadQueuePrevWarn)
  80. util.CheckUploadQueue(ctx, &uploadQueuePrevWarn)
  81. util.CheckGetDetailQueue(ctx, &getDetailQueuePrevWarn)
  82. }
  83. }()
  84. // state.Listeners 与配置的 Addresses 顺序一致
  85. servers := make([]osr.Server, len(state.Listeners))
  86. // 启动RPC服务:注册OSSService,实现接口调用
  87. rpcService := new(ossService.OSSService)
  88. rpc.Register(rpcService)
  89. rpc.HandleHTTP()
  90. mux := http.DefaultServeMux
  91. mux.HandleFunc(constant.UploadUrl, ossService.UploadHandler)
  92. mux.HandleFunc(constant.DownloadUrl, ossService.DownloadHandler)
  93. mux.HandleFunc(constant.DeleteUrl, ossService.DeleteHandler)
  94. mux.HandleFunc("/ossservice/nodes", ossService.NodesHandler)
  95. mux.HandleFunc(constant.GetBidDetailUrl, ossService.BidDetailHandler)
  96. mux.HandleFunc("/ossservice/version", func(w http.ResponseWriter, r *http.Request) {
  97. fmt.Fprint(w, "版本号:", Version)
  98. })
  99. servers[0] = &http.Server{Handler: mux}
  100. //创建一个grpc 服务器
  101. grpcServer := grpc.NewServer()
  102. defer grpcServer.GracefulStop()
  103. //注册事件
  104. pb.RegisterServiceServer(grpcServer, &ossService.Grpc{})
  105. servers[1] = grpcServer
  106. // 启动所有服务(每个监听对应一个服务器)
  107. for i, listener := range state.Listeners {
  108. go func(srv osr.Server, l net.Listener) {
  109. log.Println("子进程", pid, "启动端口", l.Addr().String())
  110. if err := srv.Serve(l); err != nil {
  111. log.Println("端口", l.Addr().String(), "服务启动失败", err)
  112. }
  113. }(servers[i], listener)
  114. }
  115. // 监听关闭信号,优雅关闭所有服务器
  116. <-state.GracefulShutdown
  117. log.Println("子进程", pid, "停止运行。。。")
  118. }
  119. // /////////////////
  120. func main11() {
  121. start := flag.Int64("s", 0, "")
  122. end := flag.Int64("e", 0, "")
  123. poolSize := flag.Int("p", 5, "")
  124. flag.Parse()
  125. // 初始化OSS帐号与bucket信息
  126. ossService.LoadOSSAccounts()
  127. sess := config.Mgo.GetMgoConn()
  128. defer config.Mgo.DestoryMongoConn(sess)
  129. query := map[string]interface{}{}
  130. idQuery := map[string]interface{}{}
  131. if *start != 0 && *start == *end {
  132. query["comeintime"] = *start
  133. } else {
  134. if *start != 0 {
  135. idQuery["$gte"] = *start
  136. }
  137. if *end != 0 {
  138. idQuery["$lte"] = *end
  139. }
  140. }
  141. if len(idQuery) > 0 {
  142. query["comeintime"] = idQuery
  143. }
  144. log.Println("start。。。", query)
  145. it := sess.DB(config.Mgo.DbName).C(g.Config().MustGet(gctx.New(), "mongodb.collection").String()).Find(query).Select(map[string]interface{}{
  146. "_id": 1,
  147. "detail": 1,
  148. "contenthtml": 1,
  149. "comeintime": 1,
  150. }).Sort("-comeintime").Iter()
  151. pool := make(chan bool, *poolSize)
  152. wait := &sync.WaitGroup{}
  153. index := 0
  154. for m := make(map[string]interface{}); it.Next(&m); {
  155. pool <- true
  156. wait.Add(1)
  157. index++
  158. if index%5000 == 0 {
  159. log.Println(index, m["_id"], m["comeintime"])
  160. }
  161. go func(mm map[string]interface{}) {
  162. defer func() {
  163. <-pool
  164. wait.Done()
  165. }()
  166. objectName := BsonIdToSId(mm["_id"]) + ".txt"
  167. detail, _ := mm["detail"].(string)
  168. for {
  169. if err1 := ossService.Upload("detail", objectName, strings.NewReader(detail), false); err1 != nil {
  170. log.Println("detail", objectName, err1)
  171. time.Sleep(3 * time.Second)
  172. } else {
  173. break
  174. }
  175. }
  176. contenthtml, _ := mm["contenthtml"].(string)
  177. for {
  178. if err2 := ossService.Upload("contenthtml", objectName, strings.NewReader(contenthtml), false); err2 != nil {
  179. log.Println("contenthtml", objectName, err2)
  180. time.Sleep(3 * time.Second)
  181. } else {
  182. break
  183. }
  184. }
  185. }(m)
  186. m = make(map[string]interface{})
  187. }
  188. wait.Wait()
  189. log.Println("over。。。", index)
  190. }