// standarbuyer package main import ( "dbutil/mongo" "dbutil/redis" "encoding/json" "log" qu "qfw/util" "time" "unicode/utf8" "go.mongodb.org/mongo-driver/bson/primitive" "gopkg.in/mgo.v2/bson" ) //增量处理 func buyerStandarData(db string, query map[string]interface{}) { defer qu.Catch() sess := MongoFrom.GetMgoConn() defer MongoFrom.Close() it := sess.DB(db).C(extractcoll).Find(query).Select(bson.M{"repeat": 1, "buyer": 1, "buyertel": 1, "buyerperson": 1, "buyerclass": 1, "topscopeclass": 1}).Sort("_id").Iter() index := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); index++ { if qu.IntAll(tmp["repeat"]) > 0 { //重复数据跳过 continue } buyer := qu.ObjToString(tmp["buyer"]) if utf8.RuneCountInString(buyer) < 5 { continue } infoid := mongo.BsonTOStringId(tmp["_id"]) buyerclass := qu.ObjToString(tmp["buyerclass"]) topscopeclass, _ := tmp["topscopeclass"].(primitive.A) entid, _ := redis.GetRedisStr("buyer", buyerbd, buyer) ps := []map[string]interface{}{} buyerperson := qu.ObjToString(tmp["buyerperson"]) buyertel := qu.ObjToString(tmp["buyertel"]) if entid == "" { savetoerr := true if buyerperson != "" || buyertel != "" { v := map[string]interface{}{ "contact_person": buyerperson, "phone": buyertel, "buyerclass": buyerclass, "topscopeclass": comRepTopscopeclass(topscopeclass), "infoid": infoid, } ps = append(ps, v) data := comHisMegerNewData(buyer, "buyer", ps) if data != nil { _id := MongoTo.Save(buyerent, data) redis.PutRedis("buyer", buyerbd, buyer, _id.(primitive.ObjectID).Hex(), -1) savetoerr = false } } if savetoerr { t := MongoTo.FindOne(buyererr, map[string]interface{}{"name": buyer}) if len(t) < 1 { MongoTo.Save(buyererr, map[string]interface{}{ "name": buyer, "buyerlass": buyerclass, "check": comMarkdata(buyer, "buyer"), "updatetime": time.Now().Unix(), }) } } } else { if buyerperson != "" || buyertel != "" { v := map[string]interface{}{ "contact_person": buyerperson, "phone": buyertel, "buyerclass": buyerclass, "topscopeclass": comRepTopscopeclass(topscopeclass), "infoid": infoid, } data := buyerMegerBuyerclass(entid, v) MongoTo.UpdateById(buyerent, entid, map[string]interface{}{ "$set": data, "$push": map[string]interface{}{"contact": v}, }, ) } } tmp = map[string]interface{}{} if index%100 == 0 { log.Println("buyer index", index) } } log.Println("buyer ok index", index) } //历史数据处理 func historybuyer(db, fromcoll string) { defer qu.Catch() log.Println("history start") sess := MongoFrom.GetMgoConn() defer MongoFrom.Close() it := sess.DB(db).C(fromcoll).Find(map[string]interface{}{}).Select(bson.M{"repeat": 1, "buyer": 1, "buyertel": 1, "buyerperson": 1, "buyerclass": 1, "topscopeclass": 1}).Sort("_id").Iter() index := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); index++ { if qu.IntAll(tmp["repeat"]) > 0 { //重复数据跳过 continue } _id := mongo.BsonTOStringId(tmp["_id"]) buyerchanbool <- true go func(tmp map[string]interface{}) { defer func() { <-buyerchanbool }() buyer := qu.ObjToString(tmp["buyer"]) buyerclass := qu.ObjToString(tmp["buyerclass"]) topscopeclass, _ := tmp["topscopeclass"].(primitive.A) if buyer != "" && utf8.RuneCountInString(buyer) > 4 { buyerperson := qu.ObjToString(tmp["buyerperson"]) buyertel := qu.ObjToString(tmp["buyertel"]) b, _ := redis.ExistRedis("buyer", buyerbd, buyer) if b { if buyerperson != "" || buyertel != "" { strs, _ := redis.GetRedisStr("buyer", buyerbd, buyer) ps := []interface{}{} err := json.Unmarshal([]byte(strs), &ps) if err == nil { v := map[string]interface{}{ "contact_person": buyerperson, "phone": buyertel, "buyerclass": buyerclass, "topscopeclass": comRepTopscopeclass(topscopeclass), "infoid": _id, } ps = append(ps, v) bs, _ := json.Marshal(ps) redis.PutRedis("buyer", buyerbd, buyer, bs, -1) } else { log.Println("jsonErr", err) } } } else { val := []map[string]interface{}{} if buyerperson != "" || buyertel != "" { tmp := map[string]interface{}{ "contact_person": buyerperson, "phone": buyertel, "buyerclass": buyerclass, "topscopeclass": comRepTopscopeclass(topscopeclass), "infoid": _id, } val = append(val, tmp) } bs, _ := json.Marshal(val) redis.PutRedis("buyer", buyerbd, buyer, bs, -1) MongoTo.Save(buyererr, map[string]interface{}{ "name": buyer, "buyerclass": buyerclass, "updatetime": time.Now().Unix(), }) } } }(tmp) tmp = map[string]interface{}{} if index%10000 == 0 { log.Println("index", index, _id) } } log.Println("history ok index", index) buyerStandarHistory(qu.ObjToString(sysconfig["mgotodb"])) } //查询buyererr标准化历史数据 func buyerStandarHistory(db string) { defer qu.Catch() log.Println("开始标准化数据--buyer", db) sessto := MongoTo.GetMgoConn() defer MongoTo.Close() it := sessto.DB(db).C(buyererr).Find(map[string]interface{}{}).Iter() index := 0 entnum := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); index++ { err_id := mongo.BsonTOStringId(tmp["_id"]) name := qu.ObjToString(tmp["name"]) buyerchanbool <- true go func(tmp map[string]interface{}) { defer func() { <-buyerchanbool }() strs, err := redis.GetRedisStr("buyer", buyerbd, name) if err != nil { return } ps := []map[string]interface{}{} err = json.Unmarshal([]byte(strs), &ps) if err == nil { data := comHisMegerNewData(name, "buyer", ps) if data != nil { MongoTo.Save(buyerent, data) MongoTo.DeleteById(buyererr, err_id) entnum++ } else { //未查询到企业,打标记并存表 num := comMarkdata(name, "buyer") tmp["check"] = num MongoTo.UpdateById(buyererr, err_id, map[string]interface{}{"$set": map[string]interface{}{"check": num}}) } } else { log.Println("jsonErr", name, err) } }(tmp) if index%1000 == 0 { log.Println("标准化历史数据--buyer", index, err_id, entnum) } tmp = map[string]interface{}{} } log.Println("标准化数据完成--buyer", index, entnum) } //企业数据整合(已有标注信息) func buyerMegerBuyerclass(id string, ps map[string]interface{}) map[string]interface{} { tmp := MongoEnt.FindById(buyerent, id, bson.M{"buyerclass": 1}) if len(tmp) < 1 { return nil } data := map[string]interface{}{} buyerclass := tmp["buyerclass"].(primitive.A) tmpbuyerclass := map[string]bool{} for _, v := range buyerclass { tt := qu.ObjToString(v) tmpbuyerclass[tt] = true } tmpbuyerclass[qu.ObjToString(ps["buyerclass"])] = true newbuyerclass := []interface{}{} for k, _ := range tmpbuyerclass { newbuyerclass = append(newbuyerclass, k) } data["buyerclass"] = newbuyerclass data["updatetime"] = time.Now().Unix() return data }