12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956 |
- package main
- import (
- "encoding/json"
- "fmt"
- "github.com/goinggo/mapstructure"
- "github.com/robfig/cron"
- "go.mongodb.org/mongo-driver/bson"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "project/config"
- "regexp"
- "strings"
- "sync"
- "time"
- "unicode/utf8"
- )
- /**
- 任务入口
- 全量、增量合并
- 更新、插入,内存清理
- 转换成info对象
- **/
- // Disabled pattern tables, kept for reference (QyFilter still carries the matching commented-out loops):
- // var PreRegexp = map[string][]*regexp.Regexp{}
- // var BackRegexp = map[string][]*regexp.Regexp{}
- // var BackRepRegexp = map[string][]RegexpInfo{}
- // BlackRegexp maps a name category (QyFilter indexes it with its stype argument,
- // called in this file with "winner" and "buyer") to blacklist patterns. A name that
- // matches any pattern of its category is blanked by QyFilter. It starts empty here;
- // whatever populates it is outside this view.
- var BlackRegexp = map[string][]*regexp.Regexp{}
- var (
- // Extract a project code from the title.
- // titleGetPc is anchored at the start of the title, titleGetPc1 looks for a code inside
- // brackets (optionally introduced by a "编号:"-style label; the code is capture group 3),
- // and titleGetPc2 is anchored at the end of the title (optionally followed by "...公告").
- titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
- titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
- titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
- // Project-code filtering: brackets, separators, whitespace and trailing
- // "re-tender"/"project"/"procurement" suffixes. Within this view it is only
- // referenced from commented-out code in ParseInfo.
- pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}– ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
- // A project code that is only digits or only letters (plus '_' / '-') and at most 4 characters long.
- StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
- // A code of any length that is purely digits or purely letters (plus '_' / '-').
- StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
- // Text containing a package/lot word (包 / 段) next to a number or letter. Per the original
- // note: when the tender did not identify packages, such notices are merged into one project.
- KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
- )
- // RegexpInfo pairs a compiled pattern with a replacement string. In this view it is
- // only referenced by commented-out code (BackRepRegexp and the disabled loop in
- // QyFilter, which would call regs.ReplaceAllString(name, repstr)).
- type RegexpInfo struct {
- regs *regexp.Regexp // pattern to match
- repstr string // replacement text for each match
- }
- // ProjectTask is the project-merge object: it holds the in-memory indexes used to
- // match bidding notices to projects, the locks guarding them, and the run settings
- // of the current full or incremental merge.
- type ProjectTask struct {
- InitMinTime int64 // minimum time; per the original note, values below 0 are processed once
- name string
- thread int // number of worker goroutines ("threads")
- // lookup lock; clearMem holds it for the whole duration of a memory sweep
- findLock sync.Mutex
- wg sync.WaitGroup
- // lock for AllIdsMap
- AllIdsMapLock sync.Mutex
- // project id -> cached project entry
- AllIdsMap map[string]*ID
- // indexes by buyer (mapPb), project name (mapPn) and project code (mapPc)
- mapPb, mapPn, mapPc map[string]*Key
- // projecthref -> project id; notices of the same published workflow share the field and are merged directly
- mapHref map[string]string
- mapHrefLock sync.Mutex
- // site name -> site location record
- mapSite map[string]*Site
- mapSiteLock sync.Mutex
- // spider isflow
- mapSpider map[string]int
- mapSpiderLock sync.Mutex
- // lock for bidtype / bidstatus
- mapBidLock sync.Mutex
- // channel of pending project upserts, drained by updateAllQueue
- updatePool chan []map[string]interface{}
- //savePool chan map[string]interface{}
- //saveSign, updateSign chan bool
- // target collection (table) name
- coll string
- // current run mode; this file compares it against "ql", "project" and "project_history"
- currentType string // whether a full or an incremental run is in progress
- // number of cron firings skipped since the last memory sweep
- clearContimes int
- // current time; enter sets it to the publish time of the notice being merged
- currentTime int64
- // number of entries collected before a bulk upsert is flushed
- saveSize int
- pici int64
- validTime int64
- statusTime int64
- // result-time update: per the original note, notices from the last two days no longer update jgtime
- jgTime int64
- Brun bool
- }
- func NewPT() *ProjectTask {
- p := &ProjectTask{
- InitMinTime: int64(1325347200),
- name: "全/增量对象",
- thread: Thread,
- updatePool: make(chan []map[string]interface{}, 5000),
- //savePool: make(chan map[string]interface{}, 2000),
- wg: sync.WaitGroup{},
- AllIdsMap: make(map[string]*ID, 5000000),
- mapPb: make(map[string]*Key, 1500000),
- mapPn: make(map[string]*Key, 5000000),
- mapPc: make(map[string]*Key, 5000000),
- mapHref: make(map[string]string, 1500000),
- mapSite: make(map[string]*Site, 1000000),
- mapSpider: make(map[string]int, 1000000),
- saveSize: 200,
- //saveSign: make(chan bool, 1),
- //updateSign: make(chan bool, 1),
- coll: ProjectColl,
- validTime: int64(util.IntAllDef(config.Conf.Serve.ValidDays, 150)) * 86400,
- statusTime: int64(util.IntAllDef(config.Conf.Serve.StatusDays, 15) * 86400),
- jgTime: int64(util.IntAllDef(7, 7) * 86400),
- }
- return p
- }
- // P_QL is the package-level ProjectTask; enter reads P_QL.currentType to decide
- // which records to process and how often to log progress.
- var P_QL *ProjectTask
- // sp is a one-slot semaphore: updateAllQueue takes the slot for the duration of each
- // bulk upsert, and taskZl takes and releases it once to wait for an upsert in flight.
- var sp = make(chan bool, 1)
- func (p *ProjectTask) updateAllQueue() {
- arru := make([][]map[string]interface{}, p.saveSize)
- indexu := 0
- for {
- select {
- case v := <-p.updatePool:
- arru[indexu] = v
- indexu++
- if indexu == p.saveSize {
- sp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-sp
- }()
- MgoP.UpSertBulk(p.coll, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, p.saveSize)
- indexu = 0
- }
- case <-time.After(1 * time.Second):
- if indexu > 0 {
- sp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-sp
- }()
- MgoP.UpSertBulk(p.coll, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, p.saveSize)
- indexu = 0
- }
- }
- }
- }
- // clearMem starts a cron job that evicts stale projects from the in-memory indexes.
- //
- // The job fires at second 50 of every 5th minute. It performs a sweep when the task is
- // in full-load mode (currentType == "ql") and SingleClear is 0, or when 80 firings have
- // been skipped since the last sweep (80 x 5 min = 400 min, the incremental-mode
- // interval mentioned in the original note); otherwise it only increments the skip counter.
- //
- // A project is stale when p.currentTime - LastTime exceeds p.validTime. For each stale
- // project the sweep deletes its Redis key, its AllIdsMap entry and its id from the
- // buyer / project-name / project-code indexes, dropping index keys whose list becomes empty.
- //
- // NOTE(review): SingleClear, p.currentType and p.clearContimes are read and written here
- // without any lock visible in this block — confirm that is safe with the other writers.
- func (p *ProjectTask) clearMem() {
- c := cron.New()
- // original note: projects created more than 7 days ago
- // original note: keep the most recent 6 months of information in memory
- // runs every 5 minutes during a full load, every 400 minutes during incremental runs
- _ = c.AddFunc("50 0/5 * * * *", func() {
- if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 80 {
- SingleClear = 1
- // reset the skipped-run counter
- p.clearContimes = 0
- // take the global lookup/compare lock so no new notice enters matching
- p.findLock.Lock()
- //defer p.findLock.Unlock()
- // wait until every merge tracked by p.wg has finished
- p.wg.Wait()
- // walk the ids
- // all project information held in memory
- p.AllIdsMapLock.Lock()
- p.mapHrefLock.Lock()
- log.Info("清除开始")
- // number of evicted projects
- clearNum := 0
- for kHref, pid := range p.mapHref { // prune mapHref first; it needs AllIdsMap entries that are deleted below
- v := p.AllIdsMap[pid]
- // entries whose project id is not in AllIdsMap (v == nil) are left in place
- if v != nil && p.currentTime-v.P.LastTime > p.validTime {
- delete(p.mapHref, kHref)
- }
- }
- for k, v := range p.AllIdsMap {
- if p.currentTime-v.P.LastTime > p.validTime {
- clearNum++
- redis.Del(RedisProject, k)
- // remove from the id map
- delete(p.AllIdsMap, k)
- // remove from the buyer index (mapPb)
- if v.P.Buyer != "" {
- ids := p.mapPb[v.P.Buyer]
- if ids != nil {
- ids.Lock.Lock()
- ids.Arr = deleteSlice(ids.Arr, k, "pb")
- if len(ids.Arr) == 0 {
- delete(p.mapPb, v.P.Buyer)
- }
- ids.Lock.Unlock()
- }
- }
- // remove from the project-name index (mapPn): the main name plus every name in MPN
- for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
- if vn != "" {
- ids := p.mapPn[vn]
- if ids != nil {
- ids.Lock.Lock()
- ids.Arr = deleteSlice(ids.Arr, k, "pn")
- if len(ids.Arr) == 0 {
- delete(p.mapPn, vn)
- }
- ids.Lock.Unlock()
- }
- }
- }
- // remove from the project-code index (mapPc): the main code plus every code in MPC
- for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
- if vn != "" {
- ids := p.mapPc[vn]
- if ids != nil {
- ids.Lock.Lock()
- ids.Arr = deleteSlice(ids.Arr, k, "pc")
- if len(ids.Arr) == 0 {
- delete(p.mapPc, vn)
- }
- ids.Lock.Unlock()
- }
- }
- }
- v = nil
- }
- }
- // release in reverse order of acquisition
- p.mapHrefLock.Unlock()
- p.AllIdsMapLock.Unlock()
- p.findLock.Unlock()
- SingleClear = 0
- // the map sizes below are read after the locks have been released
- log.Info("清除完成:", zap.Int("clearNum", clearNum), zap.Int("AllIdsMap:", len(p.AllIdsMap)), zap.Int("mapPn:", len(p.mapPn)),
- zap.Int("mapPc:", len(p.mapPc)), zap.Int("mapPb:", len(p.mapPb)), zap.Int("mapHref:", len(p.mapHref)))
- } else {
- p.clearContimes++
- }
- })
- c.Start()
- }
- // 全量合并
- func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
- defer util.Catch()
- p.thread = util.IntAllDef(Thread, 4)
- q, _ := udpInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{}
- lteid, _ := udpInfo["lteid"].(string)
- var idmap map[string]interface{}
- if len(lteid) > 15 {
- idmap = map[string]interface{}{
- "$lte": mongodb.StringTOBsonId(lteid),
- }
- }
- gtid, _ := udpInfo["gtid"].(string)
- if len(gtid) > 15 {
- if idmap == nil {
- idmap = map[string]interface{}{}
- }
- idmap["$gte"] = mongodb.StringTOBsonId(gtid)
- }
- if idmap != nil {
- q["_id"] = idmap
- }
- }
- if c := util.ObjToString(udpInfo["coll"]); c != "" {
- BiddingColl = c
- } else {
- BiddingColl = config.Conf.DB.MongoB.Coll
- }
- //生成查询语句执行
- p.enter(MgoB.DbName, BiddingColl, q)
- }
- // 增量合并
- func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
- defer util.Catch()
- //1、检查pubilshtime索引
- db, _ := udpInfo["db"].(string)
- if db == "" {
- db = MgoB.DbName
- }
- if c := util.ObjToString(udpInfo["coll"]); c != "" {
- BiddingColl = c
- } else {
- BiddingColl = config.Conf.DB.MongoB.Coll
- }
- thread := util.IntAllDef(Thread, 1)
- if thread > 0 {
- p.thread = thread
- }
- //开始id和结束id
- q, _ := udpInfo["query"].(map[string]interface{})
- gtid := udpInfo["gtid"].(string)
- lteid := udpInfo["lteid"].(string)
- q = map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": mongodb.StringTOBsonId(gtid),
- "$lte": mongodb.StringTOBsonId(lteid),
- },
- }
- p.enter(db, BiddingColl, q)
- if udpInfo["stop"] == nil {
- for i := 0; i < 1; i++ {
- sp <- true
- }
- for i := 0; i < 1; i++ {
- <-sp
- }
- }
- log.Info("保存完成,生索引", zap.Int64("pici:", p.pici))
- time.Sleep(5 * time.Second)
- nextNode(udpInfo, p.pici)
- }
- // 招标字段更新
- func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
- defer util.Catch()
- infoid := udpInfo["infoid"].(string)
- infoMap, _ := MgoB.FindById(util.ObjToString(udpInfo["coll"]), infoid, nil)
- if (*infoMap)["modifyinfo"] == nil {
- log.Info("does not exist modifyinfo ---," + infoid)
- return
- }
- //client := Es.GetEsConn()
- //defer Es.DestoryEsConn(client)
- //esquery := `{"query": {"bool": {"must": [{"match": {"ids": "` + infoid + `"}}]}}}`
- //data := Es.Get(config.Conf.DB.Es.Index, esquery)
- data, _ := MgoP.FindOne(config.Conf.DB.MongoP.Coll, bson.M{"ids": infoid})
- if len(*data) > 0 {
- pid := mongodb.BsonIdToSId((*data)["_id"])
- p.updateJudge(*infoMap, pid)
- } else {
- log.Info("not find project---," + infoid)
- }
- }
- func (p *ProjectTask) taskUpdatePro(udpInfo map[string]interface{}) {
- defer util.Catch()
- pid := util.ObjToString(udpInfo["pid"])
- updateMap := util.ObjToMap(udpInfo["updateField"])
- if pid == "" || len(*updateMap) == 0 {
- log.Error("参数有误")
- return
- }
- proMap, _ := MgoP.FindById(ProjectColl, pid, nil)
- if len(*proMap) > 1 {
- (*proMap)["reason"] = "直接修改项目字段信息"
- backupPro(*proMap)
- delete(*proMap, "reason")
- updataMap := make(map[string]interface{})
- modifyInfo := make(map[string]interface{})
- for k, v := range *updateMap {
- if strings.Contains(k, "time") {
- updataMap[k] = util.Int64All(v)
- } else {
- updataMap[k] = v
- }
- modifyInfo[k] = true
- }
- updataMap["modifyinfo"] = modifyInfo
- bol := MgoP.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": updataMap})
- if bol {
- //es索引
- by, _ := json.Marshal(map[string]interface{}{
- "query": map[string]interface{}{
- "_id": bson.M{
- "$gte": pid,
- "$lte": pid,
- }},
- "stype": "project",
- })
- _ = udpclient.WriteUdp(by, udp.OP_TYPE_DATA, toaddr[1])
- }
- // 内存
- var pro ProjectCache
- err := mapstructure.Decode(proMap, &pro)
- if err != nil {
- log.Error(err.Error())
- }
- p.AllIdsMapLock.Lock()
- if v, ok := p.AllIdsMap[pid]; ok {
- v.P = &pro
- }
- p.AllIdsMapLock.Unlock()
- } else {
- log.Info("Not find project---" + pid)
- }
- }
- func (p *ProjectTask) delInfoPro(udpInfo map[string]interface{}) {
- defer util.Catch()
- infoid := util.ObjToString(udpInfo["infoid"])
- if infoid == "" {
- return
- }
- //client := Es.GetEsConn()
- //defer Es.DestoryEsConn(client)
- //esquery := `{"query": {"bool": {"must": [{"term": {"ids": "` + infoid + `"}}]}}}`
- //data := Es.Get(config.Conf.DB.Es.Index, esquery)
- data, _ := MgoP.FindOne(config.Conf.DB.MongoP.Coll, bson.M{"ids": infoid})
- if len(*data) > 0 {
- pid := util.ObjToString((*data)["_id"])
- p.delJudge(infoid, pid)
- } else {
- log.Info("not find project---," + infoid)
- }
- }
- // 通知下个节点nextNode
- func nextNode(mapInfo map[string]interface{}, pici int64) {
- mapInfo["stype"] = "project"
- mapInfo["query"] = map[string]interface{}{
- "pici": pici,
- }
- key := fmt.Sprintf("%d-%s-%d", pici, "project", 0)
- mapInfo["key"] = key
- datas, _ := json.Marshal(mapInfo)
- node := &udpNode{datas, toaddr[0], time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- _ = udpclient.WriteUdp(datas, udp.OP_TYPE_DATA, toaddr[0])
- }
- // enter streams the documents matching q from db.coll and merges each accepted
- // document into the project indexes.
- //
- // A consumer goroutine takes documents from infoPool and, limited by the pool
- // semaphore to p.thread concurrent workers, runs fillInPlace, ParseInfo and
- // CommonMerge on each one. The calling goroutine iterates the cursor, filters the
- // documents and feeds infoPool; a value on queryClose aborts the iteration early.
- // p.Brun is true while enter is running.
- //
- // NOTE(review): the filters and progress logging read P_QL.currentType while the count
- // query reads p.currentType — confirm p and P_QL are always the same task.
- func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
- defer util.Catch()
- defer func() {
- p.Brun = false
- }()
- p.Brun = true
- count := 0
- countRepeat := 0
- // semaphore bounding the number of concurrent merge workers
- pool := make(chan bool, p.thread)
- log.Info("start project", zap.Any("q:", q), zap.String("coll:", coll), zap.Int64("pici", p.pici))
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- infoPool := make(chan map[string]interface{}, 2000)
- over := make(chan bool)
- go func() {
- L:
- for {
- select {
- case tmp := <-infoPool:
- pool <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- }()
- p.fillInPlace(tmp)
- info := ParseInfo(tmp)
- // NOTE(review): ParseInfo can return nil, in which case info.Publishtime panics in
- // this goroutine; p.currentTime is also written by every worker without a lock — confirm.
- p.currentTime = info.Publishtime
- // ordinary merge
- p.CommonMerge(tmp, info)
- }(tmp)
- default:
- // nothing queued right now: block until a document arrives or the producer signals completion
- select {
- case tmp := <-infoPool:
- pool <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- }()
- p.fillInPlace(tmp)
- info := ParseInfo(tmp)
- p.currentTime = info.Publishtime
- // ordinary merge
- p.CommonMerge(tmp, info)
- }(tmp)
- case <-over:
- break L
- }
- }
- }
- }()
- //fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0, "field_source": 0}
- // projection: exclude the large fields.
- // NOTE(review): "jsondata" is excluded here, yet CommonMerge reads tmp["jsondata"] to find
- // projecthref — confirm this is intended.
- fields := map[string]interface{}{"detail": 0, "contenthtml": 0, "jsondata": 0, "regions_log": 0, "field_source": 0}
- if p.currentType == "project" || p.currentType == "project_history" {
- c, _ := sess.DB(db).C(coll).Find(q).Count()
- log.Info(fmt.Sprintf("共查询: %d条", c))
- }
- ms := sess.DB(db).C(coll).Find(q).Select(fields)
- query := ms.Iter()
- var lastid interface{}
- L:
- for {
- select {
- case <-queryClose:
- // external interrupt: close the cursor, acknowledge on queryCloseOver and stop reading
- log.Info("receive interrupt sign")
- log.Info("close iter..", zap.Any("lastid:", lastid), zap.Any("err:", query.Cursor.Close(nil)))
- queryCloseOver <- true
- break L
- default:
- tmp := make(map[string]interface{})
- if query.Next(&tmp) {
- lastid = tmp["_id"]
- // progress log: every 50000 counted documents in "ql" mode, every 2000 otherwise
- if P_QL.currentType == "ql" {
- if count%50000 == 0 {
- log.Info("current---", zap.Int("count", count), zap.String("lastid", mongodb.BsonIdToSId(lastid)))
- }
- } else {
- if count%2000 == 0 {
- log.Info("current---", zap.Int("count", count), zap.String("lastid", mongodb.BsonIdToSId(lastid)))
- }
- }
- // dedup filtering; documents whose spidercode is in the skip list (winning-bid records) are ignored
- // and are not counted
- if !siteJudge(util.ObjToString(tmp["spidercode"])) {
- // extracttype -1: duplicate, 1: not duplicate
- if util.IntAll(tmp["extracttype"]) == 1 {
- if util.ObjToString(tmp["toptype"]) != "采购意向" && util.ObjToString(tmp["toptype"]) != "产权" && util.ObjToString(tmp["toptype"]) != "拟建" {
- if P_QL.currentType == "ql" {
- infoPool <- tmp
- } else if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 0 {
- // incremental data for an id range
- infoPool <- tmp
- } else if P_QL.currentType == "project_history" && tmp["history_updatetime"] != nil {
- // historical data for an id range
- infoPool <- tmp
- }
- }
- } else {
- countRepeat++
- //if P_QL.currentType == "project" {
- // log.Info("repeat err---", tmp["_id"])
- //}
- }
- count++
- }
- } else {
- break L
- }
- }
- }
- // NOTE(review): when both infoPool and over are ready the consumer's select picks either one, so
- // documents still queued when over is received are not processed; this sleep presumably gives the
- // queue time to drain — confirm.
- time.Sleep(5 * time.Second)
- over <- true
- // block until every worker slot can be taken, i.e. all running workers have finished
- for n := 0; n < p.thread; n++ {
- pool <- true
- }
- log.Info("所有线程执行完成...", zap.Int("count:", count), zap.Int("countRepeat", countRepeat))
- }
- func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
- if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
- if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok {
- proHref := util.ObjToString(jsonData["projecthref"]) // 网站本身发布的公告具有招投标流程,直接参与合并
- if jsonData != nil && proHref != "" {
- //projectHref字段合并
- tmp["projecthref"] = proHref
- p.mapHrefLock.Lock()
- pid := p.mapHref[proHref]
- p.mapHrefLock.Unlock()
- if pid != "" {
- p.AllIdsMapLock.Lock()
- res := p.AllIdsMap[pid]
- p.AllIdsMapLock.Unlock()
- if res != nil {
- comparePro := res.P
- _, ex := p.CompareStatus(comparePro, info)
- p.UpdateProject(tmp, info, comparePro, -1, "AAAAAAAAAA", ex)
- } else {
- p.startProjectMerge(info, tmp)
- }
- } else {
- id, p1 := p.NewProject(tmp, info)
- p.mapHrefLock.Lock()
- p.mapHref[proHref] = id
- p.mapHrefLock.Unlock()
- p.AllIdsMapLock.Lock()
- p.AllIdsMap[id] = &ID{Id: id, P: p1}
- p.AllIdsMapLock.Unlock()
- }
- } else {
- //项目合并
- p.startProjectMerge(info, tmp)
- }
- } else {
- //项目合并
- p.startProjectMerge(info, tmp)
- }
- } else {
- }
- }
- func ParseInfo(tmp map[string]interface{}) (info *Info) {
- bys, _ := json.Marshal(tmp)
- var thisinfo *Info
- _ = json.Unmarshal(bys, &thisinfo)
- if thisinfo == nil {
- return nil
- }
- // 处理publishtime为空
- if thisinfo.Publishtime <= 0 {
- for _, d := range DateTimeSelect {
- if tmp[d] != nil {
- thisinfo.Publishtime = util.Int64All(tmp[d])
- tmp["publishtime"] = tmp[d]
- break
- }
- }
- }
- if len(thisinfo.Topscopeclass) == 0 {
- thisinfo.Topscopeclass = []string{}
- }
- if len(thisinfo.Subscopeclass) == 0 {
- thisinfo.Subscopeclass = []string{}
- }
- if thisinfo.SubType == "" {
- thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
- }
- //从标题中查找项目编号
- res := titleGetPc.FindStringSubmatch(thisinfo.Title)
- if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
- thisinfo.PTC = res[1]
- } else {
- res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
- if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
- thisinfo.PTC = res[3]
- } else {
- res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
- if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
- thisinfo.PTC = res[1]
- }
- }
- }
- if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
- //thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
- //if thisinfo.ProjectName != "" {
- thisinfo.pnbval++
- //}
- }
- if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
- if thisinfo.ProjectCode != "" {
- //thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
- if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
- thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
- }
- } else {
- //thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
- if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
- thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
- }
- }
- thisinfo.pnbval++
- }
- if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
- thisinfo.PTC = ""
- }
- if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
- thisinfo.pnbval++
- } else {
- thisinfo.Buyer = ""
- }
- //清理评审专家名单
- if thisinfo.ReviewExperts != "" {
- thisinfo.ReviewExperts = ClearRp(thisinfo.ReviewExperts)
- }
- //winners整理、清理
- winner := QyFilter(util.ObjToString(tmp["winner"]), "winner")
- tmp["winner"] = winner
- m1 := map[string]bool{}
- winners := []string{}
- if winner != "" {
- m1[winner] = true
- winners = append(winners, winner)
- }
- packageM, _ := tmp["package"].(map[string]interface{})
- if packageM != nil {
- thisinfo.HasPackage = true
- for _, p := range packageM {
- pm, _ := p.(map[string]interface{})
- pw := QyFilter(util.ObjToString(pm["winner"]), "winner")
- if pw != "" && !m1[pw] {
- m1[pw] = true
- winners = append(winners, pw)
- }
- }
- }
- thisinfo.Winners = winners
- //清理winnerorder
- var wins []map[string]interface{}
- for _, v := range thisinfo.WinnerOrder {
- w := QyFilter(util.ObjToString(v["entname"]), "winner")
- if w != "" {
- v["entname"] = w
- wins = append(wins, v)
- }
- }
- thisinfo.WinnerOrder = wins
- //清理buyer
- buyer := QyFilter(util.ObjToString(tmp["buyer"]), "buyer")
- tmp["buyer"] = buyer
- thisinfo.Buyer = buyer
- thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
- thisinfo.LenPTC = len([]rune(thisinfo.PTC))
- thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
- //处理分包中数据异常问题
- for k, tmp := range thisinfo.Package {
- if ps, ok := tmp.([]map[string]interface{}); ok {
- for i, p := range ps {
- name, _ := p["name"].(string)
- if len([]rune(name)) > 100 {
- p["name"] = fmt.Sprint([]rune(name[:100]))
- }
- ps[i] = p
- }
- thisinfo.Package[k] = ps
- }
- }
- return thisinfo
- }
- func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, pid string) {
- tmpPro, _ := MgoP.FindById(ProjectColl, pid, nil)
- modifyProMap := make(map[string]interface{}) // 修改项目的字段
- if modifyMap, ok := infoMap["modifyinfo"].(map[string]interface{}); ok {
- for k := range modifyMap {
- if modifyMap[k] != nil && modifyMap[k] != "" && (*tmpPro)[k] != nil {
- modifyProMap[k] = infoMap[k]
- }
- }
- }
- if len(modifyProMap) == 0 {
- log.Info("修改招标公告信息不需要修改项目信息字段", zap.Any("id", infoMap["_id"]))
- return
- }
- p.AllIdsMapLock.Lock()
- _, ok := p.AllIdsMap[pid]
- p.AllIdsMapLock.Unlock()
- ids := (*tmpPro)["ids"].([]interface{})
- index, position := -1, 0 // index 0:第一个,1:中间,2:最后一个 position list中位置
- for i, v := range ids {
- if util.ObjToString(v) == mongodb.BsonIdToSId(infoMap["_id"]) {
- position = i
- if i == 0 {
- index = 0
- } else if i == len(ids)-1 {
- index = 2
- } else {
- index = 1
- }
- }
- }
- if ok {
- // 周期内
- //projecthref字段
- if infoMap["jsondata"] != nil {
- jsonData := infoMap["jsondata"].(map[string]interface{})
- if proHref, ok := jsonData["projecthref"].(string); ok {
- p.mapHrefLock.Lock()
- tempId := p.mapHref[proHref]
- p.mapHrefLock.Unlock()
- if pid == tempId {
- p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
- } else {
- log.Info("projecthref data id err---pid=" + pid + "---" + tempId)
- }
- } else {
- f := modifyEle(modifyProMap)
- if f {
- //合并、修改
- log.Info("合并修改更新" + "----------------------------")
- p.mergeAndModify(pid, index, position, infoMap, *tmpPro, modifyProMap)
- } else {
- //修改
- log.Info("修改更新" + "----------------------------")
- p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
- }
- }
- } else {
- f := modifyEle(modifyProMap)
- if f {
- //合并、修改
- log.Info("合并修改更新" + "----------------------------")
- p.mergeAndModify(pid, index, position, infoMap, *tmpPro, modifyProMap)
- } else {
- //修改
- log.Info("修改更新" + "----------------------------")
- p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
- }
- }
- } else {
- // 周期外
- log.Info("周期外数据直接修改" + "----------------------------")
- p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
- }
- }
// Elements lists the notice fields that influence how notices are merged into projects.
var Elements = []string{
	"projectname",
	"projectcode",
	"buyer",
	"agency",
	"area",
	"city",
	"publishtime",
	"toptype",
	"subtype",
}

