package main

import (
	"encoding/json"
	"fmt"
	"project/config"
	"regexp"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	"github.com/goinggo/mapstructure"
	"github.com/robfig/cron"
	"go.mongodb.org/mongo-driver/bson"
	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
)

// Task entry points for project merging: full and incremental merge runs,
// update/insert handling, in-memory cache cleanup, and conversion of raw
// bidding documents into Info objects.

// var PreRegexp = map[string][]*regexp.Regexp{}
// var BackRegexp = map[string][]*regexp.Regexp{}
// var BackRepRegexp = map[string][]RegexpInfo{}

// BlackRegexp maps a name kind (the stype argument of QyFilter, e.g.
// "winner" or "buyer") to the patterns that cause a name to be discarded.
// It starts empty here; presumably it is filled elsewhere in the package.
var BlackRegexp = map[string][]*regexp.Regexp{}

var (
	// Patterns that extract a project code from a title: at the start of the
	// title, inside brackets, and at the end of the title respectively.
	titleGetPc  = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
	// Project-code clean-up filter (its call sites in this file are
	// currently commented out).
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
	// StrOrNum matches a code that is only digits or only letters and at most
	// four characters long.
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// StrOrNum2 matches a code that is purely digits or purely letters.
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
	// KeyPackage matches lot/section wording (a number next to 包 or 段).
	// Original note: the tender did not identify lots, so such notices are
	// merged into one project.
	KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")

	// reviewExpertNameReg accepts a review-expert name made of 2 to 4 Han
	// characters. Compiled once instead of on every ClearRp iteration.
	reviewExpertNameReg = regexp.MustCompile("^[\\p{Han}]{2,4}$")
	// qyHanRunReg requires at least four consecutive Han characters in an
	// organisation name. Compiled once instead of on every QyFilter call.
	qyHanRunReg = regexp.MustCompile("[\\p{Han}]{4,}")
)

// RegexpInfo pairs a pattern with its replacement string.
type RegexpInfo struct {
	regs   *regexp.Regexp
	repstr string
}

// ProjectTask is the project-merge task object. It owns the in-memory
// indexes used to match bidding notices to projects and the queue of
// pending database writes.
type ProjectTask struct {
	InitMinTime int64 // minimum time; original note: values below 0 are handled once
	name        string
	thread      int // number of worker goroutines
	// lock held while looking up / comparing projects
	findLock sync.Mutex
	wg       sync.WaitGroup
	// guards AllIdsMap
	AllIdsMapLock sync.Mutex
	// project id -> cached project
	AllIdsMap map[string]*ID
	// indexes by buyer, project name and project code
	mapPb, mapPn, mapPc map[string]*Key
	// projecthref -> project id; notices sharing a projecthref merge directly
	mapHref     map[string]string
	mapHrefLock sync.Mutex
	// site name -> site location data
	mapSite     map[string]*Site
	mapSiteLock sync.Mutex
	// spider isflow
	mapSpider     map[string]int
	mapSpiderLock sync.Mutex
	// bidtype / bidstatus lock
	mapBidLock sync.Mutex
	// channel of pending updates/inserts, drained by updateAllQueue
	updatePool chan []map[string]interface{}
	//savePool   chan map[string]interface{}
	//saveSign, updateSign chan bool
	// target collection name
	coll string
	// whether the current run is a full ("ql") or incremental run
	currentType string
	// number of cleanup ticks skipped since the last cleanup (see clearMem)
	clearContimes int
	// current time; enter sets it to the publish time of the notice being processed
	currentTime int64
	// number of queued writes per bulk upsert
	saveSize   int
	pici       int64
	validTime  int64
	statusTime int64
	// original note: result-time update; notices from the last two days no
	// longer update jgtime
	jgTime int64
	Brun   bool
}

