123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- package main
- import (
- "fmt"
- "go.mongodb.org/mongo-driver/bson"
- "regexp"
- "strings"
- "sync"
- util "utils"
- "utils/mongodb"
- )
- func (t *TaskInfo) biddingAllTask(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- var mpool = make(chan bool, t.thread)
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- } else {
- idMap := q["_id"].(map[string]interface{})
- tmpQ := map[string]interface{}{}
- for c, id := range idMap {
- if idStr, ok := id.(string); ok && id != "" {
- tmpQ[c] = mongodb.StringTOBsonId(idStr)
- }
- }
- q["_id"] = tmpQ
- }
- //bidding库
- biddingConn := biddingMgo.GetMgoConn()
- defer biddingMgo.DestoryMongoConn(biddingConn)
- //extract库
- extractConn := extractMgo.GetMgoConn()
- defer extractMgo.DestoryMongoConn(extractConn)
- //连接信息
- c, _ := mapInfo["coll"].(string)
- if c == "" {
- c, _ = bidding["collect"].(string)
- } else {
- currentColl = c
- }
- extractc, _ := extract["collect"].(string)
- count, _ := biddingConn.DB(biddingMgo.DbName).C(c).Find(&q).Count()
- //线程池
- UpdatesLock := sync.Mutex{}
- util.Debug("查询语句:", q, "同步总数:", count)
- //查询招标数据
- query := biddingConn.DB(biddingMgo.DbName).C(c).Find(q).Select(bson.M{
- "projectinfo.attachment": 0,
- "contenthtml": 0,
- "publishdept": 0, // 6.30 迭代报错,字段值乱码
- }).Sort("_id").Iter()
- //查询抽取结果
- extractResult := extractConn.DB(extractMgo.DbName).C(extractc).Find(q).Sort("_id").Iter()
- n := 0
- //对比两张表数据,减少查询次数
- var compare map[string]interface{}
- bnil := false
- for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
- if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
- tmp = make(map[string]interface{})
- continue
- }
- update := map[string]interface{}{}
- del := map[string]interface{}{} //记录extract没有值而bidding中有值的字段
- //对比方法----------------
- for {
- if compare == nil {
- compare = make(map[string]interface{})
- if !extractResult.Next(compare) {
- break
- }
- }
- if compare != nil {
- //对比
- cid := mongodb.BsonIdToSId(compare["_id"])
- tid := mongodb.BsonIdToSId(tmp["_id"])
- if cid == tid {
- bnil = false
- //更新bidding表,生成索引;bidding表modifyinfo中的字段不更新
- modifyinfo := make(map[string]bool)
- if tmpmodifyinfo, ok := tmp["modifyinfo"].(map[string]interface{}); ok && tmpmodifyinfo != nil {
- for k, _ := range tmpmodifyinfo {
- modifyinfo[k] = true
- }
- }
- for _, k := range biddingMgoFields { //fields更新到mongo的字段
- v1 := compare[k] //extract
- v2 := tmp[k] //bidding
- if v2 == nil && v1 != nil {
- update[k] = v1
- } else if v2 != nil && v1 != nil && !modifyinfo[k] {
- update[k] = v1
- } else if v2 != nil && v1 == nil && !modifyinfo[k] { //
- if k == "s_subscopeclass" && del["subscopeclass"] == nil {
- continue
- } else if k == "s_topscopeclass" && del["topscopeclass"] == nil {
- continue
- }
- del[k] = 1
- //qutil.Debug("抽取结果没有值,bidding有值:field--", k, "val--", v2)
- }
- }
- if util.IntAll(compare["repeat"]) == 1 {
- update["extracttype"] = -1
- } else {
- update["extracttype"] = 1
- }
- break
- } else {
- if cid < tid {
- bnil = false
- compare = nil
- continue
- } else {
- bnil = true
- break
- }
- }
- } else {
- bnil = false
- break
- }
- }
- //下面可以多线程跑的--->
- //处理分类
- mpool <- true
- _id := tmp["_id"]
- go func(tmp, update, compare, del map[string]interface{}, bnil bool) {
- defer func() {
- <-mpool
- }()
- if !bnil && compare != nil && len(compare) > 0 {
- FieldMethod(compare, update)
- compare = nil
- } else {
- area := util.ObjToString(tmp["area"])
- city := util.ObjToString(tmp["city"])
- district := util.ObjToString(tmp["district"])
- UpdatesLock.Lock()
- rdata := standardCheckCity(area, city, district)
- UpdatesLock.Unlock()
- if len(rdata) > 0 {
- for k, v := range rdata {
- update[k] = v
- }
- }
- }
- //------------------对比结束
- //同时保存到elastic
- for tk, tv := range update {
- tmp[tk] = tv
- }
- if tmp["s_winner"] != "" {
- cid := FieldFun(tmp)
- if len(cid) > 0 {
- tmp["entidlist"] = cid
- update["entidlist"] = cid
- tmp_up := []map[string]interface{}{}
- tmp_up = append(tmp_up, map[string]interface{}{"_id": tmp["_id"]})
- tmp_up = append(tmp_up, map[string]interface{}{"$set": map[string]interface{}{"entidlist": cid}})
- updateExtractPool <- tmp_up
- }
- }
- clearMap(tmp)
- //go IS.Add("bidding")
- UpdatesLock.Lock()
- if util.IntAll(update["extracttype"]) != -1 {
- newTmp := GetEsField(tmp, update, t.stype)
- saveEsPool <- newTmp
- }
- if len(update) > 0 {
- delete(update, "winnerorder") //winnerorder不需要更新到bindding表,删除
- if len(del) > 0 { //删除的数据
- updateBiddingPool <- []map[string]interface{}{{
- "_id": tmp["_id"],
- },
- {"$set": update, "$unset": del},
- }
- } else {
- updateBiddingPool <- []map[string]interface{}{{
- "_id": tmp["_id"],
- },
- {"$set": update},
- }
- }
- }
- UpdatesLock.Unlock()
- }(tmp, update, compare, del, bnil)
- if n%20000 == 0 {
- util.Debug("current:", n, _id)
- }
- tmp = make(map[string]interface{})
- }
- util.Debug(mapInfo, "create bidding index...over", n)
- }
- //城市标准校验
- func standardCheckCity(area string, city string, district string) map[string]string {
- rdata := make(map[string]string)
- if area == "香港" || area == "澳门" || area == "台湾" || (area == "全国" && (city == "" && district == "")) {
- return rdata
- }
- //第一步:区校验
- if district != "" {
- districtArr := DistrictDict[district]
- if districtArr == nil { //涉及了 个别别名相关的数据
- trim_arr := aliasDataDistrict(district) //拆分后缀
- if len(trim_arr) > 0 {
- for _, alias_district := range trim_arr {
- alias_districtArr := DistrictDict[alias_district]
- for _, v := range alias_districtArr {
- if city == v.C_Name && area == v.P_Name {
- rdata["district"] = alias_district
- return rdata
- }
- }
- }
- }
- rdata["district"] = ""
- } else {
- isTrue := false
- for _, v := range districtArr {
- if city == v.C_Name && area == v.P_Name {
- isTrue = true
- break
- }
- }
- if isTrue { //完全匹配
- return rdata
- } else { //未完全匹配
- if len(districtArr) == 1 {
- rdata["area"] = districtArr[0].P_Name
- rdata["city"] = districtArr[0].C_Name
- rdata["district"] = districtArr[0].D_Name
- return rdata
- } else {
- rdata["district"] = ""
- }
- }
- }
- }
- //第二步:区校验-失败 市-校验
- if city != "" {
- cityArr := CityDict[city]
- if cityArr == nil {
- //把市当成区,匹配三级 - 存在优化空间- city:郑州 别名
- districtArr := DistrictDict[city]
- for _, v := range districtArr {
- if city == v.C_Name && area == v.P_Name {
- rdata["area"] = districtArr[0].P_Name
- rdata["city"] = districtArr[0].C_Name
- rdata["district"] = districtArr[0].D_Name
- return rdata
- }
- }
- rdata["city"] = ""
- } else {
- isTrue := false
- for _, v := range cityArr {
- if area == v.P_Name {
- isTrue = true
- break
- }
- }
- if isTrue { //完全匹配
- return rdata
- } else { //未完全匹配
- if len(cityArr) == 1 {
- rdata["area"] = cityArr[0].P_Name
- rdata["city"] = cityArr[0].C_Name
- rdata["district"] = ""
- return rdata
- } else {
- rdata["city"] = ""
- }
- }
- }
- }
- //第三步:省份校验
- if ProvinceDict[area] == nil {
- rdata["area"] = "全国"
- rdata["city"] = ""
- rdata["district"] = ""
- }
- return rdata
- }
// cityEndReg matches a trailing county-level suffix: 区, 县 or 市.
var cityEndReg = regexp.MustCompile("(区|县|市)$")

