123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658 |
- package main
- import (
- "encoding/json"
- "fmt"
- "log"
- mu "mfw/util"
- "qfw/util"
- "regexp"
- "strings"
- "sync"
- "time"
- "github.com/robfig/cron"
- "go.mongodb.org/mongo-driver/bson/primitive"
- )
- /**
- 任务入口
- 全量、增量合并
- 更新、插入,内存清理
- 转换成info对象
- **/
- //项目合并对象
- type ProjectTask struct {
- InitMinTime int64 //最小时间,小于0的处理一次
- name string
- thread int //线程数
- //查找锁
- findLock sync.Mutex
- wg sync.WaitGroup
- //map锁
- AllIdsMapLock sync.Mutex
- //对应的id
- AllIdsMap map[string]*ID
- //采购单位、项目名称、项目编号
- mapPb, mapPn, mapPc map[string]*Key
- // mapPbLock, mapPnLock, mapPcLock sync.Mutex
- //更新或新增通道
- updatePool chan []map[string]interface{}
- //savePool chan map[string]interface{}
- //saveSign, updateSign chan bool
- //表名
- coll string
- //当前状态是全量还是增量
- currentType string //当前是跑全量还是跑增量
- //
- clearContimes int
- //当前时间
- currentTime int64
- //保存长度
- saveSize int
- pici int64
- validTime int64
- // LockPool chan *sync.Mutex
- // LockPoolLock sync.Mutex
- // m1, m23, m4 map[int]int
- // l1, l23, l4 map[int]*sync.Mutex
- }
- func NewPT() *ProjectTask {
- p := &ProjectTask{
- InitMinTime: int64(1325347200),
- name: "全/增量对象",
- thread: 4,
- updatePool: make(chan []map[string]interface{}, 5000),
- //savePool: make(chan map[string]interface{}, 2000),
- wg: sync.WaitGroup{},
- AllIdsMap: make(map[string]*ID, 5000000),
- mapPb: make(map[string]*Key, 1500000),
- mapPn: make(map[string]*Key, 5000000),
- mapPc: make(map[string]*Key, 5000000),
- saveSize: 400,
- //saveSign: make(chan bool, 1),
- //updateSign: make(chan bool, 1),
- coll: ProjectColl,
- validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
- }
- return p
- }
- var P_QL *ProjectTask
- //初始化全量合并对象
- func init() {
- P_QL = NewPT()
- log.Println(len(P_QL.updatePool))
- go P_QL.updateAllQueue()
- go P_QL.clearMem()
- }
- func (p *ProjectTask) updateAllQueue() {
- arru := make([][]map[string]interface{}, p.saveSize)
- indexu := 0
- sp := make(chan bool, 5)
- for {
- select {
- case v := <-p.updatePool:
- arru[indexu] = v
- indexu++
- if indexu == p.saveSize {
- sp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-sp
- }()
- MongoTool.UpSertBulk(p.coll, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, p.saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- sp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-sp
- }()
- MongoTool.UpSertBulk(p.coll, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, p.saveSize)
- indexu = 0
- }
- }
- }
- }
- //项目合并内存更新
- func (p *ProjectTask) clearMem() {
- c := cron.New()
- //在内存中保留最近6个月的信息
- //跑全量时每4分钟跑一次,跑增量时400分钟跑一次
- c.AddFunc("50 0/15 * * * *", func() {
- if p.currentType == "ql" || p.clearContimes >= 60 {
- //跳过的次数清零
- p.clearContimes = 0
- //信息进入查找对比全局锁
- p.findLock.Lock()
- //defer p.findLock.Unlock()
- //合并进行的任务都完成
- p.wg.Wait()
- //遍历id
- //所有内存中的项目信息
- p.AllIdsMapLock.Lock()
- //清除计数
- clearNum := 0
- for k, v := range p.AllIdsMap {
- if p.currentTime-v.P.LastTime > p.validTime {
- clearNum++
- //删除id的map
- delete(p.AllIdsMap, k)
- //删除pb
- if v.P.Buyer != "" {
- ids := p.mapPb[v.P.Buyer]
- if ids != nil {
- ids.Lock.Lock()
- ids.Arr = deleteSlice(ids.Arr, k)
- if len(ids.Arr) == 0 {
- delete(p.mapPb, v.P.Buyer)
- }
- ids.Lock.Unlock()
- }
- }
- //删除mapPn
- for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
- if vn != "" {
- ids := p.mapPn[vn]
- if ids != nil {
- ids.Lock.Lock()
- ids.Arr = deleteSlice(ids.Arr, k)
- if len(ids.Arr) == 0 {
- delete(p.mapPn, vn)
- }
- ids.Lock.Unlock()
- }
- }
- }
- //删除mapPc
- for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
- if vn != "" {
- ids := p.mapPc[vn]
- if ids != nil {
- ids.Lock.Lock()
- ids.Arr = deleteSlice(ids.Arr, k)
- if len(ids.Arr) == 0 {
- delete(p.mapPc, vn)
- }
- ids.Lock.Unlock()
- }
- }
- }
- v = nil
- }
- }
- p.AllIdsMapLock.Unlock()
- p.findLock.Unlock()
- log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb))
- } else {
- p.clearContimes++
- }
- })
- c.Start()
- select {}
- }
- //全量合并
- func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
- defer util.Catch()
- //1、检查pubilshtime索引
- db, _ := udpInfo["db"].(string)
- if db == "" {
- db = MongoTool.DbName
- }
- coll, _ := udpInfo["coll"].(string)
- if coll == "" {
- coll = ExtractColl
- }
- thread := util.IntAllDef(udpInfo["thread"], 4)
- if thread > 0 {
- p.thread = thread
- }
- q, _ := udpInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{}
- lteid, _ := udpInfo["lteid"].(string)
- var idmap map[string]interface{}
- if len(lteid) > 15 {
- idmap = map[string]interface{}{
- "$lte": StringTOBsonId(lteid),
- }
- }
- gtid, _ := udpInfo["gtid"].(string)
- if len(gtid) > 15 {
- if idmap == nil {
- idmap = map[string]interface{}{}
- }
- idmap["$gt"] = StringTOBsonId(gtid)
- }
- if idmap != nil {
- q["_id"] = idmap
- }
- }
- //生成查询语句执行
- log.Println("查询语句:", q)
- p.enter(db, coll, q)
- }
- //增量合并
- func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
- defer util.Catch()
- //1、检查pubilshtime索引
- db, _ := udpInfo["db"].(string)
- if db == "" {
- db = MongoTool.DbName
- }
- coll, _ := udpInfo["coll"].(string)
- if coll == "" {
- coll = ExtractColl
- }
- thread := util.IntAllDef(udpInfo["thread"], 4)
- if thread > 0 {
- p.thread = thread
- }
- //开始id和结束id
- q, _ := udpInfo["query"].(map[string]interface{})
- gtid := udpInfo["gtid"].(string)
- lteid := udpInfo["lteid"].(string)
- if q == nil {
- q = map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(gtid),
- "$lte": StringTOBsonId(lteid),
- },
- }
- }
- if q != nil {
- //生成查询语句执行
- p.enter(db, coll, q)
- }
- for {
- if len(P_QL.updatePool) > 0 {
- log.Println("等待调用udp", len(P_QL.updatePool))
- time.Sleep(1 * time.Second)
- } else {
- break
- }
- }
- if udpInfo["stop"] == nil {
- nextNode(udpInfo, p.pici)
- }
- }
- func StringTOBsonId(id string) primitive.ObjectID {
- objectId, _ := primitive.ObjectIDFromHex(id)
- return objectId
- }
- //通知下个节点nextNode
- func nextNode(mapInfo map[string]interface{}, pici int64) {
- mapInfo["stype"] = "project"
- mapInfo["query"] = map[string]interface{}{
- "pici": pici,
- }
- for n, to := range toaddr {
- key := fmt.Sprintf("%d-%s-%d", pici, "project", n)
- mapInfo["key"] = key
- datas, _ := json.Marshal(mapInfo)
- node := &udpNode{datas, to, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, to)
- }
- }
- func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
- defer util.Catch()
- count, taskcount := 0, 0
- pool := make(chan bool, p.thread)
- log.Println("start project", q)
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- infoPool := make(chan map[string]interface{}, 2000)
- over := make(chan bool)
- go func() {
- L:
- for {
- select {
- case tmp := <-infoPool:
- pool <- true
- taskcount++
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- }()
- if util.IntAll(tmp["repeat"]) == 0 {
- info := ParseInfo(tmp)
- if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
- p.currentTime = info.Publishtime
- p.startProjectMerge(info, tmp)
- }
- } else {
- //信息错误,进行更新
- }
- }(tmp)
- case <-over:
- break L
- }
- }
- }()
- ms := sess.DB(db).C(coll).Find(q).Sort("publishtime")
- if Sysconfig["hints"] != nil {
- ms.Hint(Sysconfig["hints"])
- }
- query := ms.Iter()
- //
- var lastid interface{}
- L:
- for {
- select {
- case <-queryClose:
- log.Println("receive interrupt sign")
- log.Println("close iter..", lastid, query.Cursor.Close(nil))
- queryCloseOver <- true
- break L
- default:
- tmp := make(map[string]interface{})
- if query.Next(&tmp) {
- lastid = tmp["_id"]
- if count%2000 == 0 {
- log.Println("current", count, lastid)
- }
- infoPool <- tmp
- count++
- } else {
- break L
- }
- }
- }
- time.Sleep(5 * time.Second)
- over <- true
- //阻塞
- for n := 0; n < p.thread; n++ {
- pool <- true
- }
- log.Println("所有线程执行完成...", count, taskcount)
- }
var (
	// Extract a project code from the title: a leading run of 8+ code-like
	// characters (group 1).
	titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
	// Extract a bracketed project code from the title, optionally preceded
	// by a label such as 编号/编码/标段号 (the code itself is group 3).
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
	// Extract a trailing project code from the title, optionally followed by
	// up to 5 characters and "公告" (group 1).
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
	// Project-code / project-name cleanup. Matches, for removal:
	//   - a trailing bracketed re-tender marker (重 / 第二次 ...);
	//   - runs of bracket/punctuation characters;
	//   - trailing 号 / 重 / 第N次(招标);
	//   - spaces and underscores;
	//   - a trailing 采购项目 / 项目 / 采购.
	// NOTE(review): as displayed here, the second character class contains
	// "﹞-;", a descending range that regexp rejects (MustCompile would panic
	// at init). If the real file has full-width characters there, the "-"
	// may instead form a wide Unicode range — verify the actual bytes.
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}– ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
	// A code of at most 4 characters that is only digits or only letters
	// (plus "_" and "-").
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// Only digits or only letters (plus "_" and "-"), any length.
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
)
- func ParseInfo(tmp map[string]interface{}) (info *Info) {
- bys, _ := json.Marshal(tmp)
- var thisinfo *Info
- json.Unmarshal(bys, &thisinfo)
- if thisinfo == nil {
- return nil
- }
- if len(thisinfo.Topscopeclass) == 0 {
- thisinfo.Topscopeclass = []string{}
- }
- if len(thisinfo.Subscopeclass) == 0 {
- thisinfo.Subscopeclass = []string{}
- }
- //从标题中查找项目编号
- res := titleGetPc.FindStringSubmatch(thisinfo.Title)
- if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
- thisinfo.PTC = res[1]
- } else {
- res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
- if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
- thisinfo.PTC = res[3]
- } else {
- res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
- if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
- thisinfo.PTC = res[1]
- }
- }
- }
- if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
- thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
- if thisinfo.ProjectName != "" {
- thisinfo.pnbval++
- }
- }
- if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
- if thisinfo.ProjectCode != "" {
- thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
- if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
- thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
- }
- } else {
- thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
- if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
- thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
- }
- }
- if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
- thisinfo.pnbval++
- }
- }
- if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
- thisinfo.PTC = ""
- }
- if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
- thisinfo.pnbval++
- } else {
- thisinfo.Buyer = ""
- }
- //winners整理
- winner, _ := tmp["winner"].(string)
- m1 := map[string]bool{}
- winners := []string{}
- if winner != "" {
- m1[winner] = true
- winners = append(winners, winner)
- }
- packageM, _ := tmp["package"].(map[string]interface{})
- if packageM != nil {
- thisinfo.HasPackage = true
- for _, p := range packageM {
- pm, _ := p.(map[string]interface{})
- pw, _ := pm["winner"].(string)
- if pw != "" && !m1[pw] {
- m1[pw] = true
- winners = append(winners, pw)
- }
- }
- }
- thisinfo.Winners = winners
- thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
- thisinfo.LenPTC = len([]rune(thisinfo.PTC))
- thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
- return thisinfo
- }
// deleteSlice removes the first element equal to v and returns the shortened
// slice; arr is returned unchanged when v is absent.
//
// The removal is in place: later elements are shifted left inside arr's
// backing array. Callers must use the returned slice, and must not keep
// reading the old one.
func deleteSlice(arr []string, v string) []string {
	pos := -1
	for i := range arr {
		if arr[i] == v {
			pos = i
			break
		}
	}
	if pos < 0 {
		return arr
	}
	return append(arr[:pos], arr[pos+1:]...)
}
- // if taskcount > 0 && taskcount%50000 == 0 { //歇歇
- // log.Println("pause start..", taskcount)
- // for n := 0; n < p.thread; n++ {
- // pool <- true
- // }
- // for n := 0; n < p.thread; n++ {
- // <-pool
- // }
- // log.Println("pause over..")
- // }
- //lastid = tmp["_id"]
- //tmp = make(map[string]interface{})
- // if count > 40000 {
- // query.Close()
- // break
- // }
- //over++
- //func (p *ProjectTask) saveQueue() {
- // arr := make([]map[string]interface{}, p.saveSize)
- // indexs := 0
- // for {
- // select {
- // case <-p.saveSign:
- // if indexs > 0 {
- // MongoTool.SaveBulk(p.coll, arr[:indexs]...)
- // arr = make([]map[string]interface{}, p.saveSize)
- // indexs = 0
- // }
- // p.updateSign <- true
- // case v := <-p.savePool:
- // arr[indexs] = v
- // indexs++
- // if indexs == p.saveSize {
- // MongoTool.SaveBulk(p.coll, arr...)
- // arr = make([]map[string]interface{}, p.saveSize)
- // indexs = 0
- // }
- // case <-time.After(100 * time.Millisecond):
- // if indexs > 0 {
- // MongoTool.SaveBulk(p.coll, arr[:indexs]...)
- // arr = make([]map[string]interface{}, p.saveSize)
- // indexs = 0
- // }
- // }
- // }
- //}
- ////项目保存和更新通道
- //func (p *ProjectTask) updateQueue() {
- // arru := make([][]map[string]interface{}, p.saveSize)
- // indexu := 0
- // for {
- // select {
- // case v := <-p.updatePool:
- // arru[indexu] = v
- // indexu++
- // if indexu == p.saveSize {
- // //更新之前先保存
- // p.saveSign <- true
- // <-p.updateSign
- // MongoTool.UpdateBulk(p.coll, arru...)
- // arru = make([][]map[string]interface{}, p.saveSize)
- // indexu = 0
- // }
- // case <-time.After(100 * time.Millisecond):
- // if indexu > 0 {
- // p.saveSign <- true
- // <-p.updateSign
- // MongoTool.UpdateBulk(p.coll, arru[:indexu]...)
- // arru = make([][]map[string]interface{}, p.saveSize)
- // indexu = 0
- // }
- // }
- // }
- //}
- //func (p *ProjectTask) ConCurrentLock(n1, n2, n3, n4 int) {
- // var lock *sync.Mutex
- // p.LockPoolLock.Lock()
- // if p.m1[n1] > 0 || p.m23[n2] > 0 || p.m23[n3] > 0 || p.m4[n4] > 0 {
- // if p.l1[n1] != nil {
- // lock = p.l1[n1]
- // } else if p.l23[n2] != nil {
- // lock = p.l23[n2]
- // } else if p.l23[n3] != nil {
- // lock = p.l23[n3]
- // } else if p.l4[n4] != nil {
- // lock = p.l4[n4]
- // }
- // } else {
- // lock = <-p.LockPool
- // }
- // if n1 > 0 {
- // p.m1[n1]++
- // p.l1[n1] = lock
- // }
- // if n2 > 0 {
- // p.m23[n2]++
- // p.l23[n2] = lock
- // }
- // if n3 > 0 {
- // p.m23[n3]++
- // p.l23[n3] = lock
- // }
- // if n4 > 0 {
- // p.m4[n4]++
- // p.l4[n4] = lock
- // }
- // p.LockPoolLock.Unlock()
- // lock.Lock()
- //}
- //func (p *ProjectTask) ConCurrentUnLock(n1, n2, n3, n4 int) {
- // var lock1 *sync.Mutex
- // p.LockPoolLock.Lock()
- // if p.l1[n1] != nil {
- // lock1 = p.l1[n1]
- // } else if p.l23[n2] != nil {
- // lock1 = p.l23[n2]
- // } else if p.l23[n3] != nil {
- // lock1 = p.l23[n3]
- // } else if p.l4[n4] != nil {
- // lock1 = p.l4[n4]
- // }
- // if p.m1[n1] > 0 {
- // p.m1[n1]--
- // if p.m1[n1] == 0 {
- // p.l1[n1] = nil
- // }
- // }
- // if p.m23[n2] > 0 {
- // p.m23[n2]--
- // if p.m23[n2] == 0 {
- // p.l23[n2] = nil
- // }
- // }
- // if p.m23[n3] > 0 {
- // p.m23[n3]--
- // if p.m23[n3] == 0 {
- // p.l23[n3] = nil
- // }
- // }
- // if p.m4[n4] > 0 {
- // p.m4[n4]--
- // if p.m4[n4] == 0 {
- // p.l4[n4] = nil
- // }
- // }
- // p.LockPoolLock.Unlock()
- // lock1.Unlock()
- //}
|