package main

import (
	"encoding/json"
	"log"
	mu "mfw/util"
	"net"
	"qfw/util"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/robfig/cron"
)

/**
Task entry point:
  - full and incremental merging
  - update, insert, in-memory cleanup
  - conversion of raw documents into Info objects
**/

// ProjectTask is the project-merge task object. It holds the in-memory
// project indexes, the channels feeding the bulk writers and the
// bookkeeping shared by the full ("ql") and incremental runs.
type ProjectTask struct {
	// InitMinTime is the minimum time (NewPT sets 1325347200, presumably
	// Unix seconds). Original note: values below 0 are processed once.
	InitMinTime int64
	name        string
	// thread is the number of concurrent merge workers used by enter.
	thread int
	// findLock is the global lock for the lookup/compare phase.
	findLock sync.Mutex
	wg       sync.WaitGroup
	// AllIdsMapLock guards AllIdsMap.
	AllIdsMapLock sync.Mutex
	// AllIdsMap maps an id to its in-memory entry.
	AllIdsMap map[string]*ID
	// Indexes keyed by buyer, project name and project code.
	mapPb, mapPn, mapPc map[string]*Key
	//	mapPbLock, mapPnLock, mapPcLock sync.Mutex
	// Update / insert channels.
	updatePool           chan []map[string]interface{}
	savePool             chan map[string]interface{}
	saveSign, updateSign chan bool
	// coll is the collection name bulk writes go to.
	coll string
	// currentType tells whether the current run is full or incremental;
	// clearMem compares it against "ql".
	currentType string
	// clearContimes counts the cleanup ticks skipped so far (see clearMem).
	clearContimes int
	// currentTime is the current time; enter sets it to the publish time
	// of the record a worker has just finished.
	currentTime int64
	// saveSize is the bulk batch length.
	saveSize  int
	pici      int64
	validTime int64
	//	LockPool     chan *sync.Mutex
	//	LockPoolLock sync.Mutex
	//	m1, m23, m4  map[int]int
	//	l1, l23, l4  map[int]*sync.Mutex
}

// Disabled: pooled per-key locking.
//
// func (p *ProjectTask) ConCurrentLock(n1, n2, n3, n4 int) {
//	var lock *sync.Mutex
//	p.LockPoolLock.Lock()
//	if p.m1[n1] > 0 || p.m23[n2] > 0 || p.m23[n3] > 0 || p.m4[n4] > 0 {
//		if p.l1[n1] != nil {
//			lock = p.l1[n1]
//		} else if p.l23[n2] != nil {
//			lock = p.l23[n2]
//		} else if p.l23[n3] != nil {
//			lock = p.l23[n3]
//		} else if p.l4[n4] != nil {
//			lock = p.l4[n4]
//		}
//	} else {
//		lock = <-p.LockPool
//	}
//	if n1 > 0 {
//		p.m1[n1]++
//		p.l1[n1] = lock
//	}
//	if n2 > 0 {
//		p.m23[n2]++
//		p.l23[n2] = lock
//	}
//	if n3 > 0 {
//		p.m23[n3]++
//		p.l23[n3] = lock
//	}
//	if n4 > 0 {
//		p.m4[n4]++
//		p.l4[n4] = lock
//	}
//	p.LockPoolLock.Unlock()
//	lock.Lock()
// }
//
// func (p *ProjectTask) ConCurrentUnLock(n1, n2, n3, n4 int) {
//	var lock1 *sync.Mutex
//	p.LockPoolLock.Lock()
//	if p.l1[n1] != nil {
//		lock1 = p.l1[n1]
//	} else if p.l23[n2] != nil {
//		lock1 = p.l23[n2]
//	} else if p.l23[n3] != nil {
//		lock1 = p.l23[n3]
//	} else if p.l4[n4] != nil {
//		lock1 = p.l4[n4]
//	}
//	if p.m1[n1] > 0 {
//		p.m1[n1]--
//		if p.m1[n1] == 0 {
//			p.l1[n1] = nil
//		}
//	}
//	if p.m23[n2] > 0 {
//		p.m23[n2]--
//		if p.m23[n2] == 0 {
//			p.l23[n2] = nil
//		}
//	}
//	if p.m23[n3] > 0 {
//		p.m23[n3]--
//		if p.m23[n3] == 0 {
//			p.l23[n3] = nil
//		}
//	}
//	if p.m4[n4] > 0 {
//		p.m4[n4]--
//		if p.m4[n4] == 0 {
//			p.l4[n4] = nil
//		}
//	}
//	p.LockPoolLock.Unlock()
//	lock1.Unlock()
// }