// NewPT builds a ProjectTask with pre-sized indexes and time windows taken
// from configuration (ValidDays defaulting to 150, StatusDays to 15), in
// full-run ("ql") mode.
func NewPT() *ProjectTask {
	p := &ProjectTask{
		InitMinTime: int64(1325347200),
		name:        "全/增量对象",
		thread:      Thread,
		updatePool:  make(chan []map[string]interface{}, 5000),
		//savePool:    make(chan map[string]interface{}, 2000),
		wg:        sync.WaitGroup{},
		AllIdsMap: make(map[string]*ID, 5000000),
		mapPb:     make(map[string]*Key, 1500000),
		mapPn:     make(map[string]*Key, 5000000),
		mapPc:     make(map[string]*Key, 5000000),
		mapHref:   make(map[string]string, 1500000),
		mapSite:   make(map[string]*Site, 1000000),
		mapSpider: make(map[string]int, 1000000),
		saveSize:  200,
		//saveSign:    make(chan bool, 1),
		//updateSign:  make(chan bool, 1),
		coll:        ProjectColl,
		validTime:   int64(util.IntAllDef(config.Conf.Serve.ValidDays, 150)) * 86400,
		statusTime:  int64(util.IntAllDef(config.Conf.Serve.StatusDays, 15) * 86400),
		jgTime:      int64(util.IntAllDef(7, 7) * 86400),
		currentType: "ql",
	}
	return p
}

// P_QL is the package-wide merge task instance.
var P_QL *ProjectTask

// sp is a one-slot semaphore: at most one bulk upsert runs at a time, and
// taskZl acquires it to wait for an in-flight upsert to finish.
var sp = make(chan bool, 1)

// updateAllQueue drains updatePool forever, grouping entries into batches
// of saveSize and writing each batch with a bulk upsert. A partial batch is
// written once no new entry has arrived for one second.
func (p *ProjectTask) updateAllQueue() {
	batch := make([][]map[string]interface{}, p.saveSize)
	n := 0
	// flush acquires sp, then writes docs in the background and releases sp
	// when the write returns.
	flush := func(docs [][]map[string]interface{}) {
		sp <- true
		go func() {
			defer func() { <-sp }()
			MgoP.UpSertBulk(p.coll, docs...)
		}()
	}
	for {
		select {
		case v := <-p.updatePool:
			batch[n] = v
			n++
			if n == p.saveSize {
				flush(batch)
				batch = make([][]map[string]interface{}, p.saveSize)
				n = 0
			}
		case <-time.After(1 * time.Second):
			if n > 0 {
				flush(batch[:n])
				batch = make([][]map[string]interface{}, p.saveSize)
				n = 0
			}
		}
	}
}

// clearMem schedules the periodic eviction of stale projects from the
// in-memory indexes.
//
// The cron job fires on the "0 */30 * * * ?" schedule. A cleanup actually
// runs when the task is in full ("ql") mode and SingleClear is 0, or once
// 20 ticks have been skipped; otherwise the skip counter is incremented.
// A project is evicted when currentTime minus its LastTime exceeds
// validTime.
func (p *ProjectTask) clearMem() {
	// prune removes id from the entry stored under key in index and drops
	// the entry once it holds no ids. Empty keys and missing entries are
	// ignored.
	prune := func(index map[string]*Key, key, id, stype string) {
		if key == "" {
			return
		}
		ids := index[key]
		if ids == nil {
			return
		}
		ids.Lock.Lock()
		ids.Arr = deleteSlice(ids.Arr, id, stype)
		if len(ids.Arr) == 0 {
			delete(index, key)
		}
		ids.Lock.Unlock()
	}

	c := cron.New()
	_ = c.AddFunc("0 */30 * * * ?", func() {
		if !((p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 20) {
			p.clearContimes++
			return
		}
		SingleClear = 1
		// reset the skipped-tick counter
		p.clearContimes = 0
		// take the global find/compare lock so no new merge starts
		p.findLock.Lock()
		// wait for merges already in progress
		p.wg.Wait()
		p.AllIdsMapLock.Lock()
		p.mapHrefLock.Lock()
		log.Info("清除开始")
		clearNum := 0
		// mapHref must be pruned first: it needs AllIdsMap to find LastTime.
		for kHref, pid := range p.mapHref {
			v := p.AllIdsMap[pid]
			if v != nil && p.currentTime-v.P.LastTime > p.validTime {
				delete(p.mapHref, kHref)
			}
		}
		for k, v := range p.AllIdsMap {
			if p.currentTime-v.P.LastTime <= p.validTime {
				continue
			}
			clearNum++
			redis.Del(RedisProject, k)
			delete(p.AllIdsMap, k)
			// buyer index
			prune(p.mapPb, v.P.Buyer, k, "pb")
			// project-name index (primary name plus alternates)
			for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
				prune(p.mapPn, vn, k, "pn")
			}
			// project-code index (primary code plus alternates)
			for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
				prune(p.mapPc, vn, k, "pc")
			}
		}
		p.mapHrefLock.Unlock()
		p.AllIdsMapLock.Unlock()
		p.findLock.Unlock()
		SingleClear = 0
		log.Info("清除完成:", zap.Int("clearNum", clearNum), zap.Int("AllIdsMap:", len(p.AllIdsMap)), zap.Int("mapPn:", len(p.mapPn)),
			zap.Int("mapPc:", len(p.mapPc)), zap.Int("mapPb:", len(p.mapPb)), zap.Int("mapHref:", len(p.mapHref)))
	})
	c.Start()
}