// aliasDataDistrict returns alternative spellings of a district name that
// differ only in the trailing 区/县/市 suffix.
//
// If district ends in one of the three suffixes, exactly that one suffix is
// removed and the two other suffixes are appended to the stem (for example
// "金水区" yields "金水县" and "金水市"). If district has no such suffix, all
// three are appended ("金水" yields "金水区", "金水县", "金水市"). The result is
// never nil.
func aliasDataDistrict(district string) []string {
	arr := make([]string, 0, 3)
	suffix := cityEndReg.FindString(district)
	if suffix == "" {
		// No recognised suffix, e.g. district "金水".
		for _, s := range []string{"区", "县", "市"} {
			arr = append(arr, fmt.Sprintf("%s%s", district, s))
		}
		return arr
	}

	// TrimSuffix removes the suffix once. TrimRight would treat it as a set of
	// runes and strip every trailing occurrence ("城区区" -> "城").
	stem := strings.TrimSuffix(district, suffix)
	switch suffix {
	case "县":
		arr = append(arr, fmt.Sprintf("%s区", stem), fmt.Sprintf("%s市", stem))
	case "区":
		arr = append(arr, fmt.Sprintf("%s县", stem), fmt.Sprintf("%s市", stem))
	case "市":
		arr = append(arr, fmt.Sprintf("%s县", stem), fmt.Sprintf("%s区", stem))
	}
	return arr
}
|