// NewPT builds a ProjectTask with its channels and pre-sized index maps.
// validTime is 180 days expressed in seconds; clearMem uses it as the
// retention window.
func NewPT() *ProjectTask {
	p := &ProjectTask{
		InitMinTime: int64(1325347200),
		name:        "全/增量对象",
		thread:      4,
		updatePool:  make(chan []map[string]interface{}, 2000),
		savePool:    make(chan map[string]interface{}, 2000),
		AllIdsMap:   make(map[string]*ID, 10000000),
		mapPb:       make(map[string]*Key, 2000000),
		mapPn:       make(map[string]*Key, 5000000),
		mapPc:       make(map[string]*Key, 5000000),
		saveSize:    200,
		saveSign:    make(chan bool, 1),
		updateSign:  make(chan bool, 1),
		coll:        ProjectColl,
		validTime:   180 * 86400,
		//	LockPool: make(chan *sync.Mutex, 200),
		//	m1:       map[int]int{},
		//	m23:      map[int]int{},
		//	m4:       map[int]int{},
		//	l1: map[int]*sync.Mutex{}, l23: map[int]*sync.Mutex{}, l4: map[int]*sync.Mutex{},
	}
	//	for i := 0; i < 200; i++ {
	//		p.LockPool <- &sync.Mutex{}
	//	}
	//	go func() {
	//		for {
	//			p.LockPool <- &sync.Mutex{}
	//		}
	//	}()
	return p
}

// P_QL is the package-wide merge task object created in init.
var P_QL *ProjectTask

// init creates the merge object and starts its background goroutines:
// the upsert queue and the periodic memory cleanup.
func init() {
	P_QL = NewPT()
	go P_QL.updateAllQueue()
	//	go P_QL.saveQueue()
	//	go P_QL.updateQueue()
	go P_QL.clearMem()
}

// saveQueue batches documents arriving on savePool and writes them with
// MongoTool.SaveBulk. A batch is flushed when it reaches saveSize, after
// 100ms without traffic, or when updateQueue signals on saveSign (in which
// case completion is acknowledged on updateSign). It never returns.
func (p *ProjectTask) saveQueue() {
	batch := make([]map[string]interface{}, p.saveSize)
	n := 0
	// flush writes whatever is pending and starts a fresh batch.
	flush := func() {
		if n == 0 {
			return
		}
		MongoTool.SaveBulk(p.coll, batch[:n]...)
		batch = make([]map[string]interface{}, p.saveSize)
		n = 0
	}
	for {
		select {
		case <-p.saveSign:
			flush()
			p.updateSign <- true
		case doc := <-p.savePool:
			batch[n] = doc
			n++
			if n == p.saveSize {
				flush()
			}
		case <-time.After(100 * time.Millisecond):
			flush()
		}
	}
}

