// standarwinner package main import ( "dbutil/mongo" "dbutil/redis" "encoding/json" "log" qu "qfw/util" "strings" "time" "unicode/utf8" "go.mongodb.org/mongo-driver/bson/primitive" "gopkg.in/mgo.v2/bson" ) //增量处理 func winnerStandarData(db string, query map[string]interface{}) { defer qu.Catch() sess := MongoFrom.GetMgoConn() defer MongoFrom.Close() it := sess.DB(db).C(extractcoll).Find(query).Select(bson.M{"repeat": 1, "winner": 1, "winnertel": 1, "winnerperson": 1, "topscopeclass": 1, "package": 1}).Sort("_id").Iter() index := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); index++ { if qu.IntAll(tmp["repeat"]) > 0 { //重复数据跳过 continue } winner := qu.ObjToString(tmp["winner"]) if utf8.RuneCountInString(winner) < 5 { continue } infoid := mongo.BsonTOStringId(tmp["_id"]) topscopeclass, _ := tmp["topscopeclass"].(primitive.A) entid, _ := redis.GetRedisStr("winner", winnerbd, winner) winnerperson := qu.ObjToString(tmp["winnerperson"]) winnertel := qu.ObjToString(tmp["winnertel"]) if entid == "" { //新增标准库 savetoerr := true if winnerperson != "" || winnertel != "" { v := map[string]interface{}{ "contact_person": winnerperson, "phone": winnertel, "topscopeclass": comRepTopscopeclass(topscopeclass), "infoid": infoid, } data := comHisMegerNewData(winner, "winner", []map[string]interface{}{v}) if data != nil { _id := MongoTo.Save(winnerent, data) redis.PutRedis("winner", winnerbd, winner, _id.(primitive.ObjectID).Hex(), -1) savetoerr = false } } if savetoerr { t := MongoTo.FindOne(winnererr, map[string]interface{}{"name": winner}) if len(t) < 1 { MongoTo.Save(winnererr, map[string]interface{}{ "name": winner, "topscopeclass": comRepTopscopeclass(topscopeclass), "check": comMarkdata(winner, "winner"), "updatetime": time.Now().Unix(), }) } } } else { //更新标准库 if winnerperson != "" || winnertel != "" { v := map[string]interface{}{ "contact_person": winnerperson, "phone": winnertel, "topscopeclass": comRepTopscopeclass(topscopeclass), "infoid": infoid, } data := winMegerIndustry(entid, v) MongoTo.UpdateById(winnerent, entid, map[string]interface{}{ "$set": data, "$push": map[string]interface{}{"contact": v}, }, ) } } //分包处理 if packages, ok := tmp["package"].(map[string]interface{}); ok { entpacks := getWinnerPacks(infoid, packages, comRepTopscopeclass(topscopeclass)) for name, contact := range entpacks { entid, _ := redis.GetRedisStr("winner", winnerbd, name) if entid == "" { data := comHisMegerNewData(winner, "winner", []map[string]interface{}{contact}) if data != nil { _id := MongoTo.Save(winnerent, data) redis.PutRedis("winner", winnerbd, winner, _id.(primitive.ObjectID).Hex(), -1) } } else { data := winMegerIndustry(entid, contact) MongoTo.UpdateById(winnerent, entid, map[string]interface{}{ "$set": data, "$push": map[string]interface{}{"contact": contact}, }, ) } } } tmp = map[string]interface{}{} if index%100 == 0 { log.Println("winner index", index) } } log.Println("winner ok index", index) } //历史数据处理 func historywinner(db, fromcoll string) { defer qu.Catch() log.Println("history start") sess := MongoFrom.GetMgoConn() defer MongoFrom.Close() it := sess.DB(db).C(fromcoll).Find(map[string]interface{}{}).Select(bson.M{"repeat": 1, "winner": 1, "winnertel": 1, "winnerperson": 1, "topscopeclass": 1}).Sort("_id").Iter() index := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); index++ { if qu.IntAll(tmp["repeat"]) > 0 { //重复数据跳过 continue } _id := mongo.BsonTOStringId(tmp["_id"]) winchanbool <- true go func(tmp map[string]interface{}) { defer func() { <-winchanbool }() winner := qu.ObjToString(tmp["winner"]) topscopeclass, _ := tmp["topscopeclass"].(primitive.A) if winner != "" && utf8.RuneCountInString(winner) > 4 { winnerperson := qu.ObjToString(tmp["winnerperson"]) winnertel := qu.ObjToString(tmp["winnertel"]) b, _ := redis.ExistRedis("winner", winnerbd, winner) if b { if winnerperson != "" || winnertel != "" { strs, _ := redis.GetRedisStr("winner", winnerbd, winner) ps := []interface{}{} err := json.Unmarshal([]byte(strs), &ps) if err == nil { v := map[string]interface{}{ "contact_person": winnerperson, "phone": winnertel, "topscopeclass": comRepTopscopeclass(topscopeclass), "infoid": _id, } ps = append(ps, v) bs, _ := json.Marshal(ps) redis.PutRedis("winner", winnerbd, winner, bs, -1) //log.Println(_id, index, winner) } else { log.Println("jsonErr", err) } } } else { val := []map[string]interface{}{} if winnerperson != "" || winnertel != "" { tmp := map[string]interface{}{ "contact_person": winnerperson, "phone": winnertel, "topscopeclass": comRepTopscopeclass(topscopeclass), "infoid": _id, } val = append(val, tmp) } bs, _ := json.Marshal(val) redis.PutRedis("winner", winnerbd, winner, bs, -1) MongoTo.Save(winnererr, map[string]interface{}{ "name": winner, "topscopeclass": comRepTopscopeclass(topscopeclass), "updatetime": time.Now().Unix(), }) } } }(tmp) tmp = map[string]interface{}{} if index%10000 == 0 { log.Println("index", index, _id) } } log.Println("history ok index", index) winStandarHistory(qu.ObjToString(sysconfig["mgotodb"])) } //查询winnererr标准化历史数据 func winStandarHistory(db string) { defer qu.Catch() log.Println("开始标准化数据--winner", db) sessto := MongoTo.GetMgoConn() defer MongoTo.Close() it := sessto.DB(db).C(winnererr).Find(map[string]interface{}{}).Iter() index := 0 entnum := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); index++ { err_id := mongo.BsonTOStringId(tmp["_id"]) name := qu.ObjToString(tmp["name"]) winchanbool <- true go func(tmp map[string]interface{}) { defer func() { <-winchanbool }() strs, err := redis.GetRedisStr("winner", winnerbd, name) if err != nil { return } ps := []map[string]interface{}{} err = json.Unmarshal([]byte(strs), &ps) if err == nil { data := comHisMegerNewData(name, "winner", ps) if data != nil { MongoTo.Save(winnerent, data) MongoTo.DeleteById(winnererr, err_id) entnum++ } else { //未查询到企业,打标记并存表 num := comMarkdata(name, "winner") tmp["check"] = num MongoTo.UpdateById(winnererr, err_id, map[string]interface{}{"$set": map[string]interface{}{"check": num}}) } } else { log.Println("jsonErr", name, err) } }(tmp) if index%1000 == 0 { log.Println("标准化历史数据--winner", index, err_id, entnum) } tmp = map[string]interface{}{} } log.Println("标准化数据完成--winner", index, entnum) } //企业数据整合(已有标注信息) func winMegerIndustry(id string, ps map[string]interface{}) map[string]interface{} { tmp := MongoEnt.FindById(winnerent, id, bson.M{"industry": 1}) if len(tmp) < 1 { return nil } data := map[string]interface{}{} industry := tmp["industry"].(primitive.A) tmpindustry := map[string]bool{} for _, v := range industry { tt := qu.ObjToString(v) tmpindustry[tt] = true } if topscopeclass, ok := ps["topscopeclass"].([]interface{}); ok { for _, v := range topscopeclass { tt := qu.ObjToString(v) tmpindustry[tt] = true } } newindustry := []interface{}{} for k, _ := range tmpindustry { newindustry = append(newindustry, k) } data["industry"] = newindustry return data } //中标单位分包提取联系方式 func getWinnerPacks(infoid string, packs map[string]interface{}, topscopeclass []interface{}) map[string]map[string]interface{} { entmappacks := map[string]map[string]interface{}{} for _, v := range packs { if tmp, ok := v.(map[string]interface{}); ok { winner := qu.ObjToString(tmp["winner"]) if utf8.RuneCountInString(winner) < 5 { continue } winnerperson := qu.ObjToString(tmp["winnerperson"]) winnertel := qu.ObjToString(tmp["winnertel"]) if winnerperson != "" || winnertel != "" { p := map[string]interface{}{ "contact_person": winnerperson, "phone": winnertel, "topscopeclass": topscopeclass, "infoid": infoid, "extfrom": "package", } entmappacks[winner] = p } } } return entmappacks } //数据整合 func comHisMegerNewData(name, datatype string, ps []map[string]interface{}) map[string]interface{} { tmp := MongoEnt.FindOne("qyxy", map[string]interface{}{"company_name": name}) if len(tmp) < 1 { return nil } data := map[string]interface{}{ "history_name": "", "credit_no": "", "area_code": qu.ObjToString(tmp["area_code"]), "province": qu.ObjToString(tmp["province"]), "city": "", "district": "", "company_type": qu.ObjToString(tmp["company_type"]), "legal_person": qu.ObjToString(tmp["legal_person"]), "company_address": qu.ObjToString(tmp["company_address"]), "business_scope": qu.ObjToString(tmp["business_scope"]), "wechat_accounts": []interface{}{}, "website": "", "contact": ps, "comeintime": time.Now().Unix(), "updatetime": time.Now().Unix(), } //统一信用代码 credit_no := strings.TrimSpace(qu.ObjToString(tmp["credit_no"])) if credit_no != "" { data["credit_no"] = credit_no if len(credit_no) > 8 { dataNo := credit_no[2:8] if Addrs[dataNo] != nil { if v, ok := Addrs[dataNo].(map[string]interface{}); ok { if data["province"] == "" { data["province"] = v["province"] } data["city"] = v["city"] data["district"] = v["district"] } } } } //网址 annual_reports := tmp["annual_reports"] if annual_reports != nil { report_websitesArr := []string{} if anreports, ok := annual_reports.([]interface{}); ok { for _, report_websites := range anreports { if websites, ok := report_websites.([]interface{}); ok { for _, website := range websites { if rv, ok := website.(map[string]interface{}); ok { web := qu.ObjToString(rv["website_url"]) if web != "" { report_websitesArr = append(report_websitesArr, web) } } } } } } if len(report_websitesArr) > 0 { data["website"] = strings.Join(report_websitesArr, ";") } } if datatype == "winner" { data["company_name"] = name data["partners"] = tmp["partners"] establish_date := tmp["establish_date"] if establish_date != nil { data["establish_date"] = qu.Int64All(establish_date) / 1000 } capital := tmp["capital"] if capital != nil { data["capital"] = ObjToMoney([]interface{}{capital, ""})[0] } industry := make([]string, 0) tmpindustry := map[string]bool{} for _, p := range ps { if ts, ok := (p["topscopeclass"]).([]interface{}); ok { for _, v := range ts { tt := qu.ObjToString(v) tmpindustry[tt] = true } } } for k, _ := range tmpindustry { industry = append(industry, k) } data["industry"] = industry } else if datatype == "buyer" { data["buyer_name"] = name tmpbuyerclass := map[string]bool{} for _, p := range ps { tmpbuyerclass[qu.ObjToString(p["buyerclass"])] = true } buyerclass := []interface{}{} for k, _ := range tmpbuyerclass { buyerclass = append(buyerclass, k) } data["buyerclass"] = buyerclass data["ranks"] = "" data["type"] = "" data["address"] = "" } else { data["agency_name"] = name data["ranks"] = "" data["type"] = "" data["address"] = "" } return data } //根据规则数据打标记 func comMarkdata(name, datatype string) int { tag := 0 //默认错误 switch datatype { case "winner": for _, v := range WinnerRegOk { isok := v.MatchString(name) if isok { //匹配ok完,匹配err errflag := true for _, vRegErr := range WinnerRegErr { err := vRegErr.MatchString(name) if err { errflag = true break } } if errflag { tag = 1 } } } case "buyer": for _, v := range BuyerRegOk { isok := v.MatchString(name) if isok { //匹配ok完,匹配err errflag := true for _, vRegErr := range BuyerRegErr { err := vRegErr.MatchString(name) if err { errflag = true break } } if errflag { tag = 1 } } } case "agency": for _, v := range AgencyRegOk { isok := v.MatchString(name) if isok { //匹配ok完,匹配err errflag := true for _, vRegErr := range AgencyRegErr { err := vRegErr.MatchString(name) if err { errflag = true break } } if errflag { tag = 1 } } } default: } return tag } //过滤行业冗余字符 func comRepTopscopeclass(tops []interface{}) []interface{} { data := []interface{}{} for _, v := range tops { tt := qu.ObjToString(v) if len(tt) > 1 { data = append(data, tt[:len(tt)-1]) } } return data } // func comUpdateErr(coll, name string, tclass []interface{}) { if len(tclass) < 1 { return } tmp := MongoTo.FindOne(coll, map[string]interface{}{"name": name}) topscopeclass := tmp["topscopeclass"].(primitive.A) tmpclass := map[string]bool{} for _, tc := range topscopeclass { tmpclass[qu.ObjToString(tc)] = true } oldlen := len(tmpclass) for _, tc := range tclass { tmpclass[qu.ObjToString(tc)] = true } newlen := len(tmpclass) if oldlen == newlen { return } newclass := []interface{}{} for _, v := range tmpclass { newclass = append(newclass, v) } MongoTo.Update(coll, map[string]interface{}{"name": name}, map[string]interface{}{ "$set": map[string]interface{}{ "name": name, "topscopeclass": newclass, "updatetime": time.Now().Unix(), }, }) }