// taskQl runs a full merge.
//
// If udpInfo carries a "query" map it is used as-is. Otherwise an _id range
// is built from "lteid" ($lte) and "gtid" ($gte); each bound is applied
// only when its string is longer than 15 characters. "coll" overrides the
// configured bidding collection.
func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
	defer util.Catch()
	p.thread = util.IntAllDef(Thread, 4)
	q, _ := udpInfo["query"].(map[string]interface{})
	if q == nil {
		q = map[string]interface{}{}
		idRange := map[string]interface{}{}
		if lteid, _ := udpInfo["lteid"].(string); len(lteid) > 15 {
			idRange["$lte"] = mongodb.StringTOBsonId(lteid)
		}
		if gtid, _ := udpInfo["gtid"].(string); len(gtid) > 15 {
			idRange["$gte"] = mongodb.StringTOBsonId(gtid)
		}
		if len(idRange) > 0 {
			q["_id"] = idRange
		}
	}
	if c := util.ObjToString(udpInfo["coll"]); c != "" {
		BiddingColl = c
	} else {
		BiddingColl = config.Conf.DB.MongoB.Coll
	}
	p.enter(MgoB.DbName, BiddingColl, q)
}

// taskZl runs an incremental merge over the _id range (gtid, lteid] and then
// notifies the next node.
//
// "gtid" and "lteid" are required strings; the run is aborted with an error
// log if either is missing. "db" and "coll" override the configured
// database and collection. Unless "stop" is set, the function waits for any
// in-flight bulk upsert to finish before notifying.
func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
	defer util.Catch()
	db, _ := udpInfo["db"].(string)
	if db == "" {
		db = MgoB.DbName
	}
	if c := util.ObjToString(udpInfo["coll"]); c != "" {
		BiddingColl = c
	} else {
		BiddingColl = config.Conf.DB.MongoB.Coll
	}
	if thread := util.IntAllDef(Thread, 1); thread > 0 {
		p.thread = thread
	}
	// start id and end id; checked assertions replace the previous
	// panic-and-recover path
	gtid, okGt := udpInfo["gtid"].(string)
	lteid, okLte := udpInfo["lteid"].(string)
	if !okGt || !okLte {
		log.Error("参数有误")
		return
	}
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  mongodb.StringTOBsonId(gtid),
			"$lte": mongodb.StringTOBsonId(lteid),
		},
	}
	p.enter(db, BiddingColl, q)
	if udpInfo["stop"] == nil {
		// Acquire and release the upsert semaphore: this blocks until the
		// bulk upsert currently running (if any) has finished.
		sp <- true
		<-sp
	}
	log.Info("保存完成,生索引", zap.Int64("pici:", p.pici))
	time.Sleep(5 * time.Second)
	nextNode(udpInfo, p.pici)
}

// taskUpdateInfo propagates a modified bidding notice to its project.
//
// It loads the notice named by "infoid" from collection "coll", requires it
// to carry a "modifyinfo" field, finds the project whose ids contain the
// notice, and hands both to updateJudge.
func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
	defer util.Catch()
	infoid, ok := udpInfo["infoid"].(string)
	if !ok {
		log.Error("参数有误")
		return
	}
	infoMap, _ := MgoB.FindById(util.ObjToString(udpInfo["coll"]), infoid, nil)
	if infoMap == nil || (*infoMap)["modifyinfo"] == nil {
		log.Info("does not exist modifyinfo ---," + infoid)
		return
	}
	//client := Es.GetEsConn()
	//defer Es.DestoryEsConn(client)
	//esquery := `{"query": {"bool": {"must": [{"match": {"ids": "` + infoid + `"}}]}}}`
	//data := Es.Get(config.Conf.DB.Es.Index, esquery)
	data, _ := MgoP.FindOne(config.Conf.DB.MongoP.Coll, bson.M{"ids": infoid})
	if data == nil || len(*data) == 0 {
		log.Info("not find project---," + infoid)
		return
	}
	pid := mongodb.BsonIdToSId((*data)["_id"])
	p.updateJudge(*infoMap, pid)
}