// updateQueue is the project update channel: it batches update operations
// from updatePool and applies them with MongoTool.UpdateBulk. Before every
// bulk update it asks saveQueue to flush pending inserts (saveSign) and
// waits for the acknowledgement (updateSign). It never returns.
func (p *ProjectTask) updateQueue() {
	batch := make([][]map[string]interface{}, p.saveSize)
	n := 0
	flush := func() {
		// Save before updating.
		p.saveSign <- true
		<-p.updateSign
		MongoTool.UpdateBulk(p.coll, batch[:n]...)
		batch = make([][]map[string]interface{}, p.saveSize)
		n = 0
	}
	for {
		select {
		case op := <-p.updatePool:
			batch[n] = op
			n++
			if n == p.saveSize {
				flush()
			}
		case <-time.After(100 * time.Millisecond):
			if n > 0 {
				flush()
			}
		}
	}
}

// updateAllQueue batches operations from updatePool and upserts them with
// MongoTool.UpSertBulk. Each batch is written on its own goroutine; at most
// five bulk writes are in flight at once. A batch is flushed when it reaches
// saveSize or after 500ms without traffic. It never returns.
func (p *ProjectTask) updateAllQueue() {
	batch := make([][]map[string]interface{}, p.saveSize)
	n := 0
	inflight := make(chan bool, 5)
	// flush hands the pending operations to a writer goroutine and starts a
	// fresh batch, so the writer owns the old backing array exclusively.
	flush := func() {
		inflight <- true
		go func(ops [][]map[string]interface{}) {
			defer func() {
				<-inflight
			}()
			MongoTool.UpSertBulk(p.coll, ops...)
		}(batch[:n])
		batch = make([][]map[string]interface{}, p.saveSize)
		n = 0
	}
	for {
		select {
		case op := <-p.updatePool:
			batch[n] = op
			n++
			if n == p.saveSize {
				flush()
			}
		case <-time.After(500 * time.Millisecond):
			if n > 0 {
				flush()
			}
		}
	}
}

// clearMem periodically evicts stale projects from the in-memory indexes.
//
// Only entries whose LastTime is within validTime (about six months) of
// currentTime are kept. The cron spec fires at second 50 of every 15th
// minute (assuming the six-field, seconds-first format). During a full run
// (currentType == "ql") every tick performs a cleanup; otherwise 60 ticks
// are skipped between cleanups. The function blocks forever and is meant to
// be started on its own goroutine.
func (p *ProjectTask) clearMem() {
	c := cron.New()
	c.AddFunc("50 0/15 * * * *", func() {
		if p.currentType != "ql" && p.clearContimes < 60 {
			p.clearContimes++
			return
		}
		// Reset the skip counter.
		p.clearContimes = 0
		// Take the global lookup/compare lock, then wait until all merge
		// tasks in progress have completed.
		p.findLock.Lock()
		p.wg.Wait()
		p.AllIdsMapLock.Lock()

		// drop removes id from the entry stored under key and deletes the
		// entry once it no longer references any id.
		drop := func(index map[string]*Key, key, id string) {
			if key == "" {
				return
			}
			ids := index[key]
			if ids == nil {
				return
			}
			ids.Lock.Lock()
			ids.Arr = deleteSlice(ids.Arr, id)
			if len(ids.Arr) == 0 {
				delete(index, key)
			}
			ids.Lock.Unlock()
		}

		clearNum := 0
		for k, v := range p.AllIdsMap {
			if p.currentTime-v.P.LastTime <= p.validTime {
				continue
			}
			clearNum++
			delete(p.AllIdsMap, k)
			// Buyer index.
			drop(p.mapPb, v.P.Buyer, k)
			// Project-name index: primary name plus merged names.
			drop(p.mapPn, v.P.ProjectName, k)
			for _, name := range v.P.MPN {
				drop(p.mapPn, name, k)
			}
			// Project-code index: primary code plus merged codes.
			drop(p.mapPc, v.P.ProjectCode, k)
			for _, code := range v.P.MPC {
				drop(p.mapPc, code, k)
			}
		}
		// Capture the sizes while the locks are still held so the log line
		// does not read the maps concurrently with resumed merge work.
		nIds, nPn, nPc, nPb := len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb)
		p.AllIdsMapLock.Unlock()
		p.findLock.Unlock()
		log.Println("清除完成:", clearNum, nIds, nPn, nPc, nPb)
	})
	c.Start()
	select {}
}

