package main

import (
	"encoding/json"
	"fmt"
	"log"
	"mongodb"
	"qfw/util"
	"regexp"
	"strings"
	"sync"
	"time"
)

// Task entry points for project merging:
//   - full and incremental merge runs
//   - queued bulk upserts
//   - conversion of raw documents into Info objects

// Regex tables keyed by string. They are only declared in this file; they are
// populated and consumed elsewhere.
var PreRegexp = map[string][]*regexp.Regexp{}
var BackRegexp = map[string][]*regexp.Regexp{}
var BackRepRegexp = map[string][]RegexpInfo{}
var BlackRegexp = map[string][]*regexp.Regexp{}

var (
	// Patterns used to pull a project code out of a title: at the start of
	// the title, inside brackets, and at the end of the title respectively.
	titleGetPc  = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
	// Strips punctuation and symbols from titles, project names and codes.
	filterReg = regexp.MustCompile("[`~!@#$^&*()=|{}':;,\\[\\].<>/?!¥…()—【】‘;:”“。,、?%+_-]")
	// Project code clean-up. Its call sites in ParseInfo are currently
	// disabled, so it is unused in this file.
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
	// Matches a code that is only digits or only letters and at most 4 long.
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// Matches a code that is only digits or only letters, any length.
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
	// Matches text that mentions a package/lot ("包" / "段") with an index.
	KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
	// Matches a review-expert name: 2 to 4 Han characters and nothing else.
	// Compiled once here instead of once per element inside ClearRp.
	reviewExpertNameReg = regexp.MustCompile("^[\\p{Han}]{2,4}$")
)

// RegexpInfo pairs a compiled pattern with a replacement string.
type RegexpInfo struct {
	regs   *regexp.Regexp
	repstr string
}

// NewPT builds a ProjectTask with pre-sized lookup maps, a buffered update
// queue and time windows converted from days to seconds (days * 86400).
func NewPT() *ProjectTask {
	return &ProjectTask{
		InitMinTime: int64(1325347200), // Unix seconds
		name:        "全/增量对象",
		thread:      Thread,
		updatePool:  make(chan []map[string]interface{}, 5000),
		wg:          sync.WaitGroup{},
		AllIdsMap:   make(map[string]*ID, 5000000),
		mapPb:       make(map[string]*Key, 1500000),
		mapPn:       make(map[string]*Key, 5000000),
		mapPc:       make(map[string]*Key, 5000000),
		saveSize:    100,
		coll:        ProjectColl,
		// presumably IntAllDef falls back to its second argument when the
		// config key is missing; verify against qfw/util.
		validTime:  int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
		statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
		jgTime:     int64(util.IntAllDef(7, 7) * 86400),
	}
}

// P_QL is a package-level task handle; it is assigned outside this file.
var P_QL *ProjectTask

// sp bounds the number of bulk upserts in flight: a save takes a slot before
// it starts and gives it back when it finishes.
var sp = make(chan bool, 5)

// updateAllQueue drains p.updatePool forever. Items are collected into batches
// of p.saveSize; a full batch is written immediately, and a partial batch is
// written once the queue has been idle for one second.
func (p *ProjectTask) updateAllQueue() {
	// flush hands one batch to a background bulk upsert. The send on sp
	// blocks while cap(sp) saves are already running.
	flush := func(batch [][]map[string]interface{}) {
		sp <- true
		go func() {
			defer func() { <-sp }()
			MongoTool.UpSertBulk(p.coll, batch...)
		}()
	}

	batch := make([][]map[string]interface{}, p.saveSize)
	n := 0
	for {
		select {
		case v := <-p.updatePool:
			batch[n] = v
			n++
			if n == p.saveSize {
				flush(batch)
				// The goroutine owns the old slice now; start a fresh one.
				batch = make([][]map[string]interface{}, p.saveSize)
				n = 0
			}
		case <-time.After(1000 * time.Millisecond):
			if n > 0 {
				flush(batch[:n])
				batch = make([][]map[string]interface{}, p.saveSize)
				n = 0
			}
		}
	}
}

// taskQl runs a full merge over the whole extract collection.
func (p *ProjectTask) taskQl() {
	defer util.Catch()
	p.thread = util.IntAllDef(Thread, 4)
	p.enter(MongoTool.DbName, ExtractColl, nil)
}

