package main import ( "go.mongodb.org/mongo-driver/bson" "log" qutil "qfw/util" elastic "qfw/util/elastic" "reflect" "strings" "unicode/utf8" //elastic "qfw/util/elastic_v5" "regexp" // "strings" "sync" ) var ( BulkSizeBack = 400 ESLEN = 32766 ) var reg_letter = regexp.MustCompile("[a-z]*") func biddingTask() { defer qutil.Catch() //bidding库 session := mgo.GetMgoConn() defer mgo.DestoryMongoConn(session) db := qutil.ObjToString(bidding["db"]) coll := qutil.ObjToString(bidding["collect"]) //q := map[string]interface{}{"updatetime": map[string]interface{}{"$gt": 1643262000}} //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5db7c324a5cb26b9b78b0f09")} count, _ := session.DB(db).C(coll).Find(nil).Count() index := qutil.ObjToString(bidding["index"]) stype := qutil.ObjToString(bidding["type"]) //线程池 UpdatesLock := sync.Mutex{} qutil.Debug("查询语句:", nil, "同步总数:", count, "elastic库:") //查询招标数据 query := session.DB(db).C(coll).Find(nil).Select(bson.M{ "projectinfo.attachment": 0, "contenthtml": 0, "publishdept": 0, }).Sort("_id").Iter() //查询抽取结果 n := 0 //更新数组 arrEs := []map[string]interface{}{} thread := 10 espool := make(chan bool, 5) var mpool = make(chan bool, thread) for tmp := make(map[string]interface{}); query.Next(tmp); n++ { if n%2000 == 0 { log.Println("current:", n, tmp["_id"]) } if qutil.ObjToString(tmp["useable"]) == "0" { continue } mpool <- true go func(tmp map[string]interface{}) { defer func() { <-mpool }() subscopeclass, _ := tmp["subscopeclass"].([]interface{}) //subscopeclass if subscopeclass != nil { m1 := map[string]bool{} newclass := []string{} for _, sc := range subscopeclass { sclass, _ := sc.(string) if !m1[sclass] { m1[sclass] = true newclass = append(newclass, sclass) } } tmp["s_subscopeclass"] = strings.Join(newclass, ",") tmp["subscopeclass"] = newclass } topscopeclass, _ := tmp["topscopeclass"].([]interface{}) //topscopeclass if topscopeclass != nil { m2 := map[string]bool{} newclass := []string{} for _, tc := range topscopeclass { tclass, _ := tc.(string) tclass = reg_letter.ReplaceAllString(tclass, "") // 去除字母 if !m2[tclass] { m2[tclass] = true newclass = append(newclass, tclass) } } tmp["s_topscopeclass"] = strings.Join(newclass, ",") } //对projectscope字段的索引处理 ps, _ := tmp["projectscope"].(string) if len(ps) > ESLEN { tmp["projectscope"] = string(([]rune(ps))[:4000]) } //对标的物为空处理 filetext := getFileText(tmp) filetextS := filetext if len([]rune(filetextS)) > 10 { //attach_text tmp["filetext"] = filetext } if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" { delete(tmp, "purchasing") } if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok && len(purchasinglist) == 0 { delete(tmp, "purchasinglist") } //数据为空处理 for _, f := range []string{"bidstatus", "city", "district", "channel"} { if fVal, ok := tmp[f].(string); ok && fVal == "" { delete(tmp, f) } } UpdatesLock.Lock() newTmp := map[string]interface{}{} for field, ftype := range biddingIndexFieldsMap { if tmp[field] != nil { // if field == "projectinfo" { mp, _ := tmp[field].(map[string]interface{}) if mp != nil { newmap := map[string]interface{}{} for k, ktype := range projectinfoFieldsMap { mpv := mp[k] if mpv != nil && reflect.TypeOf(mpv).String() == ktype { newmap[k] = mp[k] } } if len(newmap) > 0 { newTmp[field] = newmap } } } else if field == "purchasinglist" { //标的物处理 purchasinglist_new := []map[string]interface{}{} if pcl, _ := tmp[field].([]interface{}); len(pcl) > 0 { for _, ls := range pcl { lsm_new := make(map[string]interface{}) lsm := ls.(map[string]interface{}) for pf, pftype := range purchasinglistFieldsMap { lsmv := lsm[pf] if lsmv != nil && reflect.TypeOf(lsmv).String() == pftype { lsm_new[pf] = lsm[pf] } } if lsm_new != nil && len(lsm_new) > 0 { purchasinglist_new = append(purchasinglist_new, lsm_new) } } } if len(purchasinglist_new) > 0 { newTmp[field] = purchasinglist_new } } else if field == "winnerorder" { //中标候选 winnerorder_new := []map[string]interface{}{} if winnerorder, _ := tmp[field].([]interface{}); len(winnerorder) > 0 { for _, win := range winnerorder { winMap_new := make(map[string]interface{}) winMap := win.(map[string]interface{}) for wf, wftype := range winnerorderlistFieldsMap { wfv := winMap[wf] if wfv != nil && reflect.TypeOf(wfv).String() == wftype { if wf == "sort" && qutil.Int64All(wfv) > 100 { continue } winMap_new[wf] = winMap[wf] } } if winMap_new != nil && len(winMap_new) > 0 { winnerorder_new = append(winnerorder_new, winMap_new) } } } if len(winnerorder_new) > 0 { newTmp[field] = winnerorder_new } } else if field == "qualifies" { //项目资质 qs := []string{} if q, _ := tmp[field].([]interface{}); len(q) > 0 { for _, v := range q { v1 := v.(map[string]interface{}) qs = append(qs, qutil.ObjToString(v1["key"])) } } if len(qs) > 0 { newTmp[field] = strings.Join(qs, ",") } } else if field == "review_experts" { // 评审专家 if arr, ok := tmp["review_experts"].([]interface{}); ok && len(arr) > 0 { arr1 := qutil.ObjArrToStringArr(arr) newTmp[field] = strings.Join(arr1, ",") } } else if field == "entidlist" { newTmp[field] = tmp[field] } else if field == "bidopentime" { if tmp[field] != nil && tmp["bidendtime"] == nil { newTmp["bidendtime"] = tmp[field] } if tmp[field] == nil && tmp["bidendtime"] != nil { newTmp[field] = tmp["bidendtime"] } } else if field == "detail" { //过滤 detail, _ := tmp[field].(string) if len([]rune(detail)) > detailLength { detail = detail[:detailLength] } if strings.Contains(detail, qutil.ObjToString(tmp["title"])) { newTmp[field] = FilterDetail(detail) } else { newTmp[field] = qutil.ObjToString(tmp["title"]) + " " + FilterDetail(detail) } } else if field == "_id" || field == "topscopeclass" { //不做处理 newTmp[field] = tmp[field] } else if field == "publishtime" || field == "comeintime" { //字段类型不正确,特别处理 if tmp[field] != nil && qutil.Int64All(tmp[field]) > 0 { newTmp[field] = qutil.Int64All(tmp[field]) } } else if field == "s" { newTmp[field] = tmp[field] } else { //其它字段判断数据类型,不正确舍弃 if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype { continue } else { if fieldval != "" { newTmp[field] = fieldval } } } } } arrEs = append(arrEs, newTmp) if len(arrEs) >= BulkSizeBack { tmps := arrEs espool <- true go func(tmps []map[string]interface{}) { defer func() { <-espool }() elastic.BulkSave(index, stype, &tmps, true) }(tmps) arrEs = []map[string]interface{}{} } UpdatesLock.Unlock() }(tmp) tmp = make(map[string]interface{}) } for i := 0; i < thread; i++ { mpool <- true } UpdatesLock.Lock() if len(arrEs) > 0 { tmps := arrEs elastic.BulkSave(index, stype, &tmps, true) } UpdatesLock.Unlock() log.Println("create biddingback index...over", n) } var filterReg = regexp.MustCompile("<[^>]+>") var filterSpace = regexp.MustCompile("<[^>]*?>|[\\s\u3000\u2003\u00a0]") func FilterDetail(text string) string { return filterReg.ReplaceAllString(text, "") } func FilterDetailSpace(text string) string { return filterSpace.ReplaceAllString(text, "") } // 正则判断是否包含 func checkContains(s, sub string) bool { reg := regexp.MustCompile(`(?i)(^|([\s\t\n]+))(` + sub + `)($|([\s\t\n]+))`) return reg.MatchString(s) } func getFileText(tmp map[string]interface{}) (filetext string) { if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok { for _, tmpData1 := range attchMap { if tmpData2, ok := tmpData1.(map[string]interface{}); tmpData2 != nil && ok { for _, result := range tmpData2 { if resultMap, ok := result.(map[string]interface{}); resultMap != nil && ok { if attach_url := qutil.ObjToString(resultMap["attach_url"]); attach_url != "" { bs := OssGetObject(attach_url) //oss读数据 if utf8.RuneCountInString(filetext+bs) < fileLength { filetext += bs + "\n" } else { if utf8.RuneCountInString(bs) > fileLength { filetext = bs[0:fileLength] } else { filetext = bs } break } } } } } } } return }