main.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/go-redis/redis"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "mongodb"
  8. "qfw/util"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. "time"
  13. )
  14. var (
  15. MongoTool *mongodb.MongodbSim
  16. rdb *redis.Client
  17. updatePool = make(chan []map[string]interface{}, 5000)
  18. updateSp = make(chan bool, 5)
  19. )
  20. func init() {
  21. MongoTool = &mongodb.MongodbSim{
  22. MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
  23. Size: 10,
  24. DbName: "mixdata",
  25. UserName: "SJZY_RWESBid_Other",
  26. Password: "SJZY@O17t8herB3B",
  27. }
  28. MongoTool.InitPool()
  29. //MongoTool = &mongodb.MongodbSim{
  30. // MongodbAddr: "172.17.4.85:27080",
  31. // Size: 10,
  32. // DbName: "qfw",
  33. //}
  34. //MongoTool.InitPool()
  35. }
  36. func initRedis() (err error) {
  37. ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
  38. defer cancel()
  39. rdb = redis.NewClient(&redis.Options{
  40. Addr: "127.0.0.1:8379",
  41. PoolSize: 200,
  42. DB: 2,
  43. })
  44. _, err = rdb.Ping(ctx).Result()
  45. if err != nil {
  46. fmt.Println("ping redis failed err:", err)
  47. return err
  48. }
  49. return nil
  50. }
  51. func main() {
  52. go updateMethod()
  53. err := initRedis()
  54. if err != nil {
  55. fmt.Println("init redis failed err :", err)
  56. return
  57. }
  58. ctx := context.Background()
  59. var cursor uint64
  60. var keys []string
  61. n := 0
  62. for {
  63. keys, cursor, err = rdb.Scan(ctx, cursor, "*", 500).Result()
  64. if err != nil {
  65. fmt.Println("scan keys failed err:", err)
  66. return
  67. }
  68. n += len(keys)
  69. for _, key := range keys {
  70. val, err := rdb.Get(ctx, key).Result()
  71. if err != nil {
  72. fmt.Println("get key values failed err:", err)
  73. return
  74. }
  75. arr := strings.Split(val, "-")
  76. if len(arr) > 1 {
  77. num, _ := strconv.Atoi(strings.ReplaceAll(arr[1], "\"", ""))
  78. save := []map[string]interface{}{{
  79. "_id": key,
  80. },
  81. {"$set": map[string]interface{}{"employee_num": num}},
  82. }
  83. updatePool <- save
  84. }
  85. //val, _ = strconv.Unquote(val) // 处理json字符串带转义符号
  86. //maps := make(map[string]interface{})
  87. //err1 := json.Unmarshal([]byte(val), &maps)
  88. //if err1 != nil {
  89. // util.Debug("-----map解析异常")
  90. //}
  91. //taskinfo(ctx, key, maps)
  92. }
  93. util.Debug("current---", n, cursor)
  94. if cursor == 0 {
  95. util.Debug("over---", n)
  96. break
  97. }
  98. }
  99. }
  100. func taskinfo(ctx context.Context, name string, tmp map[string]interface{}) {
  101. q := bson.M{"company_name": name}
  102. info, b := MongoTool.FindOneByField("qyxy_std", q, nil)
  103. if b && len(*info) > 0 {
  104. if types, ok := (*info)["bid_unittype"].([]interface{}); ok {
  105. t1 := util.ObjArrToStringArr(types)
  106. t2 := util.ObjArrToStringArr(tmp["bid_unittype"].([]interface{}))
  107. t2 = append(t2, t1...)
  108. tmp["bid_unittype"] = Duplicate(t2)
  109. }
  110. tmp["updatetime"] = time.Now().Unix()
  111. MongoTool.Update("qyxy_std", bson.M{"_id": (*info)["_id"]}, bson.M{"$set": tmp}, true, false)
  112. } else {
  113. tmp["company_name"] = name
  114. MongoTool.Save("qyxy_std_err", tmp)
  115. rdb.Del(ctx, name)
  116. }
  117. }
  118. func Duplicate(a interface{}) (ret []interface{}) {
  119. va := reflect.ValueOf(a)
  120. for i := 0; i < va.Len(); i++ {
  121. if i > 0 && reflect.DeepEqual(va.Index(i-1).Interface(), va.Index(i).Interface()) {
  122. continue
  123. }
  124. ret = append(ret, va.Index(i).Interface())
  125. }
  126. return ret
  127. }
  128. func updateMethod() {
  129. arru := make([][]map[string]interface{}, 500)
  130. indexu := 0
  131. for {
  132. select {
  133. case v := <-updatePool:
  134. arru[indexu] = v
  135. indexu++
  136. if indexu == 500 {
  137. updateSp <- true
  138. go func(arru [][]map[string]interface{}) {
  139. defer func() {
  140. <-updateSp
  141. }()
  142. MongoTool.UpdateBulk("qyxy_std", arru...)
  143. }(arru)
  144. arru = make([][]map[string]interface{}, 500)
  145. indexu = 0
  146. }
  147. case <-time.After(1000 * time.Millisecond):
  148. if indexu > 0 {
  149. updateSp <- true
  150. go func(arru [][]map[string]interface{}) {
  151. defer func() {
  152. <-updateSp
  153. }()
  154. MongoTool.UpdateBulk("qyxy_std", arru...)
  155. }(arru[:indexu])
  156. arru = make([][]map[string]interface{}, 500)
  157. indexu = 0
  158. }
  159. }
  160. }
  161. }