// taskQl runs a full merge.
//
// udpInfo may carry "db", "coll", "thread" and either a ready-made "query"
// or "gtid"/"lteid" bounds (used only when longer than 15 characters) from
// which an _id range query is built. Missing db/coll fall back to
// MongoTool.DbName and ExtractColl.
func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
	defer util.Catch()
	db, _ := udpInfo["db"].(string)
	if db == "" {
		db = MongoTool.DbName
	}
	coll, _ := udpInfo["coll"].(string)
	if coll == "" {
		coll = ExtractColl
	}
	// Disabled: 1. check the publishtime index.
	//	sess := MongoTool.GetMgoConn()
	//	bcon := false
	//	if sess.DB(db).C(coll).EnsureIndexKey("publishtime_1", "publishtime_-1") == nil {
	//		bcon = true
	//	} else {
	//		log.Println("publishtime_1索引不存在")
	//	}
	//	MongoTool.DestoryMongoConn(sess)
	if thread := util.IntAllDef(udpInfo["thread"], 4); thread > 0 {
		p.thread = thread
	}
	q, _ := udpInfo["query"].(map[string]interface{})
	if q == nil {
		q = map[string]interface{}{}
		idRange := map[string]interface{}{}
		if lteid, _ := udpInfo["lteid"].(string); len(lteid) > 15 {
			idRange["$lte"] = util.StringTOBsonId(lteid)
		}
		if gtid, _ := udpInfo["gtid"].(string); len(gtid) > 15 {
			idRange["$gt"] = util.StringTOBsonId(gtid)
		}
		if len(idRange) > 0 {
			q["_id"] = idRange
		}
	}
	// Build the query and run it.
	log.Println("查询语句:", q)
	p.enter(db, coll, q)
}

// taskZl runs an incremental merge over the _id range (gtid, lteid] and
// then notifies the downstream nodes through nextNode.
//
// "gtid" and "lteid" are required strings (start id and end id): they are
// forwarded to nextNode even when a custom "query" is supplied. When either
// is missing the task logs and returns without doing any work.
func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
	defer util.Catch()
	db, _ := udpInfo["db"].(string)
	if db == "" {
		db = MongoTool.DbName
	}
	coll, _ := udpInfo["coll"].(string)
	if coll == "" {
		coll = ExtractColl
	}
	if thread := util.IntAllDef(udpInfo["thread"], 3); thread > 0 {
		p.thread = thread
	}
	q, _ := udpInfo["query"].(map[string]interface{})
	gtid, okGt := udpInfo["gtid"].(string)
	lteid, okLte := udpInfo["lteid"].(string)
	if !okGt || !okLte {
		// Previously an unchecked type assertion panicked here; the outcome
		// (nothing runs) is the same, but the reason is now explicit.
		log.Println("taskZl: gtid/lteid missing or not strings, task skipped:", udpInfo["gtid"], udpInfo["lteid"])
		return
	}
	if q == nil {
		q = map[string]interface{}{
			"_id": map[string]interface{}{
				"$gt":  util.StringTOBsonId(gtid),
				"$lte": util.StringTOBsonId(lteid),
			},
		}
	}
	// Build the query and run it.
	p.enter(db, coll, q)
	nextNode(gtid, lteid, "project", p.pici)
}

