package main import ( "context" "fmt" "github.com/go-redis/redis" "go.mongodb.org/mongo-driver/bson" "mongodb" "qfw/util" "reflect" "strconv" "strings" "sync" "time" ) var ( MongoTool *mongodb.MongodbSim rdb *redis.Client updatePool = make(chan []map[string]interface{}, 5000) updateSp = make(chan bool, 5) ) func init() { MongoTool = &mongodb.MongodbSim{ MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082", Size: 10, DbName: "mixdata", UserName: "SJZY_RWESBid_Other", Password: "SJZY@O17t8herB3B", } MongoTool.InitPool() //MongoTool = &mongodb.MongodbSim{ // MongodbAddr: "172.17.4.85:27080", // Size: 10, // DbName: "qfw", //} //MongoTool.InitPool() } func initRedis() (err error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() rdb = redis.NewClient(&redis.Options{ Addr: "127.0.0.1:8379", PoolSize: 200, DB: 2, }) _, err = rdb.Ping(ctx).Result() if err != nil { fmt.Println("ping redis failed err:", err) return err } return nil } func main() { go updateMethod() sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) ch := make(chan bool, 3) wg := &sync.WaitGroup{} f := bson.M{"company_name": 1} query := sess.DB("mixdata").C("winner_enterprise").Find(nil).Select(f).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%5000 == 0 { util.Debug("current ---", count) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() }(tmp) tmp = make(map[string]interface{}) } } func main1() { go updateMethod() err := initRedis() if err != nil { fmt.Println("init redis failed err :", err) return } ctx := context.Background() var cursor uint64 var keys []string n := 0 for { keys, cursor, err = rdb.Scan(ctx, cursor, "*", 500).Result() if err != nil { fmt.Println("scan keys failed err:", err) return } n += len(keys) for _, key := range keys { val, err := rdb.Get(ctx, key).Result() if err != nil { fmt.Println("get key values failed err:", err) return } arr := strings.Split(val, "-") if len(arr) > 1 { num, _ := strconv.Atoi(strings.ReplaceAll(arr[1], "\"", "")) save := []map[string]interface{}{{ "_id": key, }, {"$set": map[string]interface{}{"employee_num": num}}, } updatePool <- save } //val, _ = strconv.Unquote(val) // 处理json字符串带转义符号 //maps := make(map[string]interface{}) //err1 := json.Unmarshal([]byte(val), &maps) //if err1 != nil { // util.Debug("-----map解析异常") //} //taskinfo(ctx, key, maps) } util.Debug("current---", n, cursor) if cursor == 0 { util.Debug("over---", n) break } } } func taskinfo(ctx context.Context, name string, tmp map[string]interface{}) { q := bson.M{"company_name": name} info, b := MongoTool.FindOneByField("qyxy_std", q, nil) if b && len(*info) > 0 { if types, ok := (*info)["bid_unittype"].([]interface{}); ok { t1 := util.ObjArrToStringArr(types) t2 := util.ObjArrToStringArr(tmp["bid_unittype"].([]interface{})) t2 = append(t2, t1...) tmp["bid_unittype"] = Duplicate(t2) } tmp["updatetime"] = time.Now().Unix() MongoTool.Update("qyxy_std", bson.M{"_id": (*info)["_id"]}, bson.M{"$set": tmp}, true, false) } else { tmp["company_name"] = name MongoTool.Save("qyxy_std_err", tmp) rdb.Del(ctx, name) } } func Duplicate(a interface{}) (ret []interface{}) { va := reflect.ValueOf(a) for i := 0; i < va.Len(); i++ { if i > 0 && reflect.DeepEqual(va.Index(i-1).Interface(), va.Index(i).Interface()) { continue } ret = append(ret, va.Index(i).Interface()) } return ret } func updateMethod() { arru := make([][]map[string]interface{}, 500) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == 500 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MongoTool.UpdateBulk("qyxy_std", arru...) }(arru) arru = make([][]map[string]interface{}, 500) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MongoTool.UpdateBulk("qyxy_std", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 500) indexu = 0 } } } }