// standaragency package main import ( "dbutil/mongo" "dbutil/redis" "encoding/json" "log" qu "qfw/util" "time" "unicode/utf8" "go.mongodb.org/mongo-driver/bson/primitive" "gopkg.in/mgo.v2/bson" ) //增量处理 func agencyStandarData(db string, query map[string]interface{}) { defer qu.Catch() sess := MongoFrom.GetMgoConn() defer MongoFrom.Close() it := sess.DB(db).C(extractcoll).Find(query).Select(bson.M{"repeat": 1, "agency": 1, "agencytel": 1, "agencyperson": 1, "topscopeclass": 1, "agencyaddr": 1}).Sort("_id").Iter() index := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); index++ { if qu.IntAll(tmp["repeat"]) > 0 { //重复数据跳过 continue } agency := qu.ObjToString(tmp["agency"]) if utf8.RuneCountInString(agency) < 5 { continue } infoid := mongo.BsonTOStringId(tmp["_id"]) topscopeclass, _ := tmp["topscopeclass"].(primitive.A) entid, _ := redis.GetRedisStr("agency", agencybd, agency) ps := []map[string]interface{}{} agencyperson := qu.ObjToString(tmp["agencyperson"]) agencytel := qu.ObjToString(tmp["agencytel"]) if entid == "" { //redis 未存 savetoerr := true if agencytel != "" { v := map[string]interface{}{ "contact_person": agencyperson, "phone": agencytel, "topscopeclass": comRepTopscopeclass(topscopeclass), "infoid": infoid, } ps = append(ps, v) data := comHisMegerNewData(agency, "agency", ps) if data != nil { _id := MongoTo.Save(agencyent, data) redis.PutRedis("agency", agencybd, agency, _id.(primitive.ObjectID).Hex(), -1) savetoerr = false } } if savetoerr { t := MongoTo.FindOne(agencyerr, map[string]interface{}{"name": agency}) if len(t) < 1 { MongoTo.Save(agencyerr, map[string]interface{}{ "name": agency, "check": comMarkdata(agency, "agency"), "updatetime": time.Now().Unix(), }) } } } else { if agencytel != "" { is_exist := false //电话是否存在 for _, v := range ps { if v["phone"] == agencytel { is_exist = true if agencyperson != "" && v["contact_person"] != agencyperson { v["contact_person"] = agencyperson v["infoid"] = infoid bs, _ := json.Marshal(ps) //替换数据,更新 redis.PutRedis("agency", agencybd, agency, bs, -1) } continue } } if !is_exist { v := map[string]interface{}{ "contact_person": agencyperson, "phone": agencytel, "topscopeclass": comRepTopscopeclass(topscopeclass), "infoid": infoid, } MongoTo.UpdateById(agencyent, entid, map[string]interface{}{ "$set": map[string]interface{}{"updatetime": time.Now().Unix()}, "$push": map[string]interface{}{"contact": v}, }, ) } } } tmp = map[string]interface{}{} if index%100 == 0 { log.Println("agency index", index) } } log.Println("agency ok index", index) } //历史数据处理 func historyagency(db, fromcoll string) { defer qu.Catch() log.Println("history start") sess := MongoFrom.GetMgoConn() defer MongoFrom.Close() it := sess.DB(db).C(fromcoll).Find(map[string]interface{}{}).Select(bson.M{"repeat": 1, "agency": 1, "agencytel": 1, "agencyperson": 1, "topscopeclass": 1, "agencyaddr": 1}).Sort("_id").Iter() index := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); index++ { if qu.IntAll(tmp["repeat"]) > 0 { //重复数据跳过 continue } _id := mongo.BsonTOStringId(tmp["_id"]) agencychanbool <- true go func(tmp map[string]interface{}) { defer func() { <-agencychanbool }() agency := qu.ObjToString(tmp["agency"]) topscopeclass, _ := tmp["topscopeclass"].(primitive.A) if agency != "" && utf8.RuneCountInString(agency) > 4 { agencyperson := qu.ObjToString(tmp["agencyperson"]) agencytel := qu.ObjToString(tmp["agencytel"]) b, _ := redis.ExistRedis("agency", agencybd, agency) if b { //redis 存在 if agencytel != "" { strs, _ := redis.GetRedisStr("agency", agencybd, agency) ps := []map[string]interface{}{} err := json.Unmarshal([]byte(strs), &ps) if err == nil { is_exist := false //电话是否存在 for _, v := range ps { if v["phone"] == agencytel { is_exist = true if agencyperson != "" && v["contact_person"] != agencyperson { v["contact_person"] = agencyperson v["infoid"] = _id bs, _ := json.Marshal(ps) //替换数据,更新 redis.PutRedis("agency", agencybd, agency, bs, -1) } continue } } if !is_exist { v := map[string]interface{}{ "contact_person": agencyperson, "phone": agencytel, "topscopeclass": comRepTopscopeclass(topscopeclass), "infoid": _id, } ps = append(ps, v) bs, _ := json.Marshal(ps) redis.PutRedis("agency", agencybd, agency, bs, -1) } } else { log.Println("jsonErr", err) } } } else { val := []map[string]interface{}{} if agencytel != "" { tmp := map[string]interface{}{ "contact_person": agencyperson, "phone": agencytel, "topscopeclass": comRepTopscopeclass(topscopeclass), "infoid": _id, } val = append(val, tmp) } bs, _ := json.Marshal(val) redis.PutRedis("agency", agencybd, agency, bs, -1) MongoTo.Save(agencyerr, map[string]interface{}{ "name": agency, "updatetime": time.Now().Unix(), }) } } }(tmp) tmp = map[string]interface{}{} if index%10000 == 0 { log.Println("index", index, _id) } } log.Println("history ok index", index) agencyStandarHistory(qu.ObjToString(sysconfig["mgotodb"])) } //查询agencyerr标准化历史数据 func agencyStandarHistory(db string) { defer qu.Catch() log.Println("开始标准化数据--agency", db) sessto := MongoTo.GetMgoConn() defer MongoTo.Close() it := sessto.DB(db).C(agencyerr).Find(map[string]interface{}{}).Iter() index := 0 entnum := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); index++ { err_id := mongo.BsonTOStringId(tmp["_id"]) name := qu.ObjToString(tmp["name"]) agencychanbool <- true go func(tmp map[string]interface{}) { defer func() { <-agencychanbool }() strs, err := redis.GetRedisStr("agency", agencybd, name) if err != nil { return } ps := []map[string]interface{}{} err = json.Unmarshal([]byte(strs), &ps) if err == nil { data := comHisMegerNewData(name, "agency", ps) if data != nil { MongoTo.Save(agencyent, data) MongoTo.DeleteById(agencyerr, err_id) entnum++ } else { //未查询到企业,打标记并存表 num := comMarkdata(name, "agency") tmp["check"] = num MongoTo.UpdateById(agencyerr, err_id, map[string]interface{}{"$set": map[string]interface{}{"check": num}}) } } else { log.Println("jsonErr", name, err) } }(tmp) if index%1000 == 0 { log.Println("标准化历史数据--agency", index, err_id, entnum) } tmp = map[string]interface{}{} } log.Println("标准化数据完成--agency", index, entnum) }