// nextNode notifies every configured downstream node over UDP that the id
// range (gtid, lteid] of the given stype and batch number (pici) is done.
// Entries of NextNode that are not maps, or whose "addr" is not a string,
// are skipped so that one bad entry cannot block the remaining nodes.
func nextNode(gtid, lteid, stype string, pici int64) {
	by, err := json.Marshal(map[string]interface{}{
		"gtid":  gtid,
		"lteid": lteid,
		"stype": stype,
		"query": map[string]interface{}{
			"pici": pici,
		},
	})
	if err != nil {
		log.Println("nextnode: cannot encode notification:", err)
		return
	}
	log.Println("nextnode", string(by))
	for _, v := range NextNode {
		node, ok := v.(map[string]interface{})
		if !ok {
			continue
		}
		addr, ok := node["addr"].(string)
		if !ok {
			log.Println("nextnode: skipping node with non-string addr:", node["addr"])
			continue
		}
		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
			IP:   net.ParseIP(addr),
			Port: util.IntAll(node["port"]),
		})
	}
}

// enter streams the documents matching q from db.coll, sorted by
// publishtime, and merges every eligible one with startProjectMerge on a
// pool of at most p.thread concurrent workers. It returns once the scan has
// ended and all workers have finished.
//
// A document is eligible when its "repeat" value is 0, ParseInfo accepts it
// and it carries enough identifying fields: pnbval is neither 0, nor 1 with
// only the buyer set.
func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
	defer util.Catch()
	count, taskcount := 0, 0
	//var lastid interface{}
	//	for {
	//		if lastid != nil {
	//			if q == nil {
	//				q = map[string]interface{}{}
	//			}
	//			if q["_id"] == nil {
	//				q["_id"] = map[string]interface{}{}
	//			}
	//			q["_id"].(map[string]interface{})["$gt"] = lastid
	//		}
	pool := make(chan bool, p.thread)
	log.Println("start project", q)
	sess := MongoTool.GetMgoConn()
	defer MongoTool.DestoryMongoConn(sess)
	query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
	//	.Select(map[string]interface{}{
	//		"blocks":   0,
	//		"fieldall": 0,
	//	}).Iter()
	//over := 0
	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
		if util.IntAll(tmp["repeat"]) == 0 {
			info := ParseInfo(tmp)
			if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
				pool <- true
				taskcount++
				go func(info *Info, tmp map[string]interface{}) {
					defer func() {
						// NOTE(review): written by several workers without
						// synchronisation; the last finisher wins.
						p.currentTime = info.Publishtime
						<-pool
					}()
					p.startProjectMerge(info, tmp)
				}(info, tmp)
			} else {
				// Invalid info: update it (not implemented).
			}
		}
		if count%1000 == 0 {
			log.Println("current", count)
		}
		//	if taskcount > 0 && taskcount%50000 == 0 { // take a rest
		//		log.Println("pause start..", taskcount)
		//		for n := 0; n < p.thread; n++ {
		//			pool <- true
		//		}
		//		for n := 0; n < p.thread; n++ {
		//			<-pool
		//		}
		//		log.Println("pause over..")
		//	}
		//lastid = tmp["_id"]
		// Each worker keeps its own document, so decode into a fresh map.
		tmp = make(map[string]interface{})
		//	if count > 40000 {
		//		query.Close()
		//		break
		//	}
		//over++
	}
	// A failed cursor ends the loop exactly like the end of data; surface
	// the difference instead of reporting a clean completion.
	if err := query.Close(); err != nil {
		log.Println("project query iteration error:", err)
	}
	// Block until every worker has released its slot. cap(pool) is used
	// rather than p.thread because taskQl/taskZl may change p.thread while
	// this scan is running, which would make the drain deadlock or return
	// early.
	for n := 0; n < cap(pool); n++ {
		pool <- true
	}
	//	if over == 0 {
	//		break
	//	}
	//}
	log.Println("所有线程执行完成...", count, taskcount)
}

var (
	// Patterns that extract a project code from a title: at the start, in
	// brackets (optionally behind a label), and at the end.
	titleGetPc  = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
	// Filter applied to project codes (ParseInfo also applies it to project
	// names): the matched noise is removed.
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
	// Project code made only of digits or only of letters, at most 4 long.
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// Digits only or letters only.
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
)

