package main

import (
	//"fmt"
	"log"
	"reflect"
	"regexp"
	"strings"
	"sync"
	"time"

	"mongodb"
	qutil "qfw/util"
	elastic "qfw/util/elastic"

	"gopkg.in/mgo.v2/bson"
)

// Field handling: bidamount, budget.
// Bidding records and extract records are paired one-to-one (by _id) and the
// bidding records are then updated from their extract counterparts.
/*
Notes:
1. The biddingall task runs over historical data: it builds the index and
   updates the bidding collection.
2. When the biddingall task is invoked, the indexfields setting in config.json
   must include purchasing, purchasinglist and filetext.
*/

// RegSpace matches runs of whitespace: \s plus U+3000, U+2003 and U+00A0.
var RegSpace = regexp.MustCompile("[\\s\u3000\u2003\u00a0]+")

// biddingAllTask re-syncs a range of bidding records from the extract
// collection and (re)builds their elastic index documents.
//
// The bidding and extract collections are read side by side in _id order, so
// each bidding record is paired with the extract record of the same _id without
// a per-record lookup. For a paired record it:
//   - copies every field named in bidding["fields"] from the extract record into
//     a $set update (fields listed in the record's modifyinfo are not
//     overwritten) and $unsets fields that bidding has but extract lacks;
//   - sets extracttype to -1 when the extract record has repeat == 1 (such
//     records get no index document), otherwise to 1;
//   - de-duplicates subscopeclass / topscopeclass into s_subscopeclass /
//     s_topscopeclass.
//
// For every record it resolves s_winner names to company ids (entidlist, also
// queued to UpdataMgoCache for the extract collection) and builds an index
// document limited to biddingIndexFieldsMap. Mongo updates and index documents
// are flushed in batches of BulkSize.
//
// data is not used here. mapInfo provides the _id range either as "gtid"
// (exclusive) / "lteid" (inclusive) hex ids, or as "query" whose "_id" map holds
// operator -> hex id strings. Note that a supplied "query" map is modified in
// place: its _id strings are replaced by ObjectIds. An optional "coll" overrides
// bidding["collect"].
func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
	defer qutil.Catch()
	thread := 10
	// Worker slots: each record's post-processing runs in its own goroutine,
	// at most `thread` at a time.
	var mpool = make(chan bool, thread)
	q, _ := mapInfo["query"].(map[string]interface{})
	if q == nil {
		q = map[string]interface{}{
			"_id": bson.M{
				"$gt":  mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
				"$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
			},
		}
	} else {
		// Convert the hex id strings of the supplied _id condition to ObjectIds.
		idMap := q["_id"].(map[string]interface{})
		tmpQ := map[string]interface{}{}
		for op, id := range idMap {
			if idStr, ok := id.(string); ok && idStr != "" {
				tmpQ[op] = mongodb.StringTOBsonId(idStr)
			}
		}
		q["_id"] = tmpQ
	}
	// bidding database
	session := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(session)
	// extract database
	extractsession := extractmgo.GetMgoConn()
	defer extractmgo.DestoryMongoConn(extractsession)
	// connection settings
	c, _ := mapInfo["coll"].(string)
	if c == "" {
		c, _ = bidding["collect"].(string)
	}
	extractc, _ := bidding["extractcollect"].(string)
	db, _ := bidding["db"].(string)
	extractdb, _ := bidding["extractdb"].(string)
	index, _ := bidding["index"].(string)
	itype, _ := bidding["type"].(string)
	count, _ := session.DB(db).C(c).Find(&q).Count()
	fields := strings.Split(bidding["fields"].(string), ",")
	// Guards arr and arrEs, which the workers append to and flush.
	UpdatesLock := sync.Mutex{}
	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)

	// flushMgo bulk-applies {query, update} pairs to the bidding collection.
	flushMgo := func(batch [][]map[string]interface{}) {
		mgo.UpdateBulkAll(db, c, batch...)
	}
	// flushEs bulk-saves documents to the main index, to the secondary index
	// when multiIndex is configured, and to the backup cluster when
	// other_index/other_itype are set.
	flushEs := func(docs []map[string]interface{}) {
		elastic.BulkSave(index, itype, &docs, true)
		if len(multiIndex) == 2 {
			elastic.BulkSave(multiIndex[0], multiIndex[1], &docs, true)
		}
		if other_index != "" && other_itype != "" { // backup cluster is indexed too
			bidding_other_es.BulkSave(other_index, other_itype, &docs, true)
		}
	}

	// bidding records, without the bulky attachment / HTML fields
	query := session.DB(db).C(c).Find(q).Select(bson.M{
		"projectinfo.attachment": 0,
		"contenthtml":            0,
	}).Sort("_id").Iter()
	// extract records over the same _id range
	extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Sort("_id").Iter()
	// The extract cursor is normally not drained (the loop ends with the bidding
	// cursor), so close it explicitly to release it.
	defer extractquery.Close()

	n := 0
	// pending {query, update} pairs for the bidding collection
	arr := [][]map[string]interface{}{}
	// pending index documents
	arrEs := []map[string]interface{}{}
	// Current extract record of the side-by-side scan; nil means "read the next one".
	var compare map[string]interface{}
	// true when the extract cursor is already past this bidding record's _id,
	// i.e. there is no extract record for it.
	bnil := false
	for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
		// Disabled filters, kept for reference:
		// if qutil.IntAll(tmp["dataging"]) == 1 { // dataging=1: do not index
		// 	tmp = make(map[string]interface{})
		// 	continue
		// }
		// if qutil.IntAll(tmp["repeat"]) != 0 {
		// 	esQ := `{"query": {"bool": {"must": [{"term": {"id": "`+ mongodb.BsonIdToSId(tmp["_id"]) +`"}}]}}}`
		// 	esData := elastic.Get(index, itype, esQ)
		// 	if len(*esData) > 0 {
		// 		elastic.DelById(index, itype, mongodb.BsonIdToSId(tmp["_id"]))
		// 	}
		// }
		if sensitive := qutil.ObjToString(tmp["sensitive"]); sensitive == "测试" {
			// the bidding record is flagged as sensitive: do not index it
			tmp = make(map[string]interface{})
			continue
		}
		update := map[string]interface{}{}
		// fields that have a value in bidding but none in extract ($unset)
		del := map[string]interface{}{}

		// Advance the extract cursor until it reaches this record's _id or passes it.
		for {
			if compare == nil {
				compare = make(map[string]interface{})
				if !extractquery.Next(compare) {
					// Extract cursor exhausted: compare stays an empty map, so the
					// worker finds nothing to copy from it.
					break
				}
			}
			cid := mongodb.BsonIdToSId(compare["_id"])
			tid := mongodb.BsonIdToSId(tmp["_id"])
			if cid == tid {
				bnil = false
				// Fields listed in the bidding record's modifyinfo are not overwritten.
				modifyinfo := make(map[string]bool)
				if tmpmodifyinfo, ok := tmp["modifyinfo"].(map[string]interface{}); ok {
					for k := range tmpmodifyinfo {
						modifyinfo[k] = true
					}
				}
				for _, k := range fields { // fields synced to mongo
					v1 := compare[k] // extract value
					v2 := tmp[k]     // bidding value
					switch {
					case v2 == nil && v1 != nil:
						update[k] = v1
					case v2 != nil && v1 != nil && !modifyinfo[k]:
						update[k] = v1
					case v2 != nil && v1 == nil && !modifyinfo[k]:
						// Unset the joined s_* form only when its source array was
						// already scheduled for $unset earlier in `fields`.
						if k == "s_subscopeclass" && del["subscopeclass"] == nil {
							continue
						} else if k == "s_topscopeclass" && del["topscopeclass"] == nil {
							continue
						}
						del[k] = 1
					}
				}
				// repeat == 1 (presumably a duplicate) -> extracttype -1, which
				// keeps the record out of the index below.
				if qutil.IntAll(compare["repeat"]) == 1 {
					update["extracttype"] = -1
				} else {
					update["extracttype"] = 1
				}
				break
			}
			if cid < tid {
				// extract record without a bidding counterpart in this pass: skip it
				bnil = false
				compare = nil
				continue
			}
			// extract cursor is ahead: no extract record for this bidding record
			bnil = true
			break
		}

		// The rest of the per-record work runs in a worker goroutine.
		mpool <- true
		_id := tmp["_id"]
		go func(tmp, update, compare, del map[string]interface{}, bnil bool) {
			defer func() { <-mpool }()
			// A panic on one malformed record must not bring down the process.
			// qutil.Catch is assumed to recover and log, as relied on at the top
			// of biddingAllTask.
			defer qutil.Catch()
			if !bnil && compare != nil {
				// subscopeclass: de-duplicate, keeping the first occurrence order
				if subscopeclass, _ := compare["subscopeclass"].([]interface{}); subscopeclass != nil {
					seen := map[string]bool{}
					newclass := []string{}
					for _, sc := range subscopeclass {
						sclass, _ := sc.(string)
						if !seen[sclass] {
							seen[sclass] = true
							newclass = append(newclass, sclass)
						}
					}
					update["s_subscopeclass"] = strings.Join(newclass, ",")
					update["subscopeclass"] = newclass
				}
				// topscopeclass: remove letters, then de-duplicate
				if topscopeclass, _ := compare["topscopeclass"].([]interface{}); topscopeclass != nil {
					seen := map[string]bool{}
					newclass := []string{}
					for _, tc := range topscopeclass {
						tclass, _ := tc.(string)
						tclass = reg_letter.ReplaceAllString(tclass, "")
						if !seen[tclass] {
							seen[tclass] = true
							newclass = append(newclass, tclass)
						}
					}
					update["s_topscopeclass"] = strings.Join(newclass, ",")
				}
				// Winner handling (disabled):
				// winner, _ := compare["winner"].(string)
				// m1 := map[string]bool{}
				// if winner != "" {
				// 	m1[winner] = true
				// }
				// package1 := compare["package"]
				// if package1 != nil {
				// 	packageM, _ := package1.(map[string]interface{})
				// 	for _, p := range packageM {
				// 		pm, _ := p.(map[string]interface{})
				// 		pw, _ := pm["winner"].(string)
				// 		if pw != "" {
				// 			m1[pw] = true
				// 		}
				// 	}
				// }
				// if len(m1) > 0 {
				// 	winnerarr := []string{}
				// 	for k, _ := range m1 {
				// 		winnerarr = append(winnerarr, k)
				// 	}
				// 	update["s_winner"] = strings.Join(winnerarr, ",")
				// }
			}
			// ------------------ end of comparison
			// tmp (bidding record + updates) is the source of the index document.
			for tk, tv := range update {
				tmp[tk] = tv
			}
			// Resolve winner names to company ids through qyxy_historyname.
			if sWinner := qutil.ObjToString(tmp["s_winner"]); sWinner != "" {
				var entIDs []string
				for _, w := range strings.Split(sWinner, ",") {
					if w == "" {
						continue
					}
					ent, _ := mgostandard.FindOne("qyxy_historyname", map[string]interface{}{"company_name": w})
					if ent != nil && len(*ent) > 0 {
						entIDs = append(entIDs, qutil.ObjToString((*ent)["company_id"]))
					}
				}
				if len(entIDs) > 0 {
					tmp["entidlist"] = entIDs
					update["entidlist"] = entIDs
					// also written to the extract collection by UpdateExtract
					UpdataMgoCache <- []map[string]interface{}{
						{"_id": tmp["_id"]},
						{"$set": map[string]interface{}{"entidlist": entIDs}},
					}
				}
			}
			// projectscope: the trigger compares the byte length with ESLEN, but
			// the cut is in runes; check the rune count so a multi-byte string
			// longer than ESLEN bytes yet shorter than the cut cannot panic.
			const projectscopeMaxRunes = 4000
			if ps, _ := tmp["projectscope"].(string); len(ps) > ESLEN {
				if runes := []rune(ps); len(runes) > projectscopeMaxRunes {
					tmp["projectscope"] = string(runes[:projectscopeMaxRunes])
				}
			}
			// attachment text (attach_text): kept only when longer than 10 runes
			if filetext := getFileText(tmp); len([]rune(filetext)) > 10 {
				tmp["filetext"] = filetext
			}
			// drop empty purchasing / purchasinglist
			if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" {
				delete(tmp, "purchasing")
			}
			if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok && len(purchasinglist) == 0 {
				delete(tmp, "purchasinglist")
			}
			// drop empty string values
			for _, f := range []string{"bidstatus", "city", "district", "channel"} {
				if fVal, ok := tmp[f].(string); ok && fVal == "" {
					delete(tmp, f)
				}
			}
			// Budget / bid amount normalisation (disabled):
			// if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "" || s_budget == "null" {
			// 	tmp["budget"] = nil
			// } else if sbd, ok := tmp["budget"].(string); ok {
			// 	tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
			// }
			// if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "" || s_bidamount == "null" {
			// 	tmp["bidamount"] = nil
			// } else if sbd, ok := tmp["bidamount"].(string); ok {
			// 	tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
			// }
			//go IS.Add("bidding")

			UpdatesLock.Lock()
			// deferred so a recovered panic below cannot leave the lock held
			defer UpdatesLock.Unlock()
			if qutil.IntAll(update["extracttype"]) != -1 {
				// Build the index document: only fields of biddingIndexFieldsMap that
				// are present and not being unset.
				newTmp := map[string]interface{}{}
				for field, ftype := range biddingIndexFieldsMap {
					if tmp[field] == nil || del[field] != nil {
						continue
					}
					switch field {
					case "projectinfo":
						// keep sub-fields whose Go type matches projectinfoFieldsMap
						if mp, _ := tmp[field].(map[string]interface{}); mp != nil {
							newmap := map[string]interface{}{}
							for k, ktype := range projectinfoFieldsMap {
								if mpv := mp[k]; mpv != nil && reflect.TypeOf(mpv).String() == ktype {
									newmap[k] = mpv
								}
							}
							if len(newmap) > 0 {
								newTmp[field] = newmap
							}
						}
					case "purchasinglist":
						// subject-matter items: keep sub-fields matching purchasinglistFieldsMap
						var items []map[string]interface{}
						pcl, _ := tmp[field].([]interface{})
						for _, ls := range pcl {
							lsm, ok := ls.(map[string]interface{})
							if !ok {
								continue
							}
							item := map[string]interface{}{}
							for pf, pftype := range purchasinglistFieldsMap {
								if lsmv := lsm[pf]; lsmv != nil && reflect.TypeOf(lsmv).String() == pftype {
									item[pf] = lsmv
								}
							}
							if len(item) > 0 {
								items = append(items, item)
							}
						}
						if len(items) > 0 {
							newTmp[field] = items
						}
					case "winnerorder":
						// winning candidates: keep sub-fields matching
						// winnerorderlistFieldsMap; a "sort" above 100 is dropped
						var wins []map[string]interface{}
						winnerorder, _ := tmp[field].([]interface{})
						for _, win := range winnerorder {
							winMap, ok := win.(map[string]interface{})
							if !ok {
								continue
							}
							cand := map[string]interface{}{}
							for wf, wftype := range winnerorderlistFieldsMap {
								wfv := winMap[wf]
								if wfv == nil || reflect.TypeOf(wfv).String() != wftype {
									continue
								}
								if wf == "sort" && qutil.Int64All(wfv) > 100 {
									continue
								}
								cand[wf] = wfv
							}
							if len(cand) > 0 {
								wins = append(wins, cand)
							}
						}
						if len(wins) > 0 {
							newTmp[field] = wins
						}
					case "qualifies":
						// project qualifications: comma-joined "key" values
						var qs []string
						quals, _ := tmp[field].([]interface{})
						for _, v := range quals {
							if qm, ok := v.(map[string]interface{}); ok {
								qs = append(qs, qutil.ObjToString(qm["key"]))
							}
						}
						if len(qs) > 0 {
							newTmp[field] = strings.Join(qs, ",")
						}
					case "review_experts":
						// review experts: comma-joined
						if experts, ok := tmp["review_experts"].([]interface{}); ok && len(experts) > 0 {
							newTmp[field] = strings.Join(qutil.ObjArrToStringArr(experts), ",")
						}
					case "detail":
						// Truncate to detailLength runes (slicing bytes could split a
						// multi-byte character), then filter.
						detail, _ := tmp[field].(string)
						if runes := []rune(detail); len(runes) > detailLength {
							detail = string(runes[:detailLength])
						}
						newTmp[field] = FilterDetail(detail)
					case "_id", "topscopeclass", "s":
						// copied as is
						newTmp[field] = tmp[field]
					case "publishtime", "comeintime":
						// stored type may be wrong: normalise to int64
						if qutil.Int64All(tmp[field]) > 0 {
							newTmp[field] = qutil.Int64All(tmp[field])
						}
					default:
						// other fields: dropped when the Go type does not match
						if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() == ftype && fieldval != "" {
							newTmp[field] = fieldval
						}
					}
				}
				arrEs = append(arrEs, newTmp)
			}
			if len(update) > 0 {
				// winnerorder is not written back to the bidding collection
				delete(update, "winnerorder")
				set := map[string]interface{}{"$set": update}
				if len(del) > 0 {
					set["$unset"] = del
				}
				arr = append(arr, []map[string]interface{}{{"_id": tmp["_id"]}, set})
			}
			if len(arr) >= BulkSize {
				flushMgo(arr)
				arr = [][]map[string]interface{}{}
			}
			if len(arrEs) >= BulkSize {
				flushEs(arrEs)
				arrEs = []map[string]interface{}{}
			}
		}(tmp, update, compare, del, bnil)
		if n%1000 == 0 {
			log.Println("current:", n, _id)
		}
		tmp = make(map[string]interface{})
	}
	// Next returns false on both end-of-data and error; surface the error.
	if err := query.Close(); err != nil {
		log.Println("bidding iterator error:", err)
	}
	// Wait for every worker: filling all slots means none is still running.
	for i := 0; i < thread; i++ {
		mpool <- true
	}
	UpdatesLock.Lock()
	if len(arr) > 0 {
		flushMgo(arr)
	}
	if len(arrEs) > 0 {
		flushEs(arrEs)
	}
	UpdatesLock.Unlock()
	log.Println(mapInfo, "create bidding index...over", n)
}

