// Command data_fusion walks the bidding_fusion collection and, for every
// document flagged as a repeat, records how it fuses with the other members of
// its duplicate group (which document is the template and which fields the
// other one could contribute).
package main

import (
	"fmt"
	"reflect"
	"strings"
	"sync"
	"time"

	util "app.yhyue.com/data_processing/common_utils"
	"app.yhyue.com/data_processing/common_utils/log"
	"app.yhyue.com/data_processing/common_utils/mongodb"
	"app.yhyue.com/data_processing/common_utils/redis"
	"data_fusion/config"
	"go.uber.org/zap"
)

// init loads the configuration, sets up logging, MongoDB and Redis, and
// creates the channels shared between the workers and the bulk flushers.
func init() {
	config.Init("./common.toml")
	InitLog()
	InitMgo()
	redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.Db)
	initData()

	updatePool = make(chan []map[string]interface{}, 5000)
	updateSp = make(chan bool, 5)
	recordPool = make(chan []map[string]interface{}, 5000)
	recordSp = make(chan bool, 5)
}

// main starts the two bulk flushers, iterates the bidding_fusion collection
// and hands every document with repeat == 1 to a bounded set of workers.
// It never returns: after all workers finish it blocks forever so that the
// flushers can keep draining their pools.
func main() {
	go updateMethod()
	go updateMethod1()

	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	// A capacity of zero would make the semaphore unbuffered and the first
	// acquire below would block forever, so run at least one worker.
	threads := config.Conf.Serve.Thread
	if threads < 1 {
		threads = 1
	}
	ch := make(chan bool, threads)
	wg := &sync.WaitGroup{}

	// NOTE(review): the query is pinned to a single document id, which looks
	// like a debugging leftover — confirm before running against the full
	// collection.
	q := map[string]interface{}{"_id": mongodb.StringTOBsonId("639751bb063a7b816e026aa1")}
	it := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding_fusion").Find(q).Select(nil).Iter()
	count := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
		if count%2000 == 0 {
			log.Info("main", zap.Int("current:", count))
		}
		if repeat := util.IntAll(tmp["repeat"]); repeat != 1 {
			continue
		}
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			fuseRepeat(tmp)
		}(tmp)
		// The worker now owns the previous map; decode the next document
		// into a fresh one.
		tmp = map[string]interface{}{}
	}

	// Wait until every worker has queued its record before reporting.
	wg.Wait()
	log.Info("main", zap.Int("finished:", count))

	// Keep the process alive so the flusher goroutines can write out what
	// is still sitting in the pools.
	select {}
}

// fuseRepeat processes one repeated document. When Redis already holds a
// member chain for the document's repeat_id the document is added to that
// group; otherwise a new group is started from the document and the one its
// repeat_id points at.
func fuseRepeat(tmp map[string]interface{}) {
	repeatID := util.ObjToString(tmp["repeat_id"])
	if chain := redis.GetStr("fusion_id", repeatID); chain != "" {
		fuseIntoGroup(tmp, repeatID, chain)
		return
	}
	fuseNewGroup(tmp, repeatID)
}