// taskUpdatePro applies a direct field edit to a project.
//
// "pid" names the project and "updateField" holds the new values. Keys
// containing "time" are converted to int64. Every edited key is recorded in
// "modifyinfo". On a successful database update the index node is notified
// over UDP and the cached project (if any) is replaced.
func (p *ProjectTask) taskUpdatePro(udpInfo map[string]interface{}) {
	defer util.Catch()
	pid := util.ObjToString(udpInfo["pid"])
	updateMap := util.ObjToMap(udpInfo["updateField"])
	if pid == "" || updateMap == nil || len(*updateMap) == 0 {
		log.Error("参数有误")
		return
	}
	proMap, _ := MgoP.FindById(ProjectColl, pid, nil)
	if proMap == nil || len(*proMap) <= 1 {
		log.Info("Not find project---" + pid)
		return
	}
	// back up the project with the reason attached, then drop the marker
	(*proMap)["reason"] = "直接修改项目字段信息"
	backupPro(*proMap)
	delete(*proMap, "reason")

	updataMap := make(map[string]interface{})
	modifyInfo := make(map[string]interface{})
	for k, v := range *updateMap {
		if strings.Contains(k, "time") {
			updataMap[k] = util.Int64All(v)
		} else {
			updataMap[k] = v
		}
		modifyInfo[k] = true
	}
	updataMap["modifyinfo"] = modifyInfo
	if MgoP.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": updataMap}) {
		// ask the index node to re-index this project
		by, _ := json.Marshal(map[string]interface{}{
			"query": map[string]interface{}{
				"_id": bson.M{
					"$gte": pid,
					"$lte": pid,
				}},
			"stype": "project",
		})
		_ = udpclient.WriteUdp(by, udp.OP_TYPE_DATA, toaddr[1])
	}
	// in-memory cache
	// NOTE(review): proMap was read before the update above, so the cached
	// copy does not contain the values in updataMap — confirm this is intended.
	var pro ProjectCache
	if err := mapstructure.Decode(proMap, &pro); err != nil {
		log.Error(err.Error())
	}
	p.AllIdsMapLock.Lock()
	if v, ok := p.AllIdsMap[pid]; ok {
		v.P = &pro
	}
	p.AllIdsMapLock.Unlock()
}

// delInfoPro removes the bidding notice "infoid" from the project that
// lists it in ids, delegating the actual work to delJudge.
func (p *ProjectTask) delInfoPro(udpInfo map[string]interface{}) {
	defer util.Catch()
	infoid := util.ObjToString(udpInfo["infoid"])
	if infoid == "" {
		return
	}
	//client := Es.GetEsConn()
	//defer Es.DestoryEsConn(client)
	//esquery := `{"query": {"bool": {"must": [{"term": {"ids": "` + infoid + `"}}]}}}`
	//data := Es.Get(config.Conf.DB.Es.Index, esquery)
	data, _ := MgoP.FindOne(config.Conf.DB.MongoP.Coll, bson.M{"ids": infoid})
	if data == nil || len(*data) == 0 {
		log.Info("not find project---," + infoid)
		return
	}
	pid := mongodb.BsonIdToSId((*data)["_id"])
	p.delJudge(infoid, pid)
}

// nextNode notifies the next pipeline node over UDP that batch pici is
// ready. It rewrites mapInfo in place (stype, query, key), records the
// message in udptaskmap under that key, and sends it to toaddr[0].
func nextNode(mapInfo map[string]interface{}, pici int64) {
	mapInfo["stype"] = "project"
	mapInfo["query"] = map[string]interface{}{
		"pici": pici,
	}
	key := fmt.Sprintf("%d-%s-%d", pici, "project", 0)
	mapInfo["key"] = key
	datas, _ := json.Marshal(mapInfo)
	node := &udpNode{datas, toaddr[0], time.Now().Unix(), 0}
	udptaskmap.Store(key, node)
	_ = udpclient.WriteUdp(datas, udp.OP_TYPE_DATA, toaddr[0])
}

