package main

import (
	"fmt"
	"regexp"
	"strings"
	"sync"

	"go.mongodb.org/mongo-driver/bson"
	util "utils"
	"utils/mongodb"
)

// biddingAllTask walks the bidding collection and the extract collection in
// parallel (both sorted by _id), merges the extract result into each bidding
// document, and pushes the outcome to saveEsPool, updateBiddingPool and
// updateExtractPool.
//
// mapInfo keys read here:
//   - "query": optional query map; its "_id" entry must be a map whose string
//     values are converted to bson ids. A missing or non-map "_id" panics.
//   - "gtid" / "lteid": used to build the _id range when "query" is absent;
//     both must be strings, otherwise the type assertion panics.
//   - "coll": optional bidding collection name; when set it also overwrites
//     the package-level currentColl.
//
// Panics are left to the deferred util.Catch (presumably a recover wrapper —
// confirm in utils). data is not used by this function.
//
// Per-document work runs in goroutines; mpool bounds how many are in flight
// to t.thread. The function waits for all of them before logging completion.
func (t *TaskInfo) biddingAllTask(data []byte, mapInfo map[string]interface{}) {
	defer util.Catch()
	mpool := make(chan bool, t.thread)
	// wg lets us wait for in-flight workers before reporting completion.
	var wg sync.WaitGroup

	q, _ := mapInfo["query"].(map[string]interface{})
	if q == nil {
		q = map[string]interface{}{
			"_id": bson.M{
				"$gt":  mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
				"$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
			},
		}
	} else {
		// Convert the string ids of the supplied query into bson ids;
		// non-string and empty values are dropped.
		idMap := q["_id"].(map[string]interface{})
		tmpQ := map[string]interface{}{}
		for op, id := range idMap {
			if idStr, ok := id.(string); ok && idStr != "" {
				tmpQ[op] = mongodb.StringTOBsonId(idStr)
			}
		}
		q["_id"] = tmpQ
	}

	// bidding database
	biddingConn := biddingMgo.GetMgoConn()
	defer biddingMgo.DestoryMongoConn(biddingConn)
	// extract database
	extractConn := extractMgo.GetMgoConn()
	defer extractMgo.DestoryMongoConn(extractConn)

	// Collection names: the task may override the configured bidding collection.
	c, _ := mapInfo["coll"].(string)
	if c == "" {
		c, _ = bidding["collect"].(string)
	} else {
		currentColl = c
	}
	extractc, _ := extract["collect"].(string)

	// The count is only used for the log line below; its error is ignored.
	count, _ := biddingConn.DB(biddingMgo.DbName).C(c).Find(&q).Count()

	// Serialises standardCheckCity calls and the pool sends made by workers.
	var updatesLock sync.Mutex
	util.Debug("查询语句:", q, "同步总数:", count)

	// Bidding documents, without the large or problematic fields.
	query := biddingConn.DB(biddingMgo.DbName).C(c).Find(q).Select(bson.M{
		"projectinfo.attachment": 0,
		"contenthtml":            0,
		"publishdept":            0, // 6.30: iteration failed because the field value was garbled
	}).Sort("_id").Iter()
	// Extract results over the same _id range and in the same order.
	extractResult := extractConn.DB(extractMgo.DbName).C(extractc).Find(q).Sort("_id").Iter()

	n := 0
	// compare holds the current extract document. It is carried across
	// bidding documents so both cursors advance in a single merge pass
	// instead of issuing one lookup per document.
	var compare map[string]interface{}
	// bnil is true when the extract cursor is already past the current
	// bidding document, i.e. no extract result exists for it.
	bnil := false
	for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
		if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" {
			// Bidding document is flagged as sensitive: do not index it.
			tmp = make(map[string]interface{})
			continue
		}
		update := map[string]interface{}{}
		// Fields that have a value in bidding but none in extract.
		del := map[string]interface{}{}

		// Advance the extract cursor until it reaches or passes tmp's _id.
		for {
			if compare == nil {
				compare = make(map[string]interface{})
				if !extractResult.Next(compare) {
					// Extract cursor exhausted; compare stays an empty map.
					break
				}
			}
			cid := mongodb.BsonIdToSId(compare["_id"])
			tid := mongodb.BsonIdToSId(tmp["_id"])
			if cid < tid {
				// Extract document is behind: drop it and fetch the next one.
				bnil = false
				compare = nil
				continue
			}
			if cid > tid {
				// Extract document is ahead: keep it for a later bidding document.
				bnil = true
				break
			}
			// Same _id: merge extract fields into the pending update.
			bnil = false
			mergeExtractIntoUpdate(tmp, compare, update, del)
			break
		}

		// Everything below is independent per document and runs concurrently.
		mpool <- true
		wg.Add(1)
		_id := tmp["_id"]
		go func(tmp, update, compare, del map[string]interface{}, bnil bool) {
			defer wg.Done()
			defer func() {
				<-mpool
			}()
			if !bnil && len(compare) > 0 {
				FieldMethod(compare, update)
			} else {
				// No usable extract document: validate the location fields
				// of the bidding document instead.
				area := util.ObjToString(tmp["area"])
				city := util.ObjToString(tmp["city"])
				district := util.ObjToString(tmp["district"])
				updatesLock.Lock()
				rdata := standardCheckCity(area, city, district)
				updatesLock.Unlock()
				for k, v := range rdata {
					update[k] = v
				}
			}

			// Apply the update to tmp so the document sent to elastic is current.
			for tk, tv := range update {
				tmp[tk] = tv
			}
			// NOTE(review): this compares an interface with "", so a missing
			// (nil) s_winner also passes the check — confirm that is intended.
			if tmp["s_winner"] != "" {
				cid := FieldFun(tmp)
				if len(cid) > 0 {
					tmp["entidlist"] = cid
					update["entidlist"] = cid
					tmpUp := []map[string]interface{}{
						{"_id": tmp["_id"]},
						{"$set": map[string]interface{}{"entidlist": cid}},
					}
					updateExtractPool <- tmpUp
				}
			}
			clearMap(tmp)

			updatesLock.Lock()
			if util.IntAll(update["extracttype"]) != -1 {
				newTmp := GetEsField(tmp, update, t.stype)
				saveEsPool <- newTmp
			}
			if len(update) > 0 {
				// winnerorder must not be written back to the bidding collection.
				delete(update, "winnerorder")
				if len(del) > 0 {
					// Also unset fields that extract no longer provides.
					updateBiddingPool <- []map[string]interface{}{
						{"_id": tmp["_id"]},
						{"$set": update, "$unset": del},
					}
				} else {
					updateBiddingPool <- []map[string]interface{}{
						{"_id": tmp["_id"]},
						{"$set": update},
					}
				}
			}
			updatesLock.Unlock()
		}(tmp, update, compare, del, bnil)

		if n%20000 == 0 {
			util.Debug("current:", n, _id)
		}
		tmp = make(map[string]interface{})
	}

	// Wait for in-flight workers so "over" is only logged once every
	// document has been handed to the pools.
	wg.Wait()
	util.Debug(mapInfo, "create bidding index...over", n)
}