// taskZl runs an incremental merge described by udpInfo.
//
// Recognised keys: "db" and "coll" (optional, default to MongoTool.DbName and
// ExtractColl), "gtid" and "lteid" (required strings delimiting the _id range
// gtid < _id <= lteid), and "stop" (when absent, the call waits for all
// pending bulk saves before returning).
func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
	defer util.Catch()

	db, _ := udpInfo["db"].(string)
	if db == "" {
		db = MongoTool.DbName
	}
	coll, _ := udpInfo["coll"].(string)
	if coll == "" {
		coll = ExtractColl
	}
	if thread := util.IntAllDef(Thread, 4); thread > 0 {
		p.thread = thread
	}

	q, err := incrementalIDQuery(udpInfo)
	if err != nil {
		// Previously this surfaced as a type-assertion panic recovered by
		// util.Catch; the outcome (no merge is run) is the same.
		log.Println(err)
		return
	}
	p.enter(db, coll, q)

	if udpInfo["stop"] == nil {
		// Taking every slot of sp can only succeed once no save goroutine
		// holds one, so this waits for in-flight saves to finish.
		for i := 0; i < cap(sp); i++ {
			sp <- true
		}
		for i := 0; i < cap(sp); i++ {
			<-sp
		}
		log.Println("保存完成", p.pici)
	}
}

// incrementalIDQuery builds the _id range selector for an incremental run
// from the "gtid" and "lteid" entries of udpInfo. It returns an error when
// either entry is missing or is not a string.
func incrementalIDQuery(udpInfo map[string]interface{}) (map[string]interface{}, error) {
	gtid, okGt := udpInfo["gtid"].(string)
	lteid, okLte := udpInfo["lteid"].(string)
	if !okGt || !okLte {
		return nil, fmt.Errorf("taskZl: gtid and lteid must be strings, got %T and %T",
			udpInfo["gtid"], udpInfo["lteid"])
	}
	return map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  mongodb.StringTOBsonId(gtid),
			"$lte": mongodb.StringTOBsonId(lteid),
		},
	}, nil
}

// enter iterates the documents of db.coll that match q, in publishtime order,
// and feeds each usable one into the project merge. A nil q selects every
// document. p.Brun is true for the duration of the call.
func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
	defer util.Catch()
	defer func() { p.Brun = false }()
	p.Brun = true

	util.Debug("start project", q, p.pici)
	sess := MongoTool.GetMgoConn()
	defer MongoTool.DestoryMongoConn(sess)

	// The selector used to be hard-coded to nil, which made incremental runs
	// rescan the whole collection. A nil q is still passed as an untyped nil
	// so the full run issues exactly the same query as before.
	var selector interface{}
	if q != nil {
		selector = q
	}
	fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0}
	query := sess.DB(db).C(coll).Find(selector).Select(fields).Sort("publishtime").Iter()
	// NOTE(review): the iterator's error/close status is never checked, so a
	// cursor failure looks like a normal end of data — confirm and handle.

	count := 0
	var lastid interface{}
	for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
		if count%2000 == 0 {
			util.Debug("current ---", count, lastid)
		}
		if util.ObjToString(tmp["useable"]) == "0" {
			continue
		}
		lastid = tmp["_id"]
		info := ParseInfo(tmp)
		if info == nil {
			// ParseInfo returns nil when the document cannot be converted;
			// skip it instead of aborting the scan on a nil dereference.
			continue
		}
		p.currentTime = info.Publishtime
		p.startProjectMerge(info, tmp)
	}
	log.Println("over...", count)
}