// fuseIntoGroup adds tmp to an existing group. chain is the "-"-separated id
// list stored in Redis; its first element is the id of the current template.
// The heavier of tmp and the current template becomes (or stays) the
// template, a detail entry for tmp is pushed onto the group's record, and the
// Redis chain is extended.
//
// NOTE: writing the merged fields back to bidding_fusion through updatePool
// is currently disabled; only the fusion record is produced.
func fuseIntoGroup(tmp map[string]interface{}, repeatID, chain string) {
	templateID := strings.Split(chain, "-")[0]
	// The second return value is ignored as in the original code; the
	// document itself is validated below.
	found, _ := MgoB.FindById("bidding_fusion", templateID, nil)
	if found == nil || len(*found) == 0 {
		// Without the template there is nothing to compare against, and
		// dereferencing a nil result would crash the whole process.
		log.Info("fuseIntoGroup", zap.String("template not found", templateID))
		return
	}
	tpl := *found

	w, s := getWeight(tmp)
	w1, s1 := getWeight(tpl)
	util.Debug(w, s, w1, s1)

	infoID := mongodb.BsonIdToSId(tmp["_id"])
	var (
		update   map[string]interface{}
		set      map[string]interface{}
		newChain string
	)
	if w > w1 {
		// tmp outweighs the current template and takes its place at the
		// head of the chain.
		update = mergeTmp(tmp, tpl)
		set = map[string]interface{}{
			"template_id":     infoID,
			"template_weight": w,
		}
		newChain = fmt.Sprintf("%s-%s", infoID, chain)
	} else {
		update = mergeTmp(tpl, tmp)
		set = map[string]interface{}{
			"template_weight": w1,
		}
		newChain = fmt.Sprintf("%s-%s", chain, infoID)
	}

	record := map[string]interface{}{
		"$set": set,
		"$push": map[string]interface{}{
			"ids":    infoID,
			"record": fusionDetail(update, infoID, w, s),
		},
	}
	recordPool <- []map[string]interface{}{
		{"_id": mongodb.StringTOBsonId(repeatID)},
		record,
	}
	redis.PutCKV("fusion_id", repeatID, newChain)
}

// fuseNewGroup starts a group from tmp and the document whose id is repeatID.
// The heavier of the two becomes the template; the record lists the template
// first, followed by a detail entry for the other document, and the Redis
// chain is initialised in the same order.
//
// NOTE: writing the merged fields back to bidding_fusion through updatePool
// is currently disabled; only the fusion record is produced.
func fuseNewGroup(tmp map[string]interface{}, repeatID string) {
	// The second return value is ignored as in the original code; the
	// document itself is validated below.
	found, _ := MgoB.FindById("bidding_fusion", repeatID, nil)
	if found == nil || len(*found) == 0 {
		// A missing document would otherwise panic on dereference or
		// produce a record keyed by a nil _id.
		log.Info("fuseNewGroup", zap.String("repeat document not found", repeatID))
		return
	}
	first := *found

	w, s := getWeight(tmp)
	w1, s1 := getWeight(first)

	infoID := mongodb.BsonIdToSId(tmp["_id"])
	firstID := mongodb.BsonIdToSId(first["_id"])

	record := map[string]interface{}{
		"_id": first["_id"],
		// Both branches list the looked-up document by its own _id.
		"ids": []string{infoID, firstID},
	}
	var (
		details  []map[string]interface{}
		newChain string
	)
	if w > w1 {
		update := mergeTmp(tmp, first)
		record["template_id"] = infoID
		record["template_weight"] = w
		details = append(details,
			map[string]interface{}{"infoid": infoID, "weight": w},
			fusionDetail(update, firstID, w1, s1),
		)
		newChain = fmt.Sprintf("%s-%s", infoID, firstID)
	} else {
		update := mergeTmp(first, tmp)
		record["template_id"] = firstID
		record["template_weight"] = w1
		details = append(details,
			map[string]interface{}{"infoid": firstID, "weight": w1},
			fusionDetail(update, infoID, w, s),
		)
		newChain = fmt.Sprintf("%s-%s", firstID, infoID)
	}
	record["record"] = details

	recordPool <- []map[string]interface{}{
		{"_id": first["_id"]},
		{"$set": record},
	}
	redis.PutCKV("fusion_id", repeatID, newChain)
}

// fusionDetail builds one entry of a group's "record" list: a deep copy of
// the merged fields plus the document id and weight. The remark is attached
// only when the weight is zero, i.e. when getWeight gave a reason for it.
func fusionDetail(update map[string]interface{}, infoID string, weight int, remark string) map[string]interface{} {
	detail := util.DeepCopy(update).(map[string]interface{})
	detail["infoid"] = infoID
	detail["weight"] = weight
	if weight == 0 {
		detail["remark"] = remark
	}
	return detail
}