// mergeExtractIntoUpdate compares the bidding document bid with the extract
// document ext (same _id) over every field in biddingMgoFields and records the
// outcome in update (fields to $set) and del (fields to $unset).
//
// Fields listed in bid["modifyinfo"] are never overwritten or removed; they
// are only filled in when bidding has no value. update["extracttype"] is set
// to -1 when ext["repeat"] is 1 and to 1 otherwise.
func mergeExtractIntoUpdate(bid, ext, update, del map[string]interface{}) {
	modifyinfo := make(map[string]bool)
	if m, ok := bid["modifyinfo"].(map[string]interface{}); ok {
		for k := range m {
			modifyinfo[k] = true
		}
	}
	for _, k := range biddingMgoFields {
		extVal := ext[k]
		bidVal := bid[k]
		switch {
		case extVal != nil && (bidVal == nil || !modifyinfo[k]):
			update[k] = extVal
		case extVal == nil && bidVal != nil && !modifyinfo[k]:
			// The s_* fields are only removed when their base field has
			// already been marked for removal.
			if k == "s_subscopeclass" && del["subscopeclass"] == nil {
				continue
			}
			if k == "s_topscopeclass" && del["topscopeclass"] == nil {
				continue
			}
			del[k] = 1
		}
	}
	if util.IntAll(ext["repeat"]) == 1 {
		update["extracttype"] = -1
	} else {
		update["extracttype"] = 1
	}
}

