package main import ( "encoding/json" "fmt" "github.com/garyburd/redigo/redis" "gopkg.in/mgo.v2/bson" "log" util2 "mfw/util" "net" "qfw/util" "sort" "strings" "time" "unicode/utf8" ) //之前main方法,只更新 func TaskBuyer(mapinfo *map[string]interface{}) { defer util.Catch() gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"]) if gtid == "" || lteid == "" { log.Println(gtid, lteid, "参数错误") return } var GId, LtId bson.ObjectId if bson.IsObjectIdHex(gtid) && bson.IsObjectIdHex(lteid) { GId = bson.ObjectIdHex(gtid) LtId = bson.ObjectIdHex(lteid) } else { log.Println(gtid, lteid, "不是Objectid,转换_id错误", gtid, lteid) return } //timenow:=time.Now().Unix() //udp的id区间查询bidding 中标人 中标联系人 中标联系电话 // topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型 // (area地区-province省份 city城市-city城市 district区县-district区县) // buyeraddr-company_address企业地址 SourceClientcc := SourceClient.GetMgoConn(8640000) cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{ "_id": bson.M{ "$gte": GId, "$lte": LtId, }, }).Select(bson.M{"buyer": 1, "buyertel": 1, "buyerperson": 1, "topscopeclass": 1, "buyeraddr": 1, "buyerclass": 1}).Iter() if cursor.Err() != nil { SourceClient.DestoryMongoConn(SourceClientcc) log.Println(cursor.Err()) return } //判断是否是存量,是存量走Redis遍历 if v, ok := (*mapinfo)["data_info"].(string); ok && v == "save" { //存量处理 conn := HisRedisPool.Conn() defer conn.Close() //选择redis db conn.Select(redis_buyer_db) //遍历bidding表保存到redis //key:企业名 value:json结构体{"buyer": 1, "buyertel": 1, "buyerperson": 1,"topscopeclass": 1, "buyeraddr": 1,"_id":1} tmp := make(map[string]interface{}) var num int var tmpRangeId string for cursor.Next(&tmp) { num++ if num%10000==0 &&num>0{ log.Println("当前遍历数量:",num) } mgoId := tmp["_id"].(bson.ObjectId).Hex() tmpRangeId = mgoId buyer, ok := tmp["buyer"].(string) if !ok || utf8.RuneCountInString(buyer) < 4 { continue } //判断redis key是否存在 e_num := conn.Exists(buyer).Val() //获取字符串_id //替换_id tmp["_id"] = mgoId //创建value数组 tmps := make([]map[string]interface{}, 0) if e_num > 0 { //存量redis的key存在,累加更新 bytes, _ := conn.Get(buyer).Bytes() json.Unmarshal(bytes, &tmps) } tmps = append(tmps, tmp) bytes, _ := json.Marshal(tmps) //存量redis的key不存在,新增 key :企业名 val :[]map if err := conn.Set(buyer, string(bytes), 0).Err(); err != nil { log.Println(err) } } log.Println("存量buyer mongo遍历完成:",num) if tmpRangeId != lteid{ by, _ := json.Marshal(map[string]interface{}{ "gtid": tmpRangeId, "lteid": lteid, "data_info":"save", "stype": "buyer", }) if e := udpclient.WriteUdp(by, util2.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP("127.0.0.1"), Port: Updport, }); e != nil { log.Println(e) } SourceClient.DestoryMongoConn(SourceClientcc) return } SourceClient.DestoryMongoConn(SourceClientcc) //遍历redis if scan := conn.Scan(0, "", 100); scan.Err() != nil { log.Println(scan.Err()) return } else { iterator := scan.Iterator() for iterator.Next() { redisCName := iterator.Val() //redis key 企业名 redisvalueBytes, _ := conn.Get(redisCName).Bytes() //redis val []数组 rValuesMaps := make([]map[string]interface{}, 0) json.Unmarshal(redisvalueBytes, &rValuesMaps) //redis查询是否存在 rdb := RedisPool.Get() rdb.Do("SELECT", redis_buyer_db) if reply, err := redis.String(rdb.Do("GET", redisCName)); err != nil { //redis不存在,存到临时表,定时任务处理 for _, vmap := range rValuesMaps { vmap["_id"] = bson.ObjectIdHex(vmap["_id"].(string)) if errb := FClient.SaveByOriID(Config["mgo_qyk_c_b_new"], vmap); !errb { log.Println("存量 FClient.Save err", errb, vmap) } } //log.Println("get redis id err:定时任务处理", err, tmp) if err := rdb.Close(); err != nil { log.Println("存量", err) } continue } else { //redis存在更新合并 if err := rdb.Close(); err != nil { log.Println(err) } //拿到合并后的qyk oldTmp, b := FClient.FindById(Config["mgo_qyk_buyer"], reply, nil) if !b || (*oldTmp) == nil|| reply==""||(*oldTmp)["_id"]==nil{ log.Println(redisCName, "存量 redis id 不存在", reply,"数据:",oldTmp) continue } tmpTopscopeclass := []string{} tmpTopscopeclassMap := make(map[string]bool) for _, rvaluemaps := range rValuesMaps { if tclasss, ok := rvaluemaps["topscopeclass"].([]string); ok { for _, vv := range tclasss { if len(vv) > 1 { tmpTopscopeclassMap[vv[:len(vv)-1]] = true } } } } for k := range tmpTopscopeclassMap { tmpTopscopeclass = append(tmpTopscopeclass, k) } sort.Strings(tmpTopscopeclass) esId := (*oldTmp)["_id"].(bson.ObjectId).Hex() //更新buyerclass合并 if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" { //无值,不更新 } else { var buyerclass_new, buyerclass_old string buyerclass_new = tmp["buyerclass"].(string) buyerclass_old = (*oldTmp)["buyerclass"].(string) if buyerclass_old == "" { (*oldTmp)["buyerclass"] = buyerclass_new } else { if buyerclass_new != buyerclass_old { if !strings.Contains(buyerclass_old, buyerclass_new) { (*oldTmp)["buyerclass"] = buyerclass_old + "," + buyerclass_new //采购单位类型 } } } } //联系方式合并 contactMaps := make([]interface{}, 0) if (*oldTmp)["contact"] != nil { //直接添加联系人,不再判断 if v, ok := (*oldTmp)["contact"].([]interface{}); ok { contactMaps = append(contactMaps, v...) } } //遍历redis value联系人 for _, rvmap := range rValuesMaps { var tmpperson, buyertel string if rvmapperson, ok := rvmap["buyerperson"].(string); ok && utf8.RuneCountInString(rvmapperson)>=2 && rvmapperson != "" { tmpperson = rvmapperson } else { continue } if rvmapwintel, ok := rvmap["buyertel"].(string); ok { buyertel = rvmapwintel } else { buyertel = "" } if Reg_xing.MatchString(buyertel) || !Reg_tel.MatchString(buyertel) { buyertel = "" } tmpContact := make(map[string]interface{}) tmpContact["infoid"] = rvmap["_id"] tmpContact["contact_person"] = tmpperson tmpContact["contact_type"] = "项目联系人" tmpContact["phone"] = buyertel tmpclass := make([]string, 0) if tclasss, ok := rvmap["topscopeclass"].([]string); ok { for _, vv := range tclasss { if len(vv) > 1 { tmpclass = append(tmpclass, vv[:len(vv)-1]) } } } tmpContact["topscopeclass"] = strings.Join(tmpclass, ";") tmpContact["updatetime"] = time.Now().Unix() contactMaps = append(contactMaps, tmpContact) } (*oldTmp)["contact"] = contactMaps //mongo更新 (*oldTmp)["updatatime"] = time.Now().Unix() if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) { log.Println("存量 mongo更新 err", esId, oldTmp) } //es更新 delete((*oldTmp), "_id") } } } log.Println("存量历史合并执行完成 ok", gtid, lteid) } else { //增量处理 overid := gtid tmp := map[string]interface{}{} for cursor.Next(&tmp) { overid = AddBuyer(overid, tmp) } SourceClient.DestoryMongoConn(SourceClientcc) log.Println("增量合并执行完成 ok", gtid, lteid, overid) //发送udp 更新es段 //nextNode("buyerent",timenow) } } //增量 func AddBuyer(overid string, tmp map[string]interface{}) string { overid = tmp["_id"].(bson.ObjectId).Hex() buyer, ok := tmp["buyer"].(string) if !ok || utf8.RuneCountInString(buyer) < 4 { return overid } //redis查询是否存在 rdb := RedisPool.Get() rdb.Do("SELECT", redis_buyer_db) if reply, err := redis.String(rdb.Do("GET", buyer)); err != nil { //redis不存在存到临时表,定时任务处理 if errb := FClient.SaveByOriID(Config["mgo_qyk_c_b_new"], tmp); !errb { log.Println("FClient.Save err", errb, tmp) } //log.Println("get redis id err:定时任务处理", err, tmp) if err := rdb.Close(); err != nil { log.Println(err) } return overid } else { if err := rdb.Close(); err != nil { log.Println(err) } //拿到合并后的qyk oldTmp, b := FClient.FindById(Config["mgo_qyk_buyer"], reply, bson.M{}) if !b || (*oldTmp) == nil || reply == "" || (*oldTmp)["_id"] == nil { log.Println("redis id 不存在", reply) return overid } //比较合并 行业类型 tmpTopscopeclass := []string{} tmpConTopscopeclass := []string{} tmpTopscopeclassMap := make(map[string]bool) if v, ok := tmp["topscopeclass"].([]interface{}); ok { for _, vv := range v { if vvv, ok := vv.(string); ok && len(vvv) > 1 { tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1]) } } } for k := range tmpTopscopeclassMap { tmpTopscopeclass = append(tmpTopscopeclass, k) } sort.Strings(tmpTopscopeclass) esId := (*oldTmp)["_id"].(bson.ObjectId).Hex() //更新buyerclass合并 if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" { //无值,不更新 } else { var buyerclass_new, buyerclass_old string buyerclass_new = tmp["buyerclass"].(string) buyerclass_old = (*oldTmp)["buyerclass"].(string) if buyerclass_old == "" { (*oldTmp)["buyerclass"] = buyerclass_new } else { if buyerclass_new != buyerclass_old { if !strings.Contains(buyerclass_old, buyerclass_new) { (*oldTmp)["buyerclass"] = buyerclass_old + "," + buyerclass_new //采购单位类型 } } } } //更新行业类型 if tmp["buyerperson"] == nil || tmp["buyerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["buyerperson"])) { (*oldTmp)["updatatime"] = time.Now().Unix() //mongo更新 if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) { log.Println("mongo更新err", esId) } //es更新 delete((*oldTmp), "_id") return overid } //联系方式合并 contactMaps := make([]map[string]interface{}, 0) if (*oldTmp)["contact"] != nil { //直接添加联系人,不再判断 if v, ok := (*oldTmp)["contact"].([]interface{}); ok { for _, vv := range v { contactMaps = append(contactMaps, vv.(map[string]interface{})) } } } var tmpperson, buyertel string if tmppersona, ok := tmp["buyerperson"].(string); ok && utf8.RuneCountInString(tmppersona)>=2 && tmppersona != "" && Reg_person.MatchString(tmppersona) && !Reg_xing.MatchString(tmppersona) { tmpperson = tmppersona } if tmpperson != "" { if buyerteltmp, ok := tmp["buyertel"].(string); ok { buyertel = buyerteltmp } if Reg_xing.MatchString(buyertel) || !Reg_tel.MatchString(buyertel) { buyertel = "" } else { buyertel = buyertel } vvv := make(map[string]interface{}) vvv["infoid"] = overid vvv["contact_person"] = tmpperson vvv["contact_type"] = "项目联系人" vvv["phone"] = buyertel vvv["topscopeclass"] = strings.Join(tmpConTopscopeclass, ";") vvv["updatetime"] = time.Now().Unix() contactMaps = append(contactMaps, vvv) } //分包处理 if tmp["package"] != nil { PackageDealWithBuyer(oldTmp, tmp, buyer) } (*oldTmp)["contact"] = contactMaps //mongo更新 (*oldTmp)["updatatime"] = time.Now().Unix() if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) { log.Println("mongo更新 err", esId, oldTmp) } //es更新 delete((*oldTmp), "_id") } return overid } //定时任务 新增 //1.存异常表 //2.合并原始库新增 func TimedTaskBuyer() { //time.Sleep(time.Hour*70) t2 := time.NewTimer(time.Second * 5) for range t2.C { //timenow := time.Now().Unix() Fcconn := FClient.GetMgoConn(86400) tmpLast := map[string]interface{}{} if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C(Config["mgo_qyk_c_b_new"]).Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil { if !iter.Next(&tmpLast) { //临时表无数据 log.Println("临时表无数据:") t2.Reset(time.Minute * 5) FClient.DestoryMongoConn(Fcconn) continue } else { log.Println("临时表有数据:", tmpLast["_id"]) fconn := FClient.GetMgoConn(86400) cursor := fconn.DB(Config["mgodb_extract_kf"]).C(Config["mgo_qyk_c_b_new"]).Find(bson.M{ "_id": bson.M{ "$lte": tmpLast["_id"], }, }).Sort("_id").Iter() if cursor == nil { log.Println("查询失败") t2.Reset(time.Second * 5) FClient.DestoryMongoConn(fconn) continue } //遍历临时表数据,匹配不到原始库存入异常表 tmp := make(map[string]interface{}) for cursor.Next(&tmp) { tmpId := tmp["_id"].(bson.ObjectId).Hex() errbuyer, ok := tmp["buyer"].(string) if !ok || errbuyer == "" { continue } //再重新查找redis,存在发udp处理,不存在走新增合并 rdb := RedisPool.Get() rdb.Do("SELECT", redis_buyer_db) if _, err := redis.String(rdb.Do("GET", errbuyer)); err == nil { //增量合并 AddBuyer(tmpId, tmp) //存在的话删除tmp mongo表 if DeletedCount := FClient.Del(Config["mgo_qyk_c_b_new"], bson.M{"_id": bson.ObjectIdHex(tmpId)}); !DeletedCount { log.Println("删除临时表err:", DeletedCount) } if err := rdb.Close(); err != nil { log.Println(err) } continue } else { if err = rdb.Close(); err != nil { log.Println(err) } } //查询redis不存在新增 sessionfinone := FClient.GetMgoConn() resulttmp := make(map[string]interface{}) err := sessionfinone.DB(Config["mgodb_enterprise"]).C(Config["mgodb_enterprise_c"]).Find(bson.M{"company_name": errbuyer}).One(&resulttmp) FClient.DestoryMongoConn(sessionfinone) if err != nil || resulttmp["_id"] == nil { //log.Println(r) //人工审核正则 var isok bool //先遍历ok for _, v := range BuyerRegOk { isok = v.MatchString(errbuyer) if isok { //匹配ok完,匹配err for _, vRegErr := range BuyerRegErr { isok = vRegErr.MatchString(errbuyer) //匹配到ok 也匹配到err 按err算 if isok { tmp["buyer_err"] = 1 break } } //匹配ok,没匹配err 按ok算 if tmp["buyer_err"] == nil { tmp["buyer_ok"] = 1 break } } } //都没匹配 if tmp["buyer_ok"] == nil && tmp["bnuyer_err"] == nil { tmp["buyer_err"] = 1 } //匹配不到原始库,存入异常表删除临时表 if errb := FClient.SaveByOriID(Config["mgo_qyk_c_b_err"], tmp); !errb{ log.Println("存入异常表错误", errb, tmp) } if deleteNum := FClient.Del(Config["mgo_qyk_c_b_new"], bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum { log.Println("删除临时表错误", deleteNum) } continue } else { //log.Println(123) //匹配到原始库,新增 resulttmp buyer if resulttmp["credit_no"] != nil { if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" && len(strings.TrimSpace(credit_no)) > 8 { dataNo := strings.TrimSpace(credit_no)[2:8] if Addrs[dataNo] != nil { if v, ok := Addrs[dataNo].(map[string]interface{}); ok { if resulttmp["province"] == nil || resulttmp["province"] == "" { resulttmp["province"] = v["province"] } resulttmp["city"] = v["city"] resulttmp["district"] = v["district"] } } } } //行业类型 tmpclass := make([]string, 0) if tclasss, ok := tmp["topscopeclass"].([]interface{}); ok { for _, vv := range tclasss { if vvv, ok := vv.(string); ok { if len(vvv) > 1 { tmpclass = append(tmpclass, vvv[:len(vvv)-1]) } } } } contacts := make([]map[string]interface{}, 0) if legal_person, ok := resulttmp["legal_person"].(string); ok && utf8.RuneCountInString(legal_person)>=2 &&legal_person != "" && !Reg_xing.MatchString(legal_person) && Reg_person.MatchString(legal_person) { contact := make(map[string]interface{}, 0) contact["contact_person"] = legal_person //联系人 contact["contact_type"] = "法定代表人" //法定代表人 if resulttmp["annual_reports"] != nil { bytes, err := json.Marshal(resulttmp["annual_reports"]) if err != nil { log.Println("annual_reports err:", err) } phonetmp := make([]map[string]interface{}, 0) err = json.Unmarshal(bytes, &phonetmp) if err != nil { log.Println("Unmarshal err:", err) } for _, vv := range phonetmp { if vv["company_phone"] != nil { if vv["company_phone"] == "" { continue } else { contact["phone"] = vv["company_phone"] //联系电话 break } } else { contact["phone"] = "" //联系电话 } } } //log.Println(k, contact["phone"], resulttmp["_id"]) //time.Sleep(10 * time.Second) if phone, ok := contact["phone"].(string); ok && phone != "" { if Reg_xing.MatchString(phone) || !Reg_tel.MatchString(phone) { contact["phone"] = "" //联系电话 } } else { contact["phone"] = "" //联系电话 } contact["topscopeclass"] = "企业公示" //项目类型 contact["updatetime"] = time.Now().Unix() //更新时间 contact["infoid"] = "" //招标信息id contacts = append(contacts, contact) } //添加临时表匹配到的联系人 if buyerperson, ok := tmp["buyerperson"].(string); ok &&utf8.RuneCountInString(buyerperson)>=2 && buyerperson != "" && !Reg_xing.MatchString(buyerperson) && Reg_person.MatchString(buyerperson) { vvv := make(map[string]interface{}) vvv["infoid"] = tmp["_id"].(bson.ObjectId).Hex() vvv["contact_person"] = buyerperson vvv["contact_type"] = "项目联系人" if buyertel, ok := tmp["buyertel"].(string); ok && !Reg_xing.MatchString(buyertel) && Reg_tel.MatchString(buyertel) { vvv["phone"] = buyertel } else { vvv["phone"] = "" } vvv["topscopeclass"] = strings.Join(tmpclass, ";") vvv["updatetime"] = time.Now().Unix() contacts = append(contacts, vvv) } savetmp := make(map[string]interface{}, 0) for _, sk := range BuyerFields { if sk == "_id" { savetmp["tmp"+sk] = resulttmp[sk] continue } else if sk == "area_code" { //行政区划代码 savetmp[sk] = fmt.Sprint(resulttmp[sk]) continue } else if sk == "report_websites" { //网址 if resulttmp["report_websites"] == nil { savetmp["website"] = "" } else { report_websitesArr := []string{} if ppms, ok := resulttmp[sk].([]interface{}); ok { for _, v := range ppms { if vvv, ok := v.(map[string]interface{}); ok { if rv, ok := vvv["website_url"].(string); ok { report_websitesArr = append(report_websitesArr, rv) } } } } sort.Strings(report_websitesArr) savetmp["website"] = strings.Join(report_websitesArr, ";") } continue } else if sk == "wechat_accounts" { savetmp[sk] = []interface{}{} continue } else if sk == "buyer_name" { if resulttmp["company_name"] == nil { savetmp[sk] = "" } else { savetmp[sk] = resulttmp["company_name"] } continue } else if sk == "address" { if resulttmp["company_address"] == nil { savetmp[sk] = "" } else { savetmp[sk] = resulttmp["company_address"] } continue }else if sk == "buyerclass" { if tmp["buyerclass"]==nil { savetmp[sk] = "" }else { savetmp[sk] = tmp["buyerclass"] } continue } if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" && sk != "buyer_name" && sk != "address" && sk != "buyerclass"&& sk != "contact" && sk != "report_websites" { savetmp[sk] = "" } else { savetmp[sk] = resulttmp[sk] } } //判断分包 if tmp["package"] != nil { PackageDealWithBuyer(&savetmp, tmp, errbuyer) } //tmps = append(tmps, savetmp) savetmp["comeintime"] = time.Now().Unix() savetmp["updatatime"] = time.Now().Unix() //保存mongo saveid := FClient.Save(Config["mgo_qyk_buyer"], savetmp) if saveid != "" { //保存redis rc := RedisPool.Get() rc.Do("SELECT", redis_buyer_db) if _, err := rc.Do("SET", savetmp["buyer_name"], saveid); err != nil { log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["buyer_name"], err) } else { //删除临时表 if deleteNum := FClient.Del(Config["mgo_qyk_c_b_new"], bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum { log.Println("删除临时表失败", deleteNum) } } if err := rc.Close(); err != nil { log.Println(err) } } else { log.Println("save mongo err:", saveid, tmp["_id"]) } } } FClient.DestoryMongoConn(fconn) log.Println("buyer_new,遍历完成") } } FClient.DestoryMongoConn(Fcconn) t2.Reset(time.Minute) //nextNode("buyerent",timenow) } } //分包处理 func PackageDealWithBuyer(contactMap *map[string]interface{}, tmp map[string]interface{}, comName string) []interface{} { util.Catch() //if v, ok := tmp["package"].(map[string]interface{}); ok { //for i, pv := range v { // log.Println(i, pv) //} //} return nil }