// ParseInfo converts a raw document into an *Info by round-tripping it
// through JSON, then normalises the identifying fields:
//
//   - PTC is set to a project code found in the title, when one qualifies;
//   - ProjectName, ProjectCode and PTC are cleaned with pcReplace;
//   - Buyer is cleared unless it is longer than two runes;
//   - pnbval counts how many of name / code / buyer are usable;
//   - Winners collects the distinct winners of the document and, when
//     HasPackage is set, of its packages.
//
// It returns nil when the document decodes to nothing.
func ParseInfo(tmp map[string]interface{}) (info *Info) {
	// Best effort on purpose: a field with an unexpected type must not
	// discard the whole record, so decode errors are ignored.
	bys, _ := json.Marshal(tmp)
	var thisinfo *Info
	json.Unmarshal(bys, &thisinfo)
	if thisinfo == nil {
		return nil
	}
	if len(thisinfo.Topscopeclass) == 0 {
		thisinfo.Topscopeclass = []string{}
	}
	if len(thisinfo.Subscopeclass) == 0 {
		thisinfo.Subscopeclass = []string{}
	}

	// Look for a project code in the title; the first pattern that yields
	// an acceptable candidate wins.
	titlePatterns := []struct {
		re    *regexp.Regexp
		group int
	}{
		{titleGetPc, 1},
		{titleGetPc1, 3},
		{titleGetPc2, 1},
	}
	for _, tp := range titlePatterns {
		res := titleGetPcMatch(tp.re, thisinfo.Title)
		if len(res) <= tp.group {
			continue
		}
		code := res[tp.group]
		if len(code) > 6 && thisinfo.ProjectCode != code &&
			!numCheckPc.MatchString(code) && !_zimureg1.MatchString(code) {
			thisinfo.PTC = code
			break
		}
	}

	if thisinfo.ProjectName != "" {
		thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
		if thisinfo.ProjectName != "" {
			thisinfo.pnbval++
		}
	}

	if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
		if thisinfo.ProjectCode != "" {
			thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
			// Without a project name, a very short all-digit or all-letter
			// code is dropped.
			if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
				thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
			}
		} else {
			thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
			if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
				thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
			}
		}
		if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
			thisinfo.pnbval++
		}
	}
	// A title code already contained in the project code adds nothing.
	if strings.Contains(thisinfo.ProjectCode, thisinfo.PTC) {
		thisinfo.PTC = ""
	}

	if len([]rune(thisinfo.Buyer)) > 2 {
		thisinfo.pnbval++
	} else {
		thisinfo.Buyer = ""
	}

	// Collect the distinct winners.
	seen := map[string]bool{}
	winners := []string{}
	if winner, _ := tmp["winner"].(string); winner != "" {
		seen[winner] = true
		winners = append(winners, winner)
	}
	if thisinfo.HasPackage {
		// NOTE(review): map iteration order is random, so the order of the
		// package winners differs between runs.
		packageM, _ := tmp["package"].(map[string]interface{})
		for _, pkg := range packageM {
			pm, _ := pkg.(map[string]interface{})
			pw, _ := pm["winner"].(string)
			if pw != "" && !seen[pw] {
				seen[pw] = true
				winners = append(winners, pw)
			}
		}
	}
	thisinfo.Winners = winners

	thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
	thisinfo.LenPTC = len([]rune(thisinfo.PTC))
	thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
	return thisinfo
}

// titleGetPcMatch returns the submatches of re in title (nil when there is
// no match).
func titleGetPcMatch(re *regexp.Regexp, title string) []string {
	return re.FindStringSubmatch(title)
}

// deleteSlice removes the first element equal to v from arr and returns the
// shortened slice. The removal is done in place, so the backing array of
// arr is modified; arr is returned untouched when v is not present.
func deleteSlice(arr []string, v string) []string {
	for i, s := range arr {
		if s == v {
			return append(arr[:i], arr[i+1:]...)
		}
	}
	return arr
}