// modifyEle reports whether tmp, a map of modified fields, holds a non-nil value
// for at least one merge-relevant field listed in Elements.
func modifyEle(tmp map[string]interface{}) bool {
	for i := 0; i < len(Elements); i++ {
		if tmp[Elements[i]] != nil {
			return true
		}
	}
	return false
}
- // 补全位置信息
- func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
- area := util.ObjToString(tmp["area"])
- city := util.ObjToString(tmp["city"])
- if area != "" && city != "" {
- return
- }
- tmpSite := util.ObjToString(tmp["site"])
- if tmpSite == "" {
- return
- }
- p.mapSiteLock.Lock()
- defer p.mapSiteLock.Unlock()
- site := p.mapSite[tmpSite]
- if site != nil {
- if area != "" {
- if area == "全国" {
- tmp["area"] = site.Area
- tmp["city"] = site.City
- tmp["district"] = site.District
- return
- }
- if area != site.Area {
- return
- } else {
- if site.City != "" {
- tmp["area"] = site.Area
- tmp["city"] = site.City
- tmp["district"] = site.District
- }
- }
- } else {
- tmp["area"] = site.Area
- tmp["city"] = site.City
- tmp["district"] = site.District
- return
- }
- }
- }
// deleteSlice removes the first occurrence of v from arr and returns the result.
// The removal is done in place, so the returned slice shares arr's backing array.
// arr is returned unchanged when v is absent. stype is not used.
func deleteSlice(arr []string, v, stype string) []string {
	at := -1
	for i := range arr {
		if arr[i] == v {
			at = i
			break
		}
	}
	if at < 0 {
		return arr
	}
	return append(arr[:at], arr[at+1:]...)
}
- // 校验评审专家
- func ClearRp(tmp string) string {
- arrTmp := []string{}
- for _, v := range strings.Split(tmp, ",") {
- // 汉字过滤(全汉字,2-4个字)
- if ok, _ := regexp.MatchString("^[\\p{Han}]{2,4}$", v); !ok {
- continue
- }
- //黑名单过滤
- if BlaskListMap[v] {
- continue
- }
- arrTmp = append(arrTmp, v)
- }
- return strings.Join(arrTmp, ",")
- }
- func QyFilter(name, stype string) string {
- name = strings.ReplaceAll(name, " ", "")
- //preReg := PreRegexp[stype]
- //for _, v := range preReg {
- // name = v.ReplaceAllString(name, "")
- //}
- //backReg := BackRegexp[stype]
- //for _, v := range backReg {
- // name = v.ReplaceAllString(name, "")
- //}
- //backRepReg := BackRepRegexp[stype]
- //for _, v := range backRepReg {
- // name = v.regs.ReplaceAllString(name, v.repstr)
- //}
- blackReg := BlackRegexp[stype]
- for _, v := range blackReg {
- if v.MatchString(name) {
- name = ""
- break
- }
- }
- if !regexp.MustCompile("[\\p{Han}]{4,}").MatchString(name) {
- name = ""
- }
- if utf8.RuneCountInString(name) > 60 {
- name = ""
- }
- return name
- }
- // @Description 过滤中标记录数据
- // @Author J 2023/6/12 15:11
- func siteJudge(code string) bool {
- for _, s := range SkipSiteList {
- if code == s {
- return true
- }
- }
- return false
- }
|