main.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package main
  2. import (
  3. "data_credible/config"
  4. "fmt"
  5. "github.com/spf13/cobra"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "go.uber.org/zap"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  10. "sync"
  11. "time"
  12. )
  13. func main() {
  14. loadSite()
  15. go saveQueue()
  16. rootCmd := &cobra.Command{Use: "my cmd"}
  17. rootCmd.AddCommand(taskOld())
  18. rootCmd.AddCommand(taskNew())
  19. if err := rootCmd.Execute(); err != nil {
  20. fmt.Println("rootCmd.Execute failed", err.Error())
  21. }
  22. c := make(chan bool, 1)
  23. <-c
  24. }
  25. func taskOld() *cobra.Command {
  26. cmdClient := &cobra.Command{
  27. Use: "old",
  28. Short: "Start dispose task",
  29. Run: func(cmd *cobra.Command, args []string) {
  30. sess := MongoS.GetMgoConn()
  31. defer MongoS.DestoryMongoConn(sess)
  32. ch := make(chan bool, config.Conf.Serve.Thread)
  33. wg := &sync.WaitGroup{}
  34. query := sess.DB(Dbname).C(Coll).Find(nil).Select(nil).Iter()
  35. count := 0
  36. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  37. if count%5000 == 0 {
  38. log.Info(fmt.Sprintf("current --- %d", count))
  39. }
  40. ch <- true
  41. wg.Add(1)
  42. go func(temp map[string]interface{}) {
  43. defer func() {
  44. <-ch
  45. wg.Done()
  46. }()
  47. task1(temp)
  48. }(tmp)
  49. tmp = make(map[string]interface{})
  50. }
  51. wg.Wait()
  52. log.Info(fmt.Sprintf("over --- %d", count))
  53. },
  54. }
  55. cmdClient.Flags().StringVarP(&Dbname, "dbname", "d", "", "库名")
  56. cmdClient.Flags().StringVarP(&Coll, "coll", "c", "", "表名")
  57. return cmdClient
  58. }
  59. func taskNew() *cobra.Command {
  60. cmdClient := &cobra.Command{
  61. Use: "new",
  62. Short: "Start dispose task",
  63. Run: func(cmd *cobra.Command, args []string) {
  64. sess := MongoBz.GetMgoConn()
  65. defer MongoBz.DestoryMongoConn(sess)
  66. ch := make(chan bool, config.Conf.Serve.Thread)
  67. wg := &sync.WaitGroup{}
  68. query := sess.DB(config.Conf.DB.MongoBz.Dbname).C(config.Conf.DB.MongoBz.Coll).Find(nil).Select(nil).Iter()
  69. count := 0
  70. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  71. if count%5000 == 0 {
  72. log.Info(fmt.Sprintf("current --- %d", count))
  73. }
  74. ch <- true
  75. wg.Add(1)
  76. go func(temp map[string]interface{}) {
  77. defer func() {
  78. <-ch
  79. wg.Done()
  80. }()
  81. if temp["v_taginfo"] != nil {
  82. taskInfo(temp)
  83. }
  84. }(tmp)
  85. tmp = make(map[string]interface{})
  86. }
  87. wg.Wait()
  88. log.Info(fmt.Sprintf("over --- %d", count))
  89. },
  90. }
  91. return cmdClient
  92. }
  93. func loadSite() {
  94. f := bson.M{"site": 1, "site_type": 1, "second_type": 1, "platform": 1}
  95. info, _ := MongoS.Find("site", nil, nil, f, false, -1, -1)
  96. if len(*info) > 0 {
  97. for _, m := range *info {
  98. SiteMap[util.ObjToString(m["site"])] = fmt.Sprintf("%s-%s-%s", util.ObjToString(m["site_type"]),
  99. util.ObjToString(m["second_type"]), util.ObjToString(m["platform"]))
  100. }
  101. }
  102. log.Info("loadSite", zap.Int("SiteMap", len(SiteMap)))
  103. info1, _ := MongoS.Find("luaconfig", nil, nil, bson.M{"code": 1, "platform": 1}, false, -1, -1)
  104. if len(*info1) > 0 {
  105. for _, m := range *info1 {
  106. CodeMap[util.ObjToString(m["code"])] = util.ObjToString(m["platform"])
  107. }
  108. }
  109. log.Info("loadSite", zap.Int("CodeMap", len(CodeMap)))
  110. }
  111. func saveQueue() {
  112. arru := make([]map[string]interface{}, saveSize)
  113. indexu := 0
  114. for {
  115. select {
  116. case v := <-savePool:
  117. arru[indexu] = v
  118. indexu++
  119. if indexu == saveSize {
  120. saveSp <- true
  121. go func(arru []map[string]interface{}) {
  122. defer func() {
  123. <-saveSp
  124. }()
  125. Mongo.SaveBulk(config.Conf.DB.Mongo.Coll, arru...)
  126. }(arru)
  127. arru = make([]map[string]interface{}, saveSize)
  128. indexu = 0
  129. }
  130. case <-time.After(1 * time.Second):
  131. if indexu > 0 {
  132. saveSp <- true
  133. go func(arru []map[string]interface{}) {
  134. defer func() {
  135. <-saveSp
  136. }()
  137. Mongo.SaveBulk(config.Conf.DB.Mongo.Coll, arru...)
  138. }(arru[:indexu])
  139. arru = make([]map[string]interface{}, saveSize)
  140. indexu = 0
  141. }
  142. }
  143. }
  144. }