// standardCheckCity validates an area/city/district triple against
// DistrictDict, CityDict and ProvinceDict and returns the corrections to
// apply. The result only contains the keys ("area", "city", "district") whose
// value must change; an empty map means the input is kept as is.
//
// 香港, 澳门, 台湾 and a bare 全国 (no city, no district) are never corrected.
func standardCheckCity(area string, city string, district string) map[string]string {
	rdata := make(map[string]string)
	if area == "香港" || area == "澳门" || area == "台湾" || (area == "全国" && (city == "" && district == "")) {
		return rdata
	}

	// Step 1: validate the district.
	if district != "" {
		districtArr := DistrictDict[district]
		if districtArr == nil {
			// Unknown district: retry with the alternative suffixes (区/县/市).
			for _, alias := range aliasDataDistrict(district) {
				for _, v := range DistrictDict[alias] {
					if city == v.C_Name && area == v.P_Name {
						rdata["district"] = alias
						return rdata
					}
				}
			}
			rdata["district"] = ""
		} else {
			for _, v := range districtArr {
				if city == v.C_Name && area == v.P_Name {
					// Exact match: nothing to correct.
					return rdata
				}
			}
			if len(districtArr) == 1 {
				// The district name is unambiguous: trust it over area and city.
				rdata["area"] = districtArr[0].P_Name
				rdata["city"] = districtArr[0].C_Name
				rdata["district"] = districtArr[0].D_Name
				return rdata
			}
			rdata["district"] = ""
		}
	}

	// Step 2: the district did not settle it, validate the city.
	if city != "" {
		cityArr := CityDict[city]
		if cityArr == nil {
			// Treat the city as a district name and match at the third level.
			for _, v := range DistrictDict[city] {
				if city == v.C_Name && area == v.P_Name {
					// Use the entry that matched, not the first entry of the list.
					rdata["area"] = v.P_Name
					rdata["city"] = v.C_Name
					rdata["district"] = v.D_Name
					return rdata
				}
			}
			rdata["city"] = ""
		} else {
			for _, v := range cityArr {
				if area == v.P_Name {
					// City belongs to the given province.
					return rdata
				}
			}
			if len(cityArr) == 1 {
				// The city name is unambiguous: trust it over the area.
				rdata["area"] = cityArr[0].P_Name
				rdata["city"] = cityArr[0].C_Name
				rdata["district"] = ""
				return rdata
			}
			rdata["city"] = ""
		}
	}

	// Step 3: validate the province; unknown provinces fall back to 全国.
	if ProvinceDict[area] == nil {
		rdata["area"] = "全国"
		rdata["city"] = ""
		rdata["district"] = ""
	}
	return rdata
}

// cityEndReg matches a trailing 区, 县 or 市.
var cityEndReg = regexp.MustCompile("(区|县|市)$")

// aliasDataDistrict returns alternative spellings of a district name.
//
// When district ends in 区, 县 or 市, the suffix is replaced by each of the
// other two. Otherwise all three suffixes are appended in turn, e.g. "金水"
// yields "金水区", "金水县", "金水市".
func aliasDataDistrict(district string) []string {
	suffix := cityEndReg.FindString(district)
	if suffix == "" {
		// No recognised suffix: try all of them.
		return []string{
			fmt.Sprintf("%s区", district),
			fmt.Sprintf("%s县", district),
			fmt.Sprintf("%s市", district),
		}
	}

	// TrimSuffix removes exactly one trailing suffix; TrimRight would treat
	// its argument as a cutset and strip every trailing occurrence.
	base := strings.TrimSuffix(district, suffix)
	arr := []string{}
	switch suffix {
	case "县":
		arr = append(arr, fmt.Sprintf("%s区", base), fmt.Sprintf("%s市", base))
	case "区":
		arr = append(arr, fmt.Sprintf("%s县", base), fmt.Sprintf("%s市", base))
	case "市":
		arr = append(arr, fmt.Sprintf("%s县", base), fmt.Sprintf("%s区", base))
	}
	return arr
}