package main

import (
	"encoding/json"
	"fmt"
	"net"
	"strings"
	"time"

	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
	"medical_project/config"
)

var (
	// queryClose asks a running Enter scan to stop at the next document.
	queryClose = make(chan bool)
	// queryCloseOver is signalled by Enter once it has seen queryClose.
	queryCloseOver = make(chan bool)
	// pool is a one-slot semaphore bounding the merge workers started by Enter.
	pool = make(chan bool, 1)
)

// updateAllQueue drains updatePool forever, collecting the received items
// into batches of up to saveSize and handing each batch to
// MongoPro.UpSertBulk in its own goroutine. A partially filled batch is
// flushed whenever no item arrives for one second. updateSp is used as a
// semaphore around the bulk writes.
func updateAllQueue() {
	batch := make([][]map[string]interface{}, saveSize)
	filled := 0

	// flush hands the first `filled` entries to a writer goroutine and
	// starts a fresh batch, so the writer owns its slice exclusively.
	flush := func() {
		pending := batch[:filled]
		updateSp <- true
		go func() {
			defer func() { <-updateSp }()
			MongoPro.UpSertBulk(config.Conf.DB.MongoP.Coll, pending...)
		}()
		batch = make([][]map[string]interface{}, saveSize)
		filled = 0
	}

	for {
		select {
		case v := <-updatePool:
			batch[filled] = v
			filled++
			if filled == saveSize {
				flush()
			}
		case <-time.After(1 * time.Second):
			if filled > 0 {
				flush()
			}
		}
	}
}

// taskQl runs a full merge. When both "gtid" and "lteid" in udpInfo are
// valid ObjectId hex strings the scan is restricted to
// gtid <= _id <= lteid; otherwise the query is empty and the whole
// collection is scanned. After the scan, udpInfo is forwarded to the next
// node via nextNode.
func taskQl(udpInfo map[string]interface{}) {
	defer util.Catch()
	q := make(map[string]interface{})
	gtid, _ := udpInfo["gtid"].(string)
	lteid, _ := udpInfo["lteid"].(string)
	if mongodb.IsObjectIdHex(gtid) && mongodb.IsObjectIdHex(lteid) {
		q["_id"] = map[string]interface{}{
			"$lte": mongodb.StringTOBsonId(lteid),
			"$gte": mongodb.StringTOBsonId(gtid),
		}
	}
	// Log the generated query, then run it.
	log.Info("查询语句:", zap.Any("q", q))
	Enter(q)
	nextNode(udpInfo)
}

// taskZl runs an incremental merge over gtid < _id <= lteid, where "gtid"
// and "lteid" are read from udpInfo. If either value is not a string the
// task is logged and skipped instead of panicking on the type assertion.
func taskZl(udpInfo map[string]interface{}) {
	defer util.Catch()
	// Start id (exclusive) and end id (inclusive).
	gtid, gtOK := udpInfo["gtid"].(string)
	lteid, lteOK := udpInfo["lteid"].(string)
	if !gtOK || !lteOK {
		log.Error(fmt.Sprintf("taskZl: gtid and lteid must be strings, got gtid=%v lteid=%v",
			udpInfo["gtid"], udpInfo["lteid"]))
		return
	}
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  mongodb.StringTOBsonId(gtid),
			"$lte": mongodb.StringTOBsonId(lteid),
		},
	}
	Enter(q)
}