// enter streams the documents matching q from db.coll and merges each
// eligible one into a project, using at most p.thread concurrent workers.
//
// A document is eligible when its spidercode is not in the skip list, its
// extracttype is 1 (not a duplicate), its toptype is none of 采购意向 / 产权 /
// 拟建, and it satisfies the extra condition of the current run type.
// Brun is true for the duration of the call. A signal on queryClose stops
// the scan early.
func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
	defer util.Catch()
	defer func() { p.Brun = false }()
	p.Brun = true
	count := 0
	countRepeat := 0
	pool := make(chan bool, p.thread)
	log.Info("start project", zap.Any("q:", q), zap.String("coll:", coll), zap.Int64("pici", p.pici))
	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)
	infoPool := make(chan map[string]interface{}, 2000)
	over := make(chan bool)

	// dispatch takes a worker slot and merges one document in the background.
	dispatch := func(tmp map[string]interface{}) {
		pool <- true
		go func(tmp map[string]interface{}) {
			defer func() { <-pool }()
			p.fillInPlace(tmp)
			info := ParseInfo(tmp)
			if info == nil {
				// ParseInfo could not build an Info; dereferencing it here
				// would panic in a goroutine that has no recover.
				return
			}
			p.currentTime = info.Publishtime
			// ordinary merge
			p.CommonMerge(tmp, info)
		}(tmp)
	}
	go func() {
	consume:
		for {
			// Queued documents take priority over the stop signal: only
			// when infoPool is momentarily empty do we also listen on over.
			select {
			case tmp := <-infoPool:
				dispatch(tmp)
			default:
				select {
				case tmp := <-infoPool:
					dispatch(tmp)
				case <-over:
					break consume
				}
			}
		}
	}()
	//fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0, "field_source": 0}
	fields := map[string]interface{}{"detail": 0, "contenthtml": 0, "jsondata": 0, "regions_log": 0, "field_source": 0}
	if p.currentType == "project" || p.currentType == "project_history" {
		c, _ := sess.DB(db).C(coll).Find(q).Count()
		log.Info(fmt.Sprintf("共查询: %d条", c))
	}
	ms := sess.DB(db).C(coll).Find(q).Select(fields)
	query := ms.Iter()
	var lastid interface{}
scan:
	for {
		select {
		case <-queryClose:
			log.Info("receive interrupt sign")
			log.Info("close iter..", zap.Any("lastid:", lastid), zap.Any("err:", query.Cursor.Close(nil)))
			queryCloseOver <- true
			break scan
		default:
			tmp := make(map[string]interface{})
			if !query.Next(&tmp) {
				break scan
			}
			lastid = tmp["_id"]
			// progress log: every 50000 documents in a full run, else every 2000
			step := 2000
			if P_QL.currentType == "ql" {
				step = 50000
			}
			if count%step == 0 {
				log.Info("current---", zap.Int("count", count), zap.String("lastid", mongodb.BsonIdToSId(lastid)))
			}
			// skip sites on the skip list entirely (they are not counted)
			if siteJudge(util.ObjToString(tmp["spidercode"])) {
				continue
			}
			// extracttype -1: duplicate, 1: not a duplicate
			if util.IntAll(tmp["extracttype"]) == 1 {
				toptype := util.ObjToString(tmp["toptype"])
				if toptype != "采购意向" && toptype != "产权" && toptype != "拟建" {
					switch {
					case P_QL.currentType == "ql":
						infoPool <- tmp
					case P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 0:
						// incremental data for an id range
						infoPool <- tmp
					case P_QL.currentType == "project_history" && tmp["history_updatetime"] != nil:
						// historical data for an id range
						infoPool <- tmp
					}
				}
			} else {
				countRepeat++
			}
			count++
		}
	}
	time.Sleep(5 * time.Second)
	over <- true
	// Fill every worker slot: this blocks until all running workers have
	// released theirs, i.e. until they are done.
	for n := 0; n < p.thread; n++ {
		pool <- true
	}
	log.Info("所有线程执行完成...", zap.Int("count:", count), zap.Int("countRepeat", countRepeat))
}

