main.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/go-redis/redis"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "mongodb"
  8. "qfw/util"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. var (
  16. MongoTool *mongodb.MongodbSim
  17. rdb *redis.Client
  18. updatePool = make(chan []map[string]interface{}, 5000)
  19. updateSp = make(chan bool, 5)
  20. )
  21. func init() {
  22. MongoTool = &mongodb.MongodbSim{
  23. MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
  24. Size: 10,
  25. DbName: "mixdata",
  26. UserName: "SJZY_RWESBid_Other",
  27. Password: "SJZY@O17t8herB3B",
  28. }
  29. MongoTool.InitPool()
  30. //MongoTool = &mongodb.MongodbSim{
  31. // MongodbAddr: "172.17.4.85:27080",
  32. // Size: 10,
  33. // DbName: "qfw",
  34. //}
  35. //MongoTool.InitPool()
  36. }
  37. func initRedis() (err error) {
  38. ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
  39. defer cancel()
  40. rdb = redis.NewClient(&redis.Options{
  41. Addr: "127.0.0.1:8379",
  42. PoolSize: 200,
  43. DB: 2,
  44. })
  45. _, err = rdb.Ping(ctx).Result()
  46. if err != nil {
  47. fmt.Println("ping redis failed err:", err)
  48. return err
  49. }
  50. return nil
  51. }
  52. func main() {
  53. go updateMethod()
  54. sess := MongoTool.GetMgoConn()
  55. defer MongoTool.DestoryMongoConn(sess)
  56. ch := make(chan bool, 3)
  57. wg := &sync.WaitGroup{}
  58. f := bson.M{"company_name": 1}
  59. query := sess.DB("mixdata").C("winner_enterprise").Find(nil).Select(f).Iter()
  60. count := 0
  61. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  62. if count%5000 == 0 {
  63. util.Debug("current ---", count)
  64. }
  65. ch <- true
  66. wg.Add(1)
  67. go func(tmp map[string]interface{}) {
  68. defer func() {
  69. <-ch
  70. wg.Done()
  71. }()
  72. }(tmp)
  73. tmp = make(map[string]interface{})
  74. }
  75. }
  76. func main1() {
  77. go updateMethod()
  78. err := initRedis()
  79. if err != nil {
  80. fmt.Println("init redis failed err :", err)
  81. return
  82. }
  83. ctx := context.Background()
  84. var cursor uint64
  85. var keys []string
  86. n := 0
  87. for {
  88. keys, cursor, err = rdb.Scan(ctx, cursor, "*", 500).Result()
  89. if err != nil {
  90. fmt.Println("scan keys failed err:", err)
  91. return
  92. }
  93. n += len(keys)
  94. for _, key := range keys {
  95. val, err := rdb.Get(ctx, key).Result()
  96. if err != nil {
  97. fmt.Println("get key values failed err:", err)
  98. return
  99. }
  100. arr := strings.Split(val, "-")
  101. if len(arr) > 1 {
  102. num, _ := strconv.Atoi(strings.ReplaceAll(arr[1], "\"", ""))
  103. save := []map[string]interface{}{{
  104. "_id": key,
  105. },
  106. {"$set": map[string]interface{}{"employee_num": num}},
  107. }
  108. updatePool <- save
  109. }
  110. //val, _ = strconv.Unquote(val) // 处理json字符串带转义符号
  111. //maps := make(map[string]interface{})
  112. //err1 := json.Unmarshal([]byte(val), &maps)
  113. //if err1 != nil {
  114. // util.Debug("-----map解析异常")
  115. //}
  116. //taskinfo(ctx, key, maps)
  117. }
  118. util.Debug("current---", n, cursor)
  119. if cursor == 0 {
  120. util.Debug("over---", n)
  121. break
  122. }
  123. }
  124. }
  125. func taskinfo(ctx context.Context, name string, tmp map[string]interface{}) {
  126. q := bson.M{"company_name": name}
  127. info, b := MongoTool.FindOneByField("qyxy_std", q, nil)
  128. if b && len(*info) > 0 {
  129. if types, ok := (*info)["bid_unittype"].([]interface{}); ok {
  130. t1 := util.ObjArrToStringArr(types)
  131. t2 := util.ObjArrToStringArr(tmp["bid_unittype"].([]interface{}))
  132. t2 = append(t2, t1...)
  133. tmp["bid_unittype"] = Duplicate(t2)
  134. }
  135. tmp["updatetime"] = time.Now().Unix()
  136. MongoTool.Update("qyxy_std", bson.M{"_id": (*info)["_id"]}, bson.M{"$set": tmp}, true, false)
  137. } else {
  138. tmp["company_name"] = name
  139. MongoTool.Save("qyxy_std_err", tmp)
  140. rdb.Del(ctx, name)
  141. }
  142. }
  // Duplicate returns the distinct elements of a, in order of first
  // occurrence.
  //
  // a must be a slice, array, or string; any other kind (including a nil
  // interface) yields nil. Elements are compared with reflect.DeepEqual, so
  // element types need not be hashable. Each element is checked against those
  // already kept, which is quadratic in the number of distinct values and
  // intended for short lists.
  //
  // Duplicates are removed wherever they appear, not only when adjacent, so
  // the input does not need to be sorted.
  func Duplicate(a interface{}) (ret []interface{}) {
  	va := reflect.ValueOf(a)
  	switch va.Kind() {
  	case reflect.Slice, reflect.Array, reflect.String:
  	default:
  		return nil
  	}

  	for i := 0; i < va.Len(); i++ {
  		item := va.Index(i).Interface()
  		seen := false
  		for _, kept := range ret {
  			if reflect.DeepEqual(kept, item) {
  				seen = true
  				break
  			}
  		}
  		if !seen {
  			ret = append(ret, item)
  		}
  	}
  	return ret
  }
  153. func updateMethod() {
  154. arru := make([][]map[string]interface{}, 500)
  155. indexu := 0
  156. for {
  157. select {
  158. case v := <-updatePool:
  159. arru[indexu] = v
  160. indexu++
  161. if indexu == 500 {
  162. updateSp <- true
  163. go func(arru [][]map[string]interface{}) {
  164. defer func() {
  165. <-updateSp
  166. }()
  167. MongoTool.UpdateBulk("qyxy_std", arru...)
  168. }(arru)
  169. arru = make([][]map[string]interface{}, 500)
  170. indexu = 0
  171. }
  172. case <-time.After(1000 * time.Millisecond):
  173. if indexu > 0 {
  174. updateSp <- true
  175. go func(arru [][]map[string]interface{}) {
  176. defer func() {
  177. <-updateSp
  178. }()
  179. MongoTool.UpdateBulk("qyxy_std", arru...)
  180. }(arru[:indexu])
  181. arru = make([][]map[string]interface{}, 500)
  182. indexu = 0
  183. }
  184. }
  185. }
  186. }