// getWeight scores a document. It returns 0 and a reason when publishtime is
// not positive or when the document's site is found in CompeteSite.
// Otherwise it sums the configured weight of every field that is present:
// string fields count only when non-empty, float64 fields only when greater
// than zero, any other non-nil value always counts. The reason is empty in
// that case.
func getWeight(tmp map[string]interface{}) (int, string) {
	if util.IntAll(tmp["publishtime"]) <= 0 {
		return 0, "发布时间小于0"
	}
	if BinarySearch(CompeteSite, util.ObjToString(tmp["site"])) > -1 {
		return 0, "竞品网站数据"
	}

	var w int
	for k, v := range config.Conf.Serve.Weight {
		val := tmp[k]
		if val == nil {
			continue
		}
		util.Debug(k)
		switch reflect.TypeOf(val).String() {
		case "string":
			if util.ObjToString(val) != "" {
				w += v
			}
		case "float64":
			if util.Float64All(val) > 0 {
				w += v
			}
		default:
			w += v
		}
	}
	return w, ""
}

// mergeTmp returns the configured fields that the template tmp lacks and the
// supplementary document tmp1 can provide. Neither input is modified.
//
// Empty strings and empty arrays in tmp1 are skipped: the original code
// tested for them but then copied the value anyway through its final else
// branch, so the checks had no effect.
//
// Author: J, 2023/1/3 11:31
func mergeTmp(tmp map[string]interface{}, tmp1 map[string]interface{}) map[string]interface{} {
	update := make(map[string]interface{})
	for _, field := range config.Conf.Serve.Fields {
		val := tmp1[field]
		if tmp[field] != nil || val == nil {
			continue
		}
		switch reflect.TypeOf(val).String() {
		case "string":
			if str := util.ObjToString(val); str != "" {
				update[field] = str
			}
		case "[]interface {}":
			if len(val.([]interface{})) > 0 {
				update[field] = val
			}
		default:
			update[field] = val
		}
	}
	return update
}

// runBulkFlusher collects items from pool and hands them to flush in batches.
// A batch is dispatched as soon as it holds 500 items, or when no item has
// arrived for one second and the batch is not empty. Each flush runs in its
// own goroutine; slots bounds how many may run at once. The function loops
// forever.
func runBulkFlusher(pool <-chan []map[string]interface{}, slots chan bool, flush func(batch [][]map[string]interface{})) {
	const (
		batchSize = 500
		idleFlush = 1000 * time.Millisecond
	)

	batch := make([][]map[string]interface{}, 0, batchSize)
	dispatch := func() {
		slots <- true
		go func(b [][]map[string]interface{}) {
			defer func() {
				<-slots
			}()
			flush(b)
		}(batch)
		// The goroutine owns the old slice; start a fresh one.
		batch = make([][]map[string]interface{}, 0, batchSize)
	}

	for {
		select {
		case v := <-pool:
			batch = append(batch, v)
			if len(batch) == batchSize {
				dispatch()
			}
		case <-time.After(idleFlush):
			if len(batch) > 0 {
				dispatch()
			}
		}
	}
}

// updateMethod drains recordPool and upserts the queued fusion records into
// bidding_fusion_record in bulk. It never returns.
func updateMethod() {
	runBulkFlusher(recordPool, recordSp, func(batch [][]map[string]interface{}) {
		MgoB.UpSertBulk("bidding_fusion_record", batch...)
	})
}

// updateMethod1 drains updatePool and applies the queued updates to
// bidding_fusion in bulk. It never returns.
func updateMethod1() {
	runBulkFlusher(updatePool, updateSp, func(batch [][]map[string]interface{}) {
		MgoB.UpdateBulk("bidding_fusion", batch...)
	})
}