// CommonMerge merges one bidding notice into a project.
//
// Nothing is done when info is nil, when none of its key elements is
// present (pnbval == 0), or when the buyer is the only one present. A
// notice whose jsondata carries a projecthref is matched through mapHref:
// it updates the cached project for that href, or creates and registers a
// new project if the href is unknown. Every other notice goes through
// startProjectMerge.
//
// NOTE(review): enter excludes "jsondata" in its query projection, so
// documents arriving from enter never take the projecthref path — confirm.
func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
	if info == nil || (info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0 {
		return
	}
	proHref := ""
	if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok && jsonData != nil {
		proHref = util.ObjToString(jsonData["projecthref"])
	}
	if proHref == "" {
		// regular project merge
		p.startProjectMerge(info, tmp)
		return
	}
	// The site itself publishes the tender flow: merge by projecthref.
	tmp["projecthref"] = proHref
	p.mapHrefLock.Lock()
	pid := p.mapHref[proHref]
	p.mapHrefLock.Unlock()
	if pid == "" {
		id, p1 := p.NewProject(tmp, info)
		p.mapHrefLock.Lock()
		p.mapHref[proHref] = id
		p.mapHrefLock.Unlock()
		p.AllIdsMapLock.Lock()
		p.AllIdsMap[id] = &ID{Id: id, P: p1}
		p.AllIdsMapLock.Unlock()
		return
	}
	p.AllIdsMapLock.Lock()
	res := p.AllIdsMap[pid]
	p.AllIdsMapLock.Unlock()
	if res == nil {
		p.startProjectMerge(info, tmp)
		return
	}
	comparePro := res.P
	_, ex := p.CompareStatus(comparePro, info)
	p.UpdateProject(tmp, info, comparePro, -1, "AAAAAAAAAA", ex)
}

// ParseInfo converts a raw bidding document into an Info by a JSON round
// trip and then normalises it. It returns nil when the document cannot be
// converted.
//
// Besides filling the Info, it writes the cleaned "winner" and "buyer"
// values (and a substituted "publishtime") back into tmp. pnbval ends up
// counting how many of these are present: project name, project code (or a
// code found in the title), and a buyer longer than two characters.
func ParseInfo(tmp map[string]interface{}) (info *Info) {
	bys, err := json.Marshal(tmp)
	if err != nil {
		return nil
	}
	var thisinfo *Info
	// Best effort: a field that fails to decode leaves the rest populated.
	_ = json.Unmarshal(bys, &thisinfo)
	if thisinfo == nil {
		return nil
	}
	// publishtime missing: fall back to the first alternative date field
	if thisinfo.Publishtime <= 0 {
		for _, d := range DateTimeSelect {
			if tmp[d] != nil {
				thisinfo.Publishtime = util.Int64All(tmp[d])
				tmp["publishtime"] = tmp[d]
				break
			}
		}
	}
	if len(thisinfo.Topscopeclass) == 0 {
		thisinfo.Topscopeclass = []string{}
	}
	if len(thisinfo.Subscopeclass) == 0 {
		thisinfo.Subscopeclass = []string{}
	}
	if thisinfo.SubType == "" {
		thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
	}

	// Look for a project code in the title; the first pattern that yields an
	// acceptable code wins.
	titlePatterns := []struct {
		re    *regexp.Regexp
		group int
	}{
		{titleGetPc, 1},
		{titleGetPc1, 3},
		{titleGetPc2, 1},
	}
	for _, tp := range titlePatterns {
		res := tp.re.FindStringSubmatch(thisinfo.Title)
		if len(res) <= tp.group {
			continue
		}
		code := res[tp.group]
		if len(code) > 6 && thisinfo.ProjectCode != code && !numCheckPc.MatchString(code) && !_zimureg1.MatchString(code) {
			thisinfo.PTC = code
			break
		}
	}

	if thisinfo.ProjectName != "" {
		//thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
		thisinfo.pnbval++
	}
	if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
		// Without a project name, a short code that is only digits or only
		// letters is blanked.
		if thisinfo.ProjectCode != "" {
			//thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
			if thisinfo.pnbval == 0 && utf8.RuneCountInString(thisinfo.ProjectCode) < 5 {
				thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
			}
		} else {
			//thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
			if thisinfo.pnbval == 0 && utf8.RuneCountInString(thisinfo.PTC) < 5 {
				thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
			}
		}
		thisinfo.pnbval++
	}
	// Drop the title code when the project code already contains it (this
	// includes the case where the two are equal).
	if strings.Contains(thisinfo.ProjectCode, thisinfo.PTC) {
		thisinfo.PTC = ""
	}
	if thisinfo.Buyer != "" && utf8.RuneCountInString(thisinfo.Buyer) > 2 {
		thisinfo.pnbval++
	} else {
		thisinfo.Buyer = ""
	}
	// clean the review-expert list
	if thisinfo.ReviewExperts != "" {
		thisinfo.ReviewExperts = ClearRp(thisinfo.ReviewExperts)
	}

	// collect and clean winners: the top-level one first, then each package's
	winner := QyFilter(util.ObjToString(tmp["winner"]), "winner")
	tmp["winner"] = winner
	seen := map[string]bool{}
	winners := []string{}
	if winner != "" {
		seen[winner] = true
		winners = append(winners, winner)
	}
	if packageM, _ := tmp["package"].(map[string]interface{}); packageM != nil {
		thisinfo.HasPackage = true
		for _, pkg := range packageM {
			pm, _ := pkg.(map[string]interface{})
			pw := QyFilter(util.ObjToString(pm["winner"]), "winner")
			if pw != "" && !seen[pw] {
				seen[pw] = true
				winners = append(winners, pw)
			}
		}
	}
	thisinfo.Winners = winners

	// clean winnerorder: keep only entries whose entname survives filtering
	var wins []map[string]interface{}
	for _, v := range thisinfo.WinnerOrder {
		w := QyFilter(util.ObjToString(v["entname"]), "winner")
		if w != "" {
			v["entname"] = w
			wins = append(wins, v)
		}
	}
	thisinfo.WinnerOrder = wins

	// clean buyer
	buyer := QyFilter(util.ObjToString(tmp["buyer"]), "buyer")
	tmp["buyer"] = buyer
	thisinfo.Buyer = buyer

	thisinfo.LenPC = utf8.RuneCountInString(thisinfo.ProjectCode)
	thisinfo.LenPTC = utf8.RuneCountInString(thisinfo.PTC)
	thisinfo.LenPN = utf8.RuneCountInString(thisinfo.ProjectName)

	// Guard against abnormal package data: cap each name at 100 characters.
	for k, pkg := range thisinfo.Package {
		ps, ok := pkg.([]map[string]interface{})
		if !ok {
			continue
		}
		for i, item := range ps {
			name, _ := item["name"].(string)
			// Truncate by runes. The previous code sliced bytes and then
			// formatted the rune slice with fmt.Sprint, which stored a list
			// of code points ("[20013 22269 ...]") instead of text.
			if r := []rune(name); len(r) > 100 {
				item["name"] = string(r[:100])
			}
			ps[i] = item
		}
		thisinfo.Package[k] = ps
	}
	return thisinfo
}