// Enter iterates the documents matching q in the configured MongoB
// collection and, for every document whose "bid_field" is "0101", parses
// it with ParseInfo and passes it to startProjectMerge. Workers are
// bounded by the package-level pool semaphore. A value received on
// queryClose stops the scan early (acknowledged on queryCloseOver).
func Enter(q map[string]interface{}) {
	defer util.Catch()
	count, index := 0, 0
	sess := MongoBid.GetMgoConn()
	defer MongoBid.DestoryMongoConn(sess)

	infoPool := make(chan map[string]interface{}, 2000)
	over := make(chan bool)

	// dispatch acquires a pool slot and merges one document in a goroutine.
	dispatch := func(doc map[string]interface{}) {
		pool <- true
		go func() {
			defer func() { <-pool }()
			info := ParseInfo(doc)
			if info == nil {
				// ParseInfo returns nil when the document cannot be
				// converted; skip it rather than dereference nil.
				log.Error(fmt.Sprintf("Enter: ParseInfo returned nil, skipping document _id=%v", doc["_id"]))
				return
			}
			currentTime = info.Publishtime
			startProjectMerge(info, doc)
		}()
	}

	go func() {
		for {
			select {
			case doc := <-infoPool:
				dispatch(doc)
			default:
				// Queue is empty right now: block until either more work
				// arrives or the producer reports it is finished. Checking
				// infoPool first (above) means `over` is only honoured
				// once the queue has been drained.
				select {
				case doc := <-infoPool:
					dispatch(doc)
				case <-over:
					return
				}
			}
		}
	}()

	fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0, "field_source": 0, "detail": 0, "contenthtml": 0, "jsondata": 0}
	ms := sess.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(q).Select(fields)
	query := ms.Iter()
	var lastid interface{}
scan:
	for {
		select {
		case <-queryClose:
			log.Error("receive interrupt sign")
			queryCloseOver <- true
			break scan
		default:
			tmp := make(map[string]interface{})
			if !query.Next(&tmp) {
				break scan
			}
			lastid = tmp["_id"]
			if count%2000 == 0 {
				log.Info("current", zap.Int("count", count), zap.Any("lastid", lastid))
			}
			if util.ObjToString(tmp["bid_field"]) == "0101" {
				index++
				infoPool <- tmp
			}
			count++
		}
	}

	time.Sleep(5 * time.Second)
	over <- true
	// Block until the last in-flight worker has released its slot, then
	// give the slot back: pool is package-level, so leaving it full would
	// make the next Enter call block forever on its first dispatch.
	for n := 0; n < 1; n++ {
		pool <- true
	}
	for n := 0; n < 1; n++ {
		<-pool
	}
	log.Info("所有线程执行完成...", zap.Int("count:", count), zap.Int("index", index))
}

