main.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/go-redis/redis"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "mongodb"
  9. "qfw/util"
  10. "reflect"
  11. "strconv"
  12. "time"
  13. )
  14. var (
  15. MongoTool *mongodb.MongodbSim
  16. rdb *redis.Client
  17. )
  18. func init() {
  19. MongoTool = &mongodb.MongodbSim{
  20. MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
  21. Size: 10,
  22. DbName: "mixdata",
  23. UserName: "SJZY_RWESBid_Other",
  24. Password: "SJZY@O17t8herB3B",
  25. }
  26. MongoTool.InitPool()
  27. }
  28. func initRedis() (err error) {
  29. ctx, cancel := context.WithTimeout(context.Background(), time.Second * 10)
  30. defer cancel()
  31. rdb = redis.NewClient(&redis.Options{
  32. Addr: "127.0.0.1:8379",
  33. PoolSize: 200,
  34. DB: 3,
  35. })
  36. _, err = rdb.Ping(ctx).Result()
  37. if err !=nil{
  38. fmt.Println("ping redis failed err:",err)
  39. return err
  40. }
  41. return nil
  42. }
  43. func main() {
  44. err := initRedis()
  45. if err !=nil{
  46. fmt.Println("init redis failed err :",err)
  47. return
  48. }
  49. ctx := context.Background()
  50. var cursor uint64
  51. var keys []string
  52. n := 0
  53. for {
  54. keys, cursor, err = rdb.Scan(ctx, cursor,"*",500).Result()
  55. if err !=nil{
  56. fmt.Println("scan keys failed err:",err)
  57. return
  58. }
  59. util.Debug("---keys---", len(keys))
  60. n += len(keys)
  61. for _,key := range keys{
  62. val, err := rdb.Get(ctx, key).Result()
  63. if err != nil{
  64. fmt.Println("get key values failed err:", err)
  65. return
  66. }
  67. val, _ = strconv.Unquote(val) // 处理json字符串带转义符号
  68. maps := make(map[string]interface{})
  69. err1 := json.Unmarshal([]byte(val), &maps)
  70. if err1 != nil {
  71. util.Debug("-----map解析异常")
  72. }
  73. taskinfo(ctx, key, maps)
  74. }
  75. util.Debug("current---", n, cursor)
  76. if cursor == 0 {
  77. util.Debug("over---", n)
  78. break
  79. }
  80. }
  81. }
  82. func taskinfo(ctx context.Context, name string, tmp map[string]interface{}) {
  83. q := bson.M{"company_name": name}
  84. info, b := MongoTool.FindOneByField("qyxy_std", q, nil)
  85. if b && len(*info) > 0 {
  86. if types, ok := (*info)["bid_unittype"].([]interface{}); ok {
  87. t1 := util.ObjArrToStringArr(types)
  88. t2 := util.ObjArrToStringArr(tmp["bid_unittype"].([]interface{}))
  89. t2 = append(t2, t1...)
  90. tmp["bid_unittype"] = Duplicate(t2)
  91. }
  92. MongoTool.Update("qyxy_std", bson.M{"_id": (*info)["_id"]}, bson.M{"$set": tmp}, true, false)
  93. }else {
  94. tmp["company_name"] = name
  95. MongoTool.Save("qyxy_std_err", tmp)
  96. rdb.Del(ctx, name)
  97. }
  98. }
  99. func Duplicate(a interface{}) (ret []interface{}) {
  100. va := reflect.ValueOf(a)
  101. for i := 0; i < va.Len(); i++ {
  102. if i > 0 && reflect.DeepEqual(va.Index(i-1).Interface(), va.Index(i).Interface()) {
  103. continue
  104. }
  105. ret = append(ret, va.Index(i).Interface())
  106. }
  107. return ret
  108. }