// updateJudge decides how an edited bidding notice changes project pid.
//
// Only fields flagged in the notice's modifyinfo that also exist on the
// project are considered. If the project is not in the in-memory cache
// ("outside the cycle") the fields are modified directly. If it is cached
// and the notice has a projecthref, the fields are modified only when that
// href maps to pid. Otherwise a change to any merge element (see Elements)
// triggers merge-and-modify, and any other change a plain modify.
func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, pid string) {
	tmpPro, _ := MgoP.FindById(ProjectColl, pid, nil)
	if tmpPro == nil {
		log.Info("Not find project---" + pid)
		return
	}
	// project fields to modify
	modifyProMap := make(map[string]interface{})
	if modifyMap, ok := infoMap["modifyinfo"].(map[string]interface{}); ok {
		for k, v := range modifyMap {
			if v != nil && v != "" && (*tmpPro)[k] != nil {
				modifyProMap[k] = infoMap[k]
			}
		}
	}
	if len(modifyProMap) == 0 {
		log.Info("修改招标公告信息不需要修改项目信息字段", zap.Any("id", infoMap["_id"]))
		return
	}
	p.AllIdsMapLock.Lock()
	_, inCache := p.AllIdsMap[pid]
	p.AllIdsMapLock.Unlock()

	ids, ok := (*tmpPro)["ids"].([]interface{})
	if !ok {
		// Previously an unchecked assertion that panicked; abort explicitly.
		log.Info("Not find project---" + pid)
		return
	}
	// index: 0 first, 1 middle, 2 last, -1 not found; position: offset in ids
	index, position := -1, 0
	infoID := mongodb.BsonIdToSId(infoMap["_id"])
	for i, v := range ids {
		if util.ObjToString(v) != infoID {
			continue
		}
		position = i
		switch {
		case i == 0:
			index = 0
		case i == len(ids)-1:
			index = 2
		default:
			index = 1
		}
	}

	if !inCache {
		// outside the cycle
		log.Info("周期外数据直接修改" + "----------------------------")
		p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
		return
	}

	// mergeOrModify re-merges when a merge element changed, else only modifies.
	mergeOrModify := func() {
		if modifyEle(modifyProMap) {
			log.Info("合并修改更新" + "----------------------------")
			p.mergeAndModify(pid, index, position, infoMap, *tmpPro, modifyProMap)
		} else {
			log.Info("修改更新" + "----------------------------")
			p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
		}
	}

	// inside the cycle: projecthref decides the path
	jsonData, _ := infoMap["jsondata"].(map[string]interface{})
	proHref, hasHref := jsonData["projecthref"].(string)
	if !hasHref {
		mergeOrModify()
		return
	}
	p.mapHrefLock.Lock()
	tempId := p.mapHref[proHref]
	p.mapHrefLock.Unlock()
	if pid == tempId {
		p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
	} else {
		log.Info("projecthref data id err---pid=" + pid + "---" + tempId)
	}
}

