// projectmegerinsert package main import ( "encoding/json" "flag" "fmt" du "jy/util" "log" qu "qfw/util" "sort" "strings" "sync" "time" "gopkg.in/mgo.v2/bson" ) const ( Select3To2 = iota //三选二合并 Select3To1 //三选一合并 AloneProject //孤立核查项目 InvalidInfo //无效信息 ) var ( OmitNumMax, DeviationDay, HourInterval int64 //提取最大遗漏数据量,项目查询时间修正区间,轮循间隔 InfoRScore = map[string][]*RScoreInfo{} PnAll, PcAll, PbAll = map[string][]string{}, map[string][]string{}, map[string][]string{} ) type RScoreInfo struct { //对比结果集 Id string //信息id Pid string //项目id Pkey string Score int ProjectNameType string ProjectCodeType string BuyerType string AreaType string AgencyType string Cresult string Info *Info Pinfo *ProjectInfo } type MegerInfo struct { //待合并分段数据 StartPublishtime int64 EndPublishtime int64 Num int Minfo []*Info Lock *sync.Mutex } var StartId string func main_inc() { flag.StringVar(&StartId, "StartId", "", "开始_id") flag.Parse() //StartId = "56388138af53745d9a000001" log.Println("StartId", StartId) if StartId == "" { return } startInsertMeger(StartId) } //开始插入合并 func startInsertMeger(startId string) { datas := getOmitData(startId) for _, minfo := range datas { if int64(minfo.Num) < OmitNumMax { log.Println("分段信息量太小,不执行", minfo.Num) continue } getPncbKey(minfo) compareMeger(minfo) //清空相关信息 minfo = nil InfoRScore = map[string][]*RScoreInfo{} } time.AfterFunc(time.Duration(HourInterval)*time.Hour, func() { startInsertMeger(startId) }) } //获取遗漏数据(分段) func getOmitData(startId string) (list []*MegerInfo) { log.Println("加载分段信息") q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": bson.ObjectIdHex(startId), }, } log.Println(MQFW.DbName, extractColl, q) sess := MQFW.GetMgoConn() defer MQFW.DestoryMongoConn(sess) //数据正序处理 it := sess.DB(MQFW.DbName).C(extractColl).Find(q).Sort("publishtime").Iter() minfo := &MegerInfo{Lock: &sync.Mutex{}, Minfo: []*Info{}} var lastId string for tmp := make(map[string]interface{}); it.Next(tmp); { if qu.IntAll(tmp["repeat"]) == 1 { continue } if tmp["meger_sflag"] != nil { continue } b, mv := isMegerProjectAndProcess(tmp) if !b { //是否参与合并 if mv == AloneProject { //生成孤立项目 thisinfo := PreThisInfo(tmp) newProjectInc(tmp, map[string]interface{}{"meger_sflag": "alone"}, time.Now().Unix(), thisinfo) } if mv == InvalidInfo { //无效信息,打标记 extInfoTag("invalid", qu.BsonIdToSId(tmp["_id"])) } continue } this := PreThisInfo(tmp) if this == nil { continue } this.MererInc = mv this.Data = tmp tmp = make(map[string]interface{}) if minfo.StartPublishtime == 0 { minfo.StartPublishtime = this.Publishtime } else { minfo.EndPublishtime = this.Publishtime } //分段 if (minfo.EndPublishtime-minfo.StartPublishtime)/int64(86400) > DeviationDay*int64(2) || OmitNumMax <= int64(minfo.Num) { log.Println(len(list), "段信息加载完成,信息量", minfo.Num) list = append(list, minfo) minfo = &MegerInfo{} break } lastId = this.Id minfo.Minfo = append(minfo.Minfo, this) minfo.Num += 1 } if minfo.Num > 0 { list = append(list, minfo) } log.Println("getOmitData lastId", lastId) return } //加载分段pncb key func getPncbKey(minfo *MegerInfo) { log.Println("pncb key 开始加载") //计算时间区间 var startTime, endTime int64 if minfo.EndPublishtime+DeviationDay*86400 <= time.Now().Unix() { startTime = minfo.EndPublishtime - DeviationDay*86400 endTime = minfo.EndPublishtime + DeviationDay*86400 } else { startTime = minfo.EndPublishtime - DeviationDay*86400 - (minfo.EndPublishtime + DeviationDay*86400 - time.Now().Unix()) endTime = time.Now().Unix() } q := map[string]interface{}{ "lastpublishtime": map[string]interface{}{ //lastpublishtime "$gte": startTime, "$lte": endTime, }, } log.Println("getPncbKey", q, startTime, endTime) //pn,pc,pb加载内存中 sess := MQFW.GetMgoConn() defer MQFW.DestoryMongoConn(sess) it := sess.DB(MQFW.DbName).C(projectColl).Find(q).Sort("lastpublishtime").Iter() //it := sess.DB(MQFW.DbName).C(projectColl).Find(map[string]interface{}{}).Sort("pici").Iter() for tmp := make(map[string]interface{}); it.Next(tmp); { if qu.ObjToString(tmp["meger_sflag"]) == "normal" { pn := "pn_" + qu.ObjToString(tmp["projectname"]) pc := "pc_" + qu.ObjToString(tmp["projectcode"]) pb := "pb_" + qu.ObjToString(tmp["buyer"]) pid := qu.BsonIdToSId(tmp["_id"]) if len(pn) > 3 { PnAll[pn] = append(PnAll[pn], pid) } if len(pc) > 3 { PcAll[pc] = append(PnAll[pc], pid) } if len(pb) > 3 { PbAll[pb] = append(PnAll[pb], pid) } } } log.Println("pncb key 加载完成", "pn:", len(PnAll), "pb:", len(PbAll), "pc:", len(PcAll)) } //对比打分 func compareMeger(minfo *MegerInfo) { for _, info := range minfo.Minfo { pids := PnAll["pn_"+info.ProjectName] compareScoreMeger("pn", pids, info) pids = PcAll["pc_"+info.ProjectCode] compareScoreMeger("pc", pids, info) pids = PbAll["pb_"+info.Buyer] compareScoreMeger("pb", pids, info) infors, _ := InfoRScore[info.Id] sort.Slice(infors, func(i, j int) bool { return infors[i].Score > infors[j].Score }) if len(infors) > 0 { //合并项目 info := infors[0] info.Info.Data["cresult"] = info.Cresult info.Info.Data["score"] = info.Score id := updateinfoInc(info.Info, info.Info.Data, info.Pinfo) log.Println("合并项目", info.Info.ProjectName, info.Info.ProjectCode, info.Info.Buyer) switch info.Pkey { case "pn": if len(PnAll[info.Pkey+info.Info.ProjectName]) > 0 { PnAll[info.Pkey+info.Info.ProjectName] = append(PnAll[info.Pkey+info.Info.ProjectName], id) } case "pc": if len(PcAll[info.Pkey+info.Info.ProjectCode]) > 0 { PcAll[info.Pkey+info.Info.ProjectCode] = append(PcAll[info.Pkey+info.Info.ProjectCode], id) } case "pb": if len(PbAll[info.Pkey+info.Info.Buyer]) > 0 { PbAll[info.Pkey+info.Info.Buyer] = append(PbAll[info.Pkey+info.Info.Buyer], id) } } } else { //新增项目 id := newProjectInc( info.Data, map[string]interface{}{ "meger_sflag": "normal", }, time.Now().Unix(), info, ) log.Println("新增项目", info.ProjectName, info.ProjectCode, info.Buyer) if len((*info).ProjectName) > 0 { if PnAll["pn_"+info.ProjectName] != nil { PnAll["pn_"+info.ProjectName] = append(PnAll["pn_"+info.ProjectName], id) } else { PnAll["pn_"+info.ProjectName] = []string{id} } } if len((*info).ProjectCode) > 0 { if PcAll["pc_"+info.ProjectCode] != nil { PcAll["pc_"+info.ProjectCode] = append(PcAll["pc_"+info.ProjectCode], id) } else { PcAll["pc_"+info.ProjectCode] = []string{id} } } if len((*info).Buyer) > 0 { if PbAll["pb_"+info.Buyer] != nil { PbAll["pb_"+info.Buyer] = append(PbAll["pb_"+info.Buyer], id) } else { PbAll["pb_"+info.Buyer] = []string{id} } } } } } func compareScoreMeger(ktype string, pids []string, info *Info) { var projects []*ProjectInfo for _, id := range pids { var projectInfo *ProjectInfo if pinfo, b := MQFW.FindById(projectColl, id, nil); b { bys, _ := json.Marshal(pinfo) json.Unmarshal(bys, &projectInfo) if projectInfo == nil { continue } //拼装projectInfo对象 projectInfo.Id = projectInfo.IdInc projects = append(projects, projectInfo) } } if info.MererInc == Select3To1 { for _, project := range projects { score3Select1Inc(ktype, project, info) } } if info.MererInc == Select3To2 { for _, project := range projects { score3Select2Inc(ktype, project, info) } } } //3选2打分 func score3Select2Inc(ktype string, pinfo *ProjectInfo, thisinfo *Info) { defer qu.Catch() rsInfos := InfoRScore[thisinfo.Id] if rsInfos == nil { rsInfos = []*RScoreInfo{} } rsinfo := &RScoreInfo{Id: thisinfo.Id, Info: thisinfo, Pid: pinfo.Id, Pinfo: pinfo} rsinfo.BuyerType, rsinfo.Score = fieldPCBScore(thisinfo.Buyer, pinfo.Buyer, rsinfo.BuyerType, rsinfo.Score) if len(thisinfo.Buyer) > 0 { rsinfo.ProjectNameType, rsinfo.Score = fieldPCBScore(thisinfo.ProjectName, pinfo.ProjectName, rsinfo.ProjectNameType, rsinfo.Score) rsinfo.ProjectCodeType, rsinfo.Score = fieldPCBScore(thisinfo.ProjectCode, pinfo.ProjectCode, rsinfo.ProjectCodeType, rsinfo.Score) } else { //无采购单位,打分考虑长度 if len([]rune(thisinfo.ProjectName)) > MegerFieldsLen.ProjectNamelen { rsinfo.ProjectNameType, rsinfo.Score = fieldPCBScore(thisinfo.ProjectName, pinfo.ProjectName, rsinfo.ProjectNameType, rsinfo.Score) } else { rsinfo.ProjectNameType = "D" } if len(thisinfo.ProjectCode) > MegerFieldsLen.ProjectCodelen { rsinfo.ProjectCodeType, rsinfo.Score = fieldPCBScore(thisinfo.ProjectCode, pinfo.ProjectCode, rsinfo.ProjectCodeType, rsinfo.Score) } else { rsinfo.ProjectCodeType = "D" } } //省市打分 if thisinfo.Area != "A" && thisinfo.Area != "全国" && pinfo.Area != "A" && pinfo.Area != "全国" { if thisinfo.Area == pinfo.Area && thisinfo.City == pinfo.City { rsinfo.Score += 2 } else { rsinfo.Score -= 1 } } else { rsinfo.Score += 1 } //代理机构打分 if len([]rune(pinfo.Agency)) > 0 { if thisinfo.Agency == pinfo.Agency { //A rsinfo.Score += 2 } else if strings.Index(pinfo.Agency, thisinfo.Agency) > -1 || strings.Index(thisinfo.Agency, pinfo.Agency) > -1 { //B rsinfo.Score += 1 } else { if len(thisinfo.Agency) < 1 { //E rsinfo.Score -= 1 } else { //C rsinfo.Score -= 2 } } } else { //D不计分 // } rsinfo.Pkey = ktype rsinfo.Cresult = fmt.Sprintf("%s%s%s", rsinfo.BuyerType, rsinfo.ProjectNameType, rsinfo.ProjectCodeType) ThreeToLock.Lock() if ThreeToTow[rsinfo.Cresult] { rsInfos = append(rsInfos, rsinfo) InfoRScore[thisinfo.Id] = rsInfos } ThreeToLock.Unlock() } //3选1打分 func score3Select1Inc(ktype string, pinfo *ProjectInfo, thisinfo *Info) { defer qu.Catch() rsInfos := InfoRScore[thisinfo.Id] if rsInfos == nil { rsInfos = []*RScoreInfo{} } rsinfo := &RScoreInfo{Id: thisinfo.Id, Info: thisinfo, Pid: pinfo.Id, Pinfo: pinfo} if ktype == "pn" { //比较字段项目名称 if len(pinfo.ProjectName) > 0 { if thisinfo.ProjectName == pinfo.ProjectName { //A rsinfo.Score += 2 rsinfo.ProjectNameType = "A" } else if strings.Index(pinfo.ProjectName, thisinfo.ProjectName) > -1 || strings.Index(thisinfo.ProjectName, pinfo.ProjectName) > -1 { //B rsinfo.Score += 1 rsinfo.ProjectNameType = "B" } else { //C rsinfo.Score -= 2 rsinfo.ProjectNameType = "C" } } else { //D不计分 rsinfo.ProjectNameType = "D" } } if ktype == "pc" { //比较字段项目编号 if len(pinfo.ProjectCode) > 0 { if thisinfo.ProjectCode == pinfo.ProjectCode { //A rsinfo.Score += 2 rsinfo.ProjectCodeType = "A" } else if strings.Index(pinfo.ProjectCode, thisinfo.ProjectCode) > -1 || strings.Index(thisinfo.ProjectCode, pinfo.ProjectCode) > -1 { //B rsinfo.Score += 1 rsinfo.ProjectCodeType = "B" } else { //C rsinfo.Score -= 2 rsinfo.ProjectCodeType = "C" } } else { //D不计分 rsinfo.ProjectCodeType = "D" } } if thisinfo.Area != "A" && thisinfo.Area != "全国" && pinfo.Area != "A" && pinfo.Area != "全国" { if thisinfo.Area == pinfo.Area && thisinfo.City == pinfo.City { rsinfo.Score += 2 rsinfo.AreaType = "A" } else { rsinfo.Score -= 1 rsinfo.AreaType = "C" } } else { rsinfo.Score += 1 rsinfo.AreaType = "B" } if len([]rune(pinfo.Agency)) > 0 { if thisinfo.Agency == pinfo.Agency { //A rsinfo.Score += 2 rsinfo.AgencyType = "A" } else if strings.Index(pinfo.Agency, thisinfo.Agency) > -1 || strings.Index(thisinfo.Agency, pinfo.Agency) > -1 { //B rsinfo.Score += 1 rsinfo.AgencyType = "B" } else { if len(thisinfo.Agency) < 1 { //E rsinfo.Score -= 1 rsinfo.AgencyType = "E" } else { //C rsinfo.Score -= 2 rsinfo.AgencyType = "C" } } } else { //D不计分 rsinfo.AgencyType = "D" } rsinfo.Pkey = ktype rsinfo.Cresult = fmt.Sprintf("%s%s%s%s", rsinfo.ProjectNameType, rsinfo.ProjectCodeType, rsinfo.AreaType, rsinfo.AgencyType) ThreeToLock.Lock() if ThreeToOne[rsinfo.Cresult] { rsInfos = append(rsInfos, rsinfo) InfoRScore[thisinfo.Id] = rsInfos } ThreeToLock.Unlock() } //判断是否合并项目、并确定打分流程 func isMegerProjectAndProcess(tmp map[string]interface{}) (b bool, res int) { b = false pcbv := PCBVal(tmp) bNormalScore := true if checkInfoAlter(tmp) && pcbv.Val < 1 { bNormalScore = false res = InvalidInfo //无效信息,打标记 } if bNormalScore { if pcbv.Buyer { if pcbv.PnameLen > 0 || pcbv.PcodeLen > 0 { res = Select3To2 //3选2打分 b = true } else { res = AloneProject //生成核查新项目 } } else { if pcbv.PnameLen > 0 { if pcbv.PcodeLen > 0 { res = Select3To2 //3选2打分 b = true } else { if pcbv.PnameLen > MegerFieldsLen.ProjectNamelen { if pcbv.Agency && pcbv.Area { res = Select3To1 //3选1打分 b = true } else { res = AloneProject //生成核查新项目 } } else { res = AloneProject //生成核查新项目 } } } else { if pcbv.Pcode { if pcbv.PcodeLen > MegerFieldsLen.ProjectCodelen { if pcbv.Agency && pcbv.Area { res = Select3To1 //3选1打分 b = true } else { res = AloneProject //生成核查新项目 } } else { res = AloneProject //生成核查新项目 } } else { res = InvalidInfo //无效信息,打标记 } } } } return } //新增项目 func newProjectInc(tmp, mess map[string]interface{}, pipc int64, thisinfo *Info) (id string) { id = InsertProject(thisinfo.NewPNKey, tmp, mess, pipc, thisinfo) sflag := qu.ObjToString(mess["meger_sflag"]) if sflag == "alone" { du.Debug("新增项目,不参与对比", id) } return id } //更新项目 func updateinfoInc(thisinfo *Info, tmp map[string]interface{}, pInfo *ProjectInfo) string { updateid := pInfo.Id set := map[string]interface{}{} res, bres := MQFW.FindById(projectColl, pInfo.Id, `{"list":0}`) EqInfoUpdate(thisinfo, pInfo) if bres && res != nil && *res != nil { set["topscopeclass"] = pInfo.Topscopeclass set["subscopeclass"] = pInfo.Subscopeclass s_subscopeclass := strings.Join(pInfo.Subscopeclass, ",") if len(s_subscopeclass) > 0 { s_subscopeclass = "," + s_subscopeclass + "," } set["s_subscopeclass"] = s_subscopeclass s_winner := strings.Join(pInfo.Winners, ",") if len(s_winner) > 0 { s_winner = "," + s_winner + "," } set["s_winner"] = s_winner if pInfo.Buyerperson != "" && pInfo.Buyertel != "" { set["buyerperson"] = pInfo.Buyerperson set["buyertel"] = pInfo.Buyertel } if pInfo.Buyerclass != "" { set["buyerclass"] = pInfo.Buyerclass } if pInfo.District != "" { set["district"] = pInfo.District } if pInfo.Bidopentime > 0 { set["bidopentime"] = pInfo.Bidopentime } if len(pInfo.Winnerorder) > 0 { set["winnerorder"] = pInfo.Winnerorder } if thisinfo.HasPackage { set["multipackage"] = 1 } else { set["multipackage"] = 0 } e := InitEL(qu.ObjToString((*res)["extractpos"])) if thisinfo.dealtype == 1 { var sonpackage map[string]interface{} for _, obj := range tmp["package"].(map[string]interface{}) { sonpackage, _ = obj.(map[string]interface{}) } for _, v2 := range []string{"budget", "budget_w", "winner", "winner_w", "bidstatus", "bidstatus_w"} { if sonpackage[v2] != nil { tmp[v2] = sonpackage[v2] } } } e.fieldpriority(&tmp, res, &set) set["extractpos"] = e.GetVal() if thisinfo.HasPackage { //多包处理 p1, _ := (*res)["package"].(map[string]interface{}) p2, _ := tmp["package"].(map[string]interface{}) if p2 != nil { if p1 != nil { for pk2, pv2 := range p2 { if p1[pk2] != nil { //合并 item1, _ := p1[pk2].(map[string]interface{}) item2, _ := pv2.(map[string]interface{}) if item1 != nil && item2 != nil { //原始项 for ik1, iv1 := range item2 { if item1[ik1] == nil { item1[ik1] = iv1 } } } } else { p1[pk2] = pv2 } } } else { p1 = p2 } } set["package"] = p1 } //中标候选人合并 update := map[string]interface{}{} if len(set) > 0 { update["$set"] = set } //保留原数据吧 push := NewPushInfo(tmp) for tkey, _ := range extractpos { if tmp[tkey] != nil { push[tkey] = tmp[tkey] } } update["$push"] = map[string]interface{}{ "list": push, } if len(update) > 0 { MQFW.Update(projectColl, map[string]interface{}{ "_id": qu.StringTOBsonId(pInfo.Id), }, &update, false, false) } } return updateid }