package main

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"

	"data_clear_sync/config"
)

// modifyTagClean is the marker written to modifyinfo[field] and to
// record["modifypath"] for every value changed by these tasks.
const modifyTagClean = "数据清洗"

// taskChanges groups the documents built while diffing one cleaned record
// against the stored bidding document.
type taskChanges struct {
	update   map[string]interface{} // $set payload for bidding / bidding_back
	updateEs map[string]interface{} // payload queued on updateEsPool
	mdinfo   map[string]interface{} // field -> modifyTagClean for changed fields
	record   map[string]interface{} // entry pushed onto "modify" via updateRcPool
}

// taskinfoV1 iterates over every document of the Mongo1 collection and, for
// each one, syncs the cleaned field values into bidding / bidding_back.
//
// A field is considered cleaned when both tmp[field] and tmp["ck_"+field] are
// non-nil. At most config.Conf.Serve.Thread records are processed concurrently.
func taskinfoV1() {
	defer util.Catch()
	sess := MongoV1.GetMgoConn()
	defer MongoV1.DestoryMongoConn(sess)

	ch := make(chan bool, config.Conf.Serve.Thread)
	wg := &sync.WaitGroup{}
	// Debug aid: to process a single document, query with
	// bson.M{"_id": mongodb.StringTOBsonId("5caeb0bca5cb26b9b733eb0c")} instead of nil.
	f := bson.M{"contenthtml": 0, "description": 0, "detail": 0, "kvtext": 0, "projectinfo": 0}
	it := sess.DB(config.Conf.DB.Mongo1.Dbname).C(config.Conf.DB.Mongo1.Coll).Find(nil).Select(&f).Iter()
	count := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
		if count%2000 == 0 {
			log.Info("taskinfoV1", zap.Int("current:", count))
		}
		ch <- true // blocks while Thread workers are already running
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			// The Catch deferred by taskinfoV1 does not cover this goroutine,
			// so the worker defers its own.
			defer util.Catch()
			syncRecordV1(tmp, f)
		}(tmp)
		// Hand the worker exclusive ownership of its map; decode the next
		// document into a fresh one.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	log.Info("taskinfoV1 over...", zap.Int("count:", count))
}

// syncRecordV1 diffs one Mongo1 document against its bidding counterpart and
// queues the resulting updates. fields is the projection used for the lookup.
func syncRecordV1(tmp map[string]interface{}, fields bson.M) {
	id := mongodb.BsonIdToSId(tmp["_id"])
	info, coll, ok := findBidding(id, fields)
	if !ok {
		util.Debug("not find id", id)
		return
	}
	changes := diffFields(info, func(f string) (interface{}, bool) {
		return tmp[f], tmp[f] != nil && tmp[fmt.Sprintf("ck_%s", f)] != nil
	})
	pushChanges(id, coll, info, changes)
}

// taskinfoV2 iterates over every document of the Mongo2 collection and, for
// each one, syncs the cleaned field values into bidding / bidding_back.
//
// Values are read from the document's "v_baseinfo" sub-document; a field is
// considered cleaned when "v_taginfo" holds a non-nil entry for it. At most
// config.Conf.Serve.Thread records are processed concurrently.
func taskinfoV2() {
	defer util.Catch()
	sess := MongoV2.GetMgoConn()
	defer MongoV2.DestoryMongoConn(sess)

	ch := make(chan bool, config.Conf.Serve.Thread)
	wg := &sync.WaitGroup{}
	// Debug aid: to process a single document, query with
	// bson.M{"_id": mongodb.StringTOBsonId("6229797d32770a446e976450")} instead of nil.
	// NOTE(review): this projection is also passed to the bidding lookup below;
	// presumably bidding has no v_baseinfo.* keys, so it may exclude nothing
	// there — confirm whether the V1 projection was intended for that lookup.
	f := bson.M{"v_baseinfo.contenthtml": 0, "v_baseinfo.description": 0, "v_baseinfo.detail": 0, "v_baseinfo.kvtext": 0, "v_baseinfo.projectinfo": 0}
	it := sess.DB(config.Conf.DB.Mongo2.Dbname).C(config.Conf.DB.Mongo2.Coll).Find(nil).Select(&f).Iter()
	count := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
		if count%2000 == 0 {
			log.Info("taskinfoV2", zap.Int("current:", count))
		}
		ch <- true // blocks while Thread workers are already running
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			// The Catch deferred by taskinfoV2 does not cover this goroutine,
			// so the worker defers its own.
			defer util.Catch()
			syncRecordV2(tmp, f)
		}(tmp)
		// Hand the worker exclusive ownership of its map; decode the next
		// document into a fresh one.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	log.Info("taskinfoV2 over...", zap.Int("count:", count))
}

// syncRecordV2 diffs one Mongo2 document against its bidding counterpart and
// queues the resulting updates. fields is the projection used for the lookup.
func syncRecordV2(tmp map[string]interface{}, fields bson.M) {
	id := mongodb.BsonIdToSId(tmp["_id"])

	// Unwrap into plain maps so that a missing sub-document (nil pointer)
	// reads as "no values" instead of panicking on dereference.
	var binfo, tinfo map[string]interface{}
	if p := util.ObjToMap(tmp["v_baseinfo"]); p != nil {
		binfo = *p
	}
	if p := util.ObjToMap(tmp["v_taginfo"]); p != nil {
		tinfo = *p
	}

	info, coll, ok := findBidding(id, fields)
	if !ok {
		util.Debug("not find id", id)
		return
	}
	changes := diffFields(info, func(f string) (interface{}, bool) {
		return binfo[f], tinfo[f] != nil
	})
	pushChanges(id, coll, info, changes)
}