// ParseInfo converts a raw document into an *Info via a JSON round trip
// and then normalises it: fills a missing publish time from the
// DateTimeSelect fields (also writing it back to tmp["publishtime"]),
// defaults the scope-class slices and SubType, extracts a project code
// from the title into PTC, maintains the pnbval counter, collects the
// distinct winners, drops WinnerOrder entries without an "entname", and
// truncates over-long package names. It returns nil when the round trip
// yields no Info.
func ParseInfo(tmp map[string]interface{}) (info *Info) {
	// Errors are deliberately ignored: a failed round trip leaves thisinfo
	// nil, which is reported to the caller as a nil result.
	bys, _ := json.Marshal(tmp)
	var thisinfo *Info
	_ = json.Unmarshal(bys, &thisinfo)
	if thisinfo == nil {
		return nil
	}

	// Publish time missing: take the first available fallback field.
	if thisinfo.Publishtime <= 0 {
		for _, d := range DateTimeSelect {
			if tmp[d] != nil {
				thisinfo.Publishtime = util.Int64All(tmp[d])
				tmp["publishtime"] = tmp[d]
				break
			}
		}
	}
	if len(thisinfo.Topscopeclass) == 0 {
		thisinfo.Topscopeclass = []string{}
	}
	if len(thisinfo.Subscopeclass) == 0 {
		thisinfo.Subscopeclass = []string{}
	}
	if thisinfo.SubType == "" {
		thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
	}

	// Look for a project code in the title. A capture group is accepted
	// when it is longer than 6 bytes, differs from ProjectCode and is
	// rejected by neither numCheckPc nor _zimureg1. The patterns are tried
	// in order and later ones only run if the earlier ones fail.
	titleCode := func(res []string, group int) (string, bool) {
		if len(res) <= group {
			return "", false
		}
		code := res[group]
		if len(code) <= 6 || thisinfo.ProjectCode == code {
			return "", false
		}
		if numCheckPc.MatchString(code) || _zimureg1.MatchString(code) {
			return "", false
		}
		return code, true
	}
	if code, ok := titleCode(titleGetPc.FindStringSubmatch(thisinfo.Title), 1); ok {
		thisinfo.PTC = code
	} else if code, ok := titleCode(titleGetPc1.FindStringSubmatch(thisinfo.Title), 3); ok {
		thisinfo.PTC = code
	} else if code, ok := titleCode(titleGetPc2.FindStringSubmatch(thisinfo.Title), 1); ok {
		thisinfo.PTC = code
	}

	if thisinfo.ProjectName != "" {
		thisinfo.pnbval++
	}
	if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
		// With no project name and a code shorter than 5 runes, strip the
		// characters matched by StrOrNum from whichever code is in use.
		if thisinfo.ProjectCode != "" {
			if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
				thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
			}
		} else if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
			thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
		}
		thisinfo.pnbval++
	}
	// Drop a title code that is already contained in (or equal to) the
	// project code.
	if strings.Contains(thisinfo.ProjectCode, thisinfo.PTC) {
		thisinfo.PTC = ""
	}
	if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
		thisinfo.pnbval++
	} else {
		thisinfo.Buyer = ""
	}

	// Collect distinct winners: the top-level one first, then each package's.
	seen := map[string]bool{}
	winners := []string{}
	if winner := util.ObjToString(tmp["winner"]); winner != "" {
		seen[winner] = true
		winners = append(winners, winner)
	}
	if packageM, _ := tmp["package"].(map[string]interface{}); packageM != nil {
		thisinfo.HasPackage = true
		for _, p := range packageM {
			pm, _ := p.(map[string]interface{})
			pw := util.ObjToString(pm["winner"])
			if pw != "" && !seen[pw] {
				seen[pw] = true
				winners = append(winners, pw)
			}
		}
	}
	thisinfo.Winners = winners

	// Keep only winner-order entries that carry a non-empty entname.
	var wins []map[string]interface{}
	for _, v := range thisinfo.WinnerOrder {
		if w := util.ObjToString(v["entname"]); w != "" {
			v["entname"] = w
			wins = append(wins, v)
		}
	}
	thisinfo.WinnerOrder = wins

	thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
	thisinfo.LenPTC = len([]rune(thisinfo.PTC))
	thisinfo.LenPN = len([]rune(thisinfo.ProjectName))

	// Guard against abnormal package data: cap each package name at 100
	// runes. The cut is done on runes so multi-byte characters are never
	// split, and the result is a real string.
	capName := func(p map[string]interface{}) {
		name, _ := p["name"].(string)
		if r := []rune(name); len(r) > 100 {
			p["name"] = string(r[:100])
		}
	}
	for _, pkg := range thisinfo.Package {
		switch ps := pkg.(type) {
		case []map[string]interface{}:
			for _, p := range ps {
				capName(p)
			}
		case []interface{}:
			// Shape produced by encoding/json when decoding into interface{}.
			for _, item := range ps {
				if p, ok := item.(map[string]interface{}); ok {
					capName(p)
				}
			}
		}
	}
	return thisinfo
}

// nextNode stamps mapInfo with the configured next-stage "stype" and the
// current "pici", then sends it as JSON over UDP to the next node
// configured in config.Conf.Udp.Next. Note that mapInfo is modified in
// place.
func nextNode(mapInfo map[string]interface{}) {
	mapInfo["stype"] = config.Conf.Udp.Next.Stype
	mapInfo["pici"] = pici
	datas, err := json.Marshal(mapInfo)
	if err != nil {
		// Do not send an empty datagram when the message cannot be encoded.
		log.Error(fmt.Sprintf("nextNode: marshal udp message: %v", err))
		return
	}
	node := &net.UDPAddr{
		IP:   net.ParseIP(config.Conf.Udp.Next.Addr),
		Port: util.IntAll(config.Conf.Udp.Next.Port),
	}
	log.Info("udp next...", zap.Any("msg", mapInfo), zap.Any("to", node.String()))
	_ = udpClient.WriteUdp(datas, udp.OP_TYPE_DATA, node)
}