package main import ( "data_fusion/config" "fmt" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis" "reflect" "strings" "sync" "time" ) var ( ArrLock []sync.Mutex ) func init() { config.Init("./common.toml") InitLog() InitMgo() redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.Db) initData() updatePool = make(chan []map[string]interface{}, 5000) updateSp = make(chan bool, 5) recordPool = make(chan []map[string]interface{}, 5000) recordSp = make(chan bool, 5) for i := 0; i < config.Conf.Serve.Thread; i++ { ArrLock = append(ArrLock, sync.Mutex{}) } } func main() { go updateMethod() go updateMethod1() sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("635ff15d631ff1ac3d095c41")} //f := map[string]interface{}{"contenthtml": 0} it := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(nil).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); it.Next(tmp); count++ { if count%20000 == 0 { log.Info("main", zap.Int("current:", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if repeat := util.IntAll(tmp["extracttype"]); repeat != -1 { updatePool <- []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": bson.M{"fusion_tag": 1}}, } } else { repeatId := util.ObjToString(tmp["repeat_id"]) ArrLock[util.HashCode(repeatId)%config.Conf.Serve.Thread].Lock() if str := redis.GetStr("fusion_id", repeatId); str != "" { mid := strings.Split(str, "-")[0] tmp1 := findData(mid) w, s := getWeight(tmp) w1, _ := getWeight(tmp1) var update map[string]interface{} var fs []string if w > w1 { update, fs = mergeTmp(tmp, tmp1) update1 := util.DeepCopy(update).(map[string]interface{}) update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"]) update1["weight"] = w update["fusion_tag"] = 1 if fs != nil && len(fs) > 0 { updatePool <- []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": update, "$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}}, } } else { updatePool <- []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": update}, } } updatePool <- []map[string]interface{}{ {"_id": tmp1["_id"]}, {"$set": bson.M{"fusion_tag": 0}}, } record := make(map[string]interface{}) record["$set"] = map[string]interface{}{ "template_id": mongodb.BsonIdToSId(tmp["_id"]), "template_weight": w, } if w == 0 { update1["remark"] = s } record["$push"] = map[string]interface{}{ "ids": mongodb.BsonIdToSId(tmp["_id"]), "record": update1, } recordPool <- []map[string]interface{}{ {"_id": mongodb.StringTOBsonId(repeatId)}, record, } redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp["_id"]), str)) } else { update, fs = mergeTmp(tmp1, tmp) update1 := util.DeepCopy(update).(map[string]interface{}) update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"]) update1["weight"] = w update["fusion_tag"] = 1 if fs != nil && len(fs) > 0 { updatePool <- []map[string]interface{}{ {"_id": (tmp1)["_id"]}, {"$set": update, "$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}}, } } else { updatePool <- []map[string]interface{}{ {"_id": (tmp1)["_id"]}, {"$set": update}, } } updatePool <- []map[string]interface{}{ {"_id": (tmp)["_id"]}, {"$set": bson.M{"fusion_tag": 0}}, } record := make(map[string]interface{}) record["$set"] = map[string]interface{}{ "template_weight": w1, } if w == 0 { update1["remark"] = s } record["$push"] = map[string]interface{}{ "ids": mongodb.BsonIdToSId(tmp["_id"]), "record": update1, } recordPool <- []map[string]interface{}{ {"_id": mongodb.StringTOBsonId(repeatId)}, record, } redis.PutCKV("fusion_id", mid, fmt.Sprintf("%s-%s", str, mongodb.BsonIdToSId(tmp["_id"]))) } } else { tmp1 := findData(repeatId) w, s := getWeight(tmp) w1, s1 := getWeight(tmp1) var update map[string]interface{} var fs []string if w > w1 { update, fs = mergeTmp(tmp, tmp1) set := util.DeepCopy(update).(map[string]interface{}) if fs != nil && len(fs) > 0 { set["fusion_fields"] = fs } set["fusion_tag"] = 1 updatePool <- []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": set}, } updatePool <- []map[string]interface{}{ {"_id": tmp1["_id"]}, {"$set": bson.M{"fusion_tag": 0}}, } record := make(map[string]interface{}) record["_id"] = tmp1["_id"] record["template_id"] = mongodb.BsonIdToSId(tmp["_id"]) record["template_weight"] = w record["ids"] = []string{mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(repeatId)} var recordlist []map[string]interface{} recordlist = append(recordlist, map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "weight": w}) update1 := util.DeepCopy(update).(map[string]interface{}) update1["infoid"] = mongodb.BsonIdToSId(tmp1["_id"]) update1["weight"] = w1 if w1 == 0 { update1["remark"] = s1 } recordlist = append(recordlist, update1) record["record"] = recordlist recordPool <- []map[string]interface{}{ {"_id": tmp1["_id"]}, {"$set": record}, } redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(tmp1["_id"]))) } else { update, fs = mergeTmp(tmp1, tmp) set := util.DeepCopy(update).(map[string]interface{}) if fs != nil && len(fs) > 0 { set["fusion_fields"] = fs } set["fusion_tag"] = 1 updatePool <- []map[string]interface{}{ {"_id": tmp1["_id"]}, {"$set": set}, } updatePool <- []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": bson.M{"fusion_tag": 0}}, } record := make(map[string]interface{}) record["_id"] = tmp1["_id"] record["template_id"] = mongodb.BsonIdToSId(tmp1["_id"]) record["template_weight"] = w1 record["ids"] = []string{mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(tmp1["_id"])} var recordlist []map[string]interface{} recordlist = append(recordlist, map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp1["_id"]), "weight": w1}) update1 := util.DeepCopy(update).(map[string]interface{}) update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"]) update1["weight"] = w if w == 0 { update1["remark"] = s } recordlist = append(recordlist, update1) record["record"] = recordlist recordPool <- []map[string]interface{}{ {"_id": tmp1["_id"]}, {"$set": record}, } redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp1["_id"]), mongodb.BsonIdToSId(tmp["_id"]))) } } ArrLock[util.HashCode(repeatId)%config.Conf.Serve.Thread].Unlock() } }(tmp) tmp = map[string]interface{}{} } wg.Wait() log.Info("fusion over...", zap.Int("count:", count)) c := make(chan bool, 1) <-c } func findData(id string) map[string]interface{} { tmp, _ := MgoB.FindById(config.Conf.DB.Mongo.Coll, id, nil) if tmp != nil && len(*tmp) > 0 { return *tmp } else { tmp, _ = MgoB.FindById("bidding", id, nil) if tmp != nil && len(*tmp) > 0 { return *tmp } } return nil } func getWeight(tmp map[string]interface{}) (int, string) { var w int if util.IntAll(tmp["publishtime"]) <= 0 { return 0, "发布时间小于0" } if BinarySearch(CompeteSite, util.ObjToString(tmp["site"])) > -1 { return 0, "竞品网站数据" } for k, v := range config.Conf.Serve.Weight { if tmp[k] != nil { if k == "attachments" { if pinfo, ok := tmp["projectinfo"].(map[string]interface{}); ok { if atts, o2 := pinfo["attachments"].(map[string]interface{}); o2 { if len(atts) > 0 { w += v } } } } else { if reflect.TypeOf(tmp[k]).String() == "string" { if util.ObjToString(tmp[k]) != "" { w += v } } else if reflect.TypeOf(tmp[k]).String() == "float64" { if util.Float64All(tmp[k]) > 0 { w += v } } else if reflect.TypeOf(tmp[k]).String() == "[]interface {}" { if len(tmp[k].([]interface{})) > 0 { w += v } } else { w += v } } } } return w, "" } // @Description tmp模版数据, tmp1补充数据 // @Author J 2023/1/3 11:31 func mergeTmp(tmp map[string]interface{}, tmp1 map[string]interface{}) (map[string]interface{}, []string) { update := make(map[string]interface{}) var fs []string for _, v := range config.Conf.Serve.Fields { if v == "attachments" { if pinfo1, ok1 := tmp1["projectinfo"].(map[string]interface{}); ok1 { if ats, ok2 := pinfo1[v].(map[string]interface{}); ok2 { if pinfo1[v] != nil && len(ats) > 0 { if pinfo, ok := tmp["projectinfo"].(map[string]interface{}); ok { if pinfo[v] == nil { pinfo[v] = pinfo1[v] update["projectinfo"] = pinfo update["attach_text"] = tmp1["attach_text"] // 补充附件文本 fs = append(fs, v) fs = append(fs, "attach_text") } } else { update["projectinfo"] = map[string]interface{}{v: pinfo1[v]} update["attach_text"] = tmp1["attach_text"] fs = append(fs, v) fs = append(fs, "attach_text") } } } else { if pinfo1[v] != nil { log.Error("mergeTmp err...", zap.Any("id", mongodb.BsonIdToSId(tmp1["_id"]))) } } } } else if v == "city" { if util.ObjToString(tmp["area"]) == util.ObjToString(tmp1["area"]) { if tmp[v] == nil && tmp1[v] != nil { update[v] = util.ObjToString(tmp1[v]) fs = append(fs, v) } } } else if v == "district" { if util.ObjToString(tmp["city"]) == util.ObjToString(tmp1["city"]) { if tmp[v] == nil && tmp1[v] != nil { update[v] = util.ObjToString(tmp1[v]) fs = append(fs, v) } } } else { if tmp[v] == nil && tmp1[v] != nil { if reflect.TypeOf(tmp1[v]).String() == "string" && util.ObjToString(tmp1[v]) != "" { update[v] = util.ObjToString(tmp1[v]) fs = append(fs, v) } else if reflect.TypeOf(tmp1[v]).String() == "[]interface {}" && len(tmp1[v].([]interface{})) > 0 { update[v] = tmp1[v] fs = append(fs, v) } else { update[v] = tmp1[v] fs = append(fs, v) } } } } return update, fs } func updateMethod() { arru := make([][]map[string]interface{}, 500) indexu := 0 for { select { case v := <-recordPool: arru[indexu] = v indexu++ if indexu == 500 { recordSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-recordSp }() MgoB.UpSertBulk(config.Conf.DB.Mongo.Record, arru...) }(arru) arru = make([][]map[string]interface{}, 500) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { recordSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-recordSp }() MgoB.UpSertBulk(config.Conf.DB.Mongo.Record, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 500) indexu = 0 } } } } func updateMethod1() { arru := make([][]map[string]interface{}, 500) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == 500 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoB.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...) }(arru) arru = make([][]map[string]interface{}, 500) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoB.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 500) indexu = 0 } } } }