// UpdateExtract applies the updates queued on UpdataMgoCache to the extract
// collection (bidding["extractcollect"]) in bulk, forever.
//
// Entries are buffered and written when 200 have accumulated, or when no new
// entry arrives for one second. Each write runs in its own goroutine, bounded
// by the SP channel (acquired before the write, released after). It never
// returns.
func UpdateExtract() {
	qutil.Debug("Update Extract...")
	const batchSize = 200
	extract := qutil.ObjToString(bidding["extractcollect"])
	batch := make([][]map[string]interface{}, batchSize)
	filled := 0
	// flush hands the filled part of the buffer to a background bulk write and
	// starts a fresh buffer (the old one now belongs to that goroutine).
	flush := func() {
		SP <- true
		go func(pending [][]map[string]interface{}) {
			defer func() { <-SP }()
			extractmgo.UpdateBulk(extract, pending...)
		}(batch[:filled])
		batch = make([][]map[string]interface{}, batchSize)
		filled = 0
	}
	for {
		select {
		case v := <-UpdataMgoCache:
			batch[filled] = v
			filled++
			if filled == batchSize {
				flush()
			}
		case <-time.After(1000 * time.Millisecond):
			// idle for a second: write whatever is pending
			if filled > 0 {
				flush()
			}
		}
	}
}