// findBidding looks id up in "bidding" and, when nothing is found there, in
// "bidding_back". It returns the document, the collection it came from, and
// whether a non-empty document was found.
func findBidding(id string, fields bson.M) (info map[string]interface{}, coll string, ok bool) {
	for _, c := range []string{"bidding", "bidding_back"} {
		res, _ := Mongo.FindById(c, id, fields)
		if res != nil && len(*res) > 0 {
			return *res, c, true
		}
	}
	return nil, "", false
}

// diffFields walks Fields and collects every cleaned value that differs from
// the value stored in info.
//
// cleaned returns the cleaned value for a field and whether that field was
// actually cleaned; fields reported as not cleaned are skipped.
// "budget" and "bidamount" are compared and stored via util.Float64All, every
// other field via util.ObjToString. A changed "s_winner" additionally
// refreshes "entidlist" through companyFun.
func diffFields(info map[string]interface{}, cleaned func(f string) (interface{}, bool)) taskChanges {
	c := taskChanges{
		update:   make(map[string]interface{}),
		updateEs: make(map[string]interface{}),
		mdinfo:   make(map[string]interface{}),
		record:   make(map[string]interface{}),
	}
	for _, f := range Fields {
		val, ok := cleaned(f)
		if !ok {
			continue
		}
		switch f {
		case "budget", "bidamount":
			n := util.Float64All(val)
			if n == util.Float64All(info[f]) {
				continue
			}
			c.update[f], c.record[f], c.updateEs[f] = n, n, n
			c.mdinfo[f] = modifyTagClean
		case "s_winner":
			// s_winner is stored as a string, so compare string forms. The
			// previous numeric comparison was copied from the budget branch.
			s := util.ObjToString(val)
			if s == util.ObjToString(info[f]) {
				continue
			}
			c.update[f], c.record[f], c.updateEs[f] = s, s, s
			cid := companyFun(c.update)
			c.update["entidlist"] = cid
			c.record["entidlist"] = cid
			c.updateEs["entidlist"] = cid
			// NOTE(review): unlike the other fields, s_winner is not recorded
			// in mdinfo; kept as in the original — confirm this is intended.
		default:
			s := util.ObjToString(val)
			if s == util.ObjToString(info[f]) {
				continue
			}
			c.update[f], c.record[f], c.updateEs[f] = s, s, s
			c.mdinfo[f] = modifyTagClean
		}
	}
	// NOTE: the original code also looped over config.Conf.Serve.ExField with
	// an empty branch for "package"; that placeholder did nothing and was
	// dropped. Handling for extra fields would go here.
	return c
}

// pushChanges queues the collected changes for document id: the ES payload on
// updateEsPool, the Mongo $set on updatePool (or a direct UpdateById when the
// document lives in "bidding_back"), and the audit record on updateRcPool.
// It does nothing when no field changed.
func pushChanges(id, coll string, info map[string]interface{}, c taskChanges) {
	if len(c.update) == 0 {
		return
	}

	// Merge the new markers into the stored modifyinfo so earlier entries
	// are preserved.
	mdinfo := c.mdinfo
	if md, ok := info["modifyinfo"].(map[string]interface{}); ok {
		for k, v := range mdinfo {
			md[k] = v
		}
		mdinfo = md
	}

	now := time.Now().Unix()
	c.record["updatetime"] = now
	c.record["modifypath"] = modifyTagClean

	updateEsPool <- []map[string]interface{}{
		{"_id": id},
		c.updateEs,
	}

	c.update["modifyinfo"] = mdinfo
	if coll == "bidding" {
		updatePool <- []map[string]interface{}{
			{"_id": mongodb.StringTOBsonId(id)},
			{"$set": c.update},
		}
	} else {
		Mongo.UpdateById("bidding_back", id, map[string]interface{}{"$set": c.update})
	}

	updateRcPool <- []map[string]interface{}{
		{"_id": mongodb.StringTOBsonId(id)},
		{"$set": bson.M{"updatetime": now}, "$push": bson.M{"modify": record(c)}},
	}
}

// record returns the audit entry of c; it exists only to keep the
// updateRcPool literal above on one line.
func record(c taskChanges) map[string]interface{} {
	return c.record
}

// companyFun builds the "entidlist" value for a record.
//
// It splits tmp["s_winner"] on "," and resolves each non-empty name through
// redis.GetStr("qyxy_id", name). A name that resolves to an empty id is
// represented by "-". The result is nil when s_winner has no non-empty names.
//
// Author: J, 2022/6/7.
func companyFun(tmp map[string]interface{}) (cid []string) {
	for _, w := range strings.Split(util.ObjToString(tmp["s_winner"]), ",") {
		if w == "" {
			continue
		}
		id := redis.GetStr("qyxy_id", w)
		if id == "" {
			id = "-"
		}
		cid = append(cid, id)
	}
	return cid
}