// ParseInfo converts a raw document into an *Info by a JSON round-trip and
// then normalises it: amounts, publish time, names and codes, the project
// code found in the title (PTC), buyer, review experts, winners and package
// names. It may rewrite tmp["publishtime"]. It returns nil when the document
// cannot be converted.
func ParseInfo(tmp map[string]interface{}) (info *Info) {
	bys, err := json.Marshal(tmp)
	if err != nil {
		return nil
	}
	var thisinfo *Info
	// The error is deliberately ignored: a type mismatch on one field still
	// leaves the other fields populated, and such documents are processed.
	_ = json.Unmarshal(bys, &thisinfo)
	if thisinfo == nil {
		return nil
	}

	thisinfo.Bidamount = util.Float64All(tmp["bidamount"])
	thisinfo.Budget = util.Float64All(tmp["budget"])

	// A manually corrected publish time overrides the stored one.
	if tmp["pt_modify"] != nil {
		thisinfo.Publishtime = util.Int64All(tmp["pt_modify"])
		tmp["publishtime"] = tmp["pt_modify"]
	}
	// Fall back to the alternative date fields when publishtime is missing.
	// NOTE(review): there is no break, so the LAST non-nil field wins —
	// confirm that this is the intended precedence.
	if thisinfo.Publishtime <= 0 {
		for _, d := range DateTimeSelect {
			if tmp[d] != nil {
				thisinfo.Publishtime = util.Int64All(tmp[d])
				tmp["publishtime"] = tmp[d]
			}
		}
	}

	// Keep these as non-nil empty slices rather than nil.
	if len(thisinfo.Topscopeclass) == 0 {
		thisinfo.Topscopeclass = []string{}
	}
	if len(thisinfo.Subscopeclass) == 0 {
		thisinfo.Subscopeclass = []string{}
	}
	if thisinfo.SubType == "" {
		thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
	}

	// Strip punctuation from the project name and code.
	thisinfo.ProjectName = filterReg.ReplaceAllString(thisinfo.ProjectName, "")
	thisinfo.ProjectCode = filterReg.ReplaceAllString(thisinfo.ProjectCode, "")

	// Look for a project code in the title; leave PTC untouched when none
	// qualifies.
	if code := titleProjectCode(thisinfo.Title, thisinfo.ProjectCode); code != "" {
		thisinfo.PTC = code
	}

	// pnbval counts how many of name / code / buyer are usable.
	if thisinfo.ProjectName != "" {
		thisinfo.pnbval++
	}
	if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
		// Without a project name, a code that is short and purely numeric or
		// purely alphabetic is dropped.
		if thisinfo.ProjectCode != "" {
			if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
				thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
			}
		} else {
			if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
				thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
			}
		}
		if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
			thisinfo.pnbval++
		}
	}
	// A title code that equals or is contained in the project code adds
	// nothing (Contains is also true for equal strings).
	if strings.Contains(thisinfo.ProjectCode, thisinfo.PTC) {
		thisinfo.PTC = ""
	}

	// Buyers of two characters or fewer are discarded.
	if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
		thisinfo.pnbval++
	} else {
		thisinfo.Buyer = ""
	}

	// Keep only plausible review-expert names.
	if len(thisinfo.ReviewExperts) > 0 {
		thisinfo.ReviewExperts = ClearRp(thisinfo.ReviewExperts)
	}

	thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
	thisinfo.LenPTC = len([]rune(thisinfo.PTC))
	thisinfo.LenPN = len([]rune(thisinfo.ProjectName))

	// Winners come from the comma-separated s_winner field.
	if w := util.ObjToString(tmp["s_winner"]); w != "" {
		thisinfo.Winners = strings.Split(w, ",")
	}

	// Cap abnormally long package names at 100 characters.
	// NOTE(review): if Package is filled by encoding/json into interface{}
	// values, arrays decode as []interface{} and this assertion would never
	// match — confirm against the Info definition.
	for _, pkgs := range thisinfo.Package {
		ps, ok := pkgs.([]map[string]interface{})
		if !ok {
			continue
		}
		for _, pkg := range ps {
			name, _ := pkg["name"].(string)
			// Truncate by runes, not bytes, and convert back to a string;
			// the previous code stored a printed list of code points.
			if runes := []rune(name); len(runes) > 100 {
				pkg["name"] = string(runes[:100])
			}
		}
	}
	return thisinfo
}

// titleProjectCode returns the first project code found in title by the three
// title patterns (start of title, bracketed, end of title), or "" when none
// qualifies. A candidate qualifies when it is longer than 6 bytes, differs
// from projectCode and is rejected by neither numCheckPc nor _zimureg1.
func titleProjectCode(title, projectCode string) string {
	candidates := []struct {
		re    *regexp.Regexp
		group int // index of the capture group that holds the code
	}{
		{titleGetPc, 1},
		{titleGetPc1, 3},
		{titleGetPc2, 1},
	}
	for _, c := range candidates {
		res := c.re.FindStringSubmatch(title)
		if len(res) <= c.group {
			continue
		}
		code := res[c.group]
		if len(code) > 6 && code != projectCode &&
			!numCheckPc.MatchString(code) && !_zimureg1.MatchString(code) {
			return code
		}
	}
	return ""
}

// deleteSlice removes the first element equal to v from arr and returns the
// shortened slice; arr is returned unchanged when v is absent. The removal is
// done in place, so the caller's backing array is modified. stype is only
// used to label the log line written when the removal was slow.
func deleteSlice(arr []string, v, stype string) []string {
	for i, item := range arr {
		if item != v {
			continue
		}
		began := time.Now().Unix()
		arr = append(arr[:i], arr[i+1:]...)
		// Whole-second resolution: logs only when the wall clock moved on
		// by at least one second during the removal.
		if cost := time.Now().Unix() - began; cost > 0 {
			log.Println("deleteSlice", stype, cost, v, len(arr))
		}
		return arr
	}
	return arr
}

// ClearRp filters a review-expert list, keeping only entries made of 2 to 4
// Han characters. The result is always a non-nil slice.
func ClearRp(tmp []string) []string {
	kept := make([]string, 0, len(tmp))
	for _, name := range tmp {
		if reviewExpertNameReg.MatchString(name) {
			kept = append(kept, name)
		}
	}
	return kept
}