// Elements lists the fields whose modification affects the merge flow.
var Elements = []string{
	"projectname",
	"projectcode",
	"buyer",
	"agency",
	"area",
	"city",
	"publishtime",
	"toptype",
	"subtype",
}

// modifyEle reports whether tmp holds a non-nil value for any merge element
// listed in Elements.
func modifyEle(tmp map[string]interface{}) bool {
	for _, str := range Elements {
		if tmp[str] != nil {
			return true
		}
	}
	return false
}

// fillInPlace completes the area/city/district of tmp from the data stored
// for its site.
//
// Nothing changes when both area and city are already set, when tmp has no
// site, or when the site is unknown. The site's location is copied when the
// area is empty or "全国", or when the area equals the site's area and the
// site has a city.
func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
	area := util.ObjToString(tmp["area"])
	city := util.ObjToString(tmp["city"])
	if area != "" && city != "" {
		return
	}
	tmpSite := util.ObjToString(tmp["site"])
	if tmpSite == "" {
		return
	}
	p.mapSiteLock.Lock()
	defer p.mapSiteLock.Unlock()
	site := p.mapSite[tmpSite]
	if site == nil {
		return
	}
	switch {
	case area == "", area == "全国":
		// no usable area: take everything from the site
	case area == site.Area && site.City != "":
		// same area and the site knows the city
	default:
		return
	}
	tmp["area"] = site.Area
	tmp["city"] = site.City
	tmp["district"] = site.District
}

// deleteSlice removes the first element equal to v from arr and returns the
// shortened slice; arr is returned unchanged when v is absent. The removal
// reuses arr's backing array. stype is not used.
func deleteSlice(arr []string, v, stype string) []string {
	for i, item := range arr {
		if item == v {
			return append(arr[:i], arr[i+1:]...)
		}
	}
	return arr
}

// ClearRp validates a comma-separated list of review experts. It keeps the
// names that consist of 2 to 4 Han characters and are not in BlaskListMap,
// and returns them joined by commas.
func ClearRp(tmp string) string {
	arrTmp := []string{}
	for _, v := range strings.Split(tmp, ",") {
		// Han-only names of 2 to 4 characters
		if !reviewExpertNameReg.MatchString(v) {
			continue
		}
		// blacklist filter
		if BlaskListMap[v] {
			continue
		}
		arrTmp = append(arrTmp, v)
	}
	return strings.Join(arrTmp, ",")
}

// QyFilter cleans an organisation name of kind stype. Spaces are removed.
// The result is "" when the name matches a BlackRegexp pattern for stype,
// lacks four consecutive Han characters, or is longer than 60 characters.
func QyFilter(name, stype string) string {
	name = strings.ReplaceAll(name, " ", "")
	//preReg := PreRegexp[stype]
	//for _, v := range preReg {
	//	name = v.ReplaceAllString(name, "")
	//}
	//backReg := BackRegexp[stype]
	//for _, v := range backReg {
	//	name = v.ReplaceAllString(name, "")
	//}
	//backRepReg := BackRepRegexp[stype]
	//for _, v := range backRepReg {
	//	name = v.regs.ReplaceAllString(name, v.repstr)
	//}
	for _, v := range BlackRegexp[stype] {
		if v.MatchString(name) {
			name = ""
			break
		}
	}
	if !qyHanRunReg.MatchString(name) {
		name = ""
	}
	if utf8.RuneCountInString(name) > 60 {
		name = ""
	}
	return name
}

// siteJudge reports whether spider code is listed in SkipSiteList, i.e.
// whether its records are filtered out of merging.
//
// Author J 2023/6/12 15:11
func siteJudge(code string) bool {
	for _, s := range SkipSiteList {
		if code == s {
			return true
		}
	}
	return false
}