package main

import (
	"fmt"
	"log"
	"regexp"
	//	"strings"
	"sync"
	"time"

	qutil "qfw/util"
	elastic "qfw/util/elastic"
	//elastic "qfw/util/elastic_v5"

	"gopkg.in/mgo.v2/bson"
)

var (
	// BulkSizeBack is the number of documents accumulated before a bulk save
	// is dispatched.
	BulkSizeBack = 400
	// ESLEN is the byte length above which projectscope is truncated before
	// indexing.
	// NOTE(review): presumably mirrors Lucene's 32766-byte maximum term
	// length — confirm.
	ESLEN = 32766
)

// projectScopeMaxRunes is the number of runes kept when projectscope is
// longer than ESLEN bytes.
const projectScopeMaxRunes = 4000

// biddingBackTask reads documents from the biddingback mongo collection,
// normalizes a few fields, projects each document onto biddingIndexFields and
// bulk-saves the result through elastic.BulkSave. It returns only after every
// dispatched bulk save has finished.
//
// mapInfo keys read here:
//   - "query": optional mongo filter. When absent, an _id range is built from
//     "gtid" (exclusive lower bound) and "lteid" (inclusive upper bound),
//     both of which must then be strings.
//   - "coll": optional collection name; defaults to biddingback["collect"].
//   - "thread": maximum number of concurrent bulk saves; values below 1 fall
//     back to 3.
//   - "multiIndex": 0 (or absent) saves to the bidding index only, 1 saves to
//     the bidding index and the multiIndex target, 2 saves to the multiIndex
//     target only.
//
// data is not used.
func biddingBackTask(data []byte, mapInfo map[string]interface{}) {
	defer qutil.Catch()

	q, _ := mapInfo["query"].(map[string]interface{})
	if q == nil {
		gtid, okGt := mapInfo["gtid"].(string)
		lteid, okLte := mapInfo["lteid"].(string)
		if !okGt || !okLte {
			// Unchecked assertions used to panic here; fail explicitly instead.
			log.Println("biddingback: no query and no string gtid/lteid in", mapInfo)
			return
		}
		q = map[string]interface{}{
			"_id": bson.M{
				"$gt":  qutil.StringTOBsonId(gtid),
				"$lte": qutil.StringTOBsonId(lteid),
			},
		}
	}

	// Connection to the bidding database.
	session := mgo.GetMgoConn(86400)
	defer mgo.DestoryMongoConn(session)

	// Connection settings.
	c, _ := mapInfo["coll"].(string)
	if c == "" {
		c, _ = biddingback["collect"].(string)
	}
	db, _ := biddingback["db"].(string)
	index, _ := biddingback["index"].(string)
	itype, _ := biddingback["type"].(string)

	count, err := session.DB(db).C(c).Find(q).Count()
	if err != nil {
		log.Println("biddingback: count failed:", err)
	}
	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)

	// Stream the bidding documents in _id order, leaving out the two fields
	// that are excluded from the projection.
	iter := session.DB(db).C(c).Find(q).Select(bson.M{
		"contenthtml": 0,
		"s_sha":       0,
	}).Sort("_id").Iter()

	thread := qutil.IntAll(mapInfo["thread"])
	// Absent/0: bidding index only, 1: bidding index + preview, 2: preview only.
	_multiIndex := qutil.IntAll(mapInfo["multiIndex"])
	if thread < 1 {
		thread = 3
	}
	log.Println("es线程数:", thread)

	usesMulti := _multiIndex == 1 || _multiIndex == 2
	if _multiIndex != 0 && !(usesMulti && len(multiIndex) == 2) {
		// With this combination no save branch matches, so nothing is indexed.
		log.Println("biddingback: multiIndex mode", _multiIndex, "selects no target; nothing will be saved")
	}

	// espool caps the number of bulk saves in flight; wg lets the function
	// wait for them so that "over" is only logged once they have completed.
	espool := make(chan bool, thread)
	var wg sync.WaitGroup
	dispatch := func(docs []map[string]interface{}) {
		espool <- true // blocks while `thread` saves are already running
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-espool }()
			saveBiddingBackBulk(index, itype, _multiIndex, docs)
		}()
	}

	now := time.Now().Unix()
	var batch []map[string]interface{}
	n := 0
	for ; ; n++ {
		// Decode every document into its own map.
		doc := make(map[string]interface{})
		if !iter.Next(doc) {
			break
		}
		if qutil.IntAll(doc["extracttype"]) == -1 {
			continue
		}

		normalizeBiddingBackDoc(doc, now)
		batch = append(batch, projectBiddingBackDoc(doc))
		if len(batch) >= BulkSizeBack {
			dispatch(batch)
			batch = nil
		}

		if n%1000 == 0 {
			log.Println("current:", n, qutil.BsonIdToSId(doc["_id"]))
		}
	}
	if err := iter.Close(); err != nil {
		// A failed cursor would otherwise be indistinguishable from the
		// normal end of the result set.
		log.Println("biddingback: iteration stopped with error:", err)
	}

	// Flush the trailing partial batch, then wait for the in-flight saves.
	if len(batch) > 0 {
		saveBiddingBackBulk(index, itype, _multiIndex, batch)
	}
	wg.Wait()

	log.Println(mapInfo, "create biddingback index...over", n)
}

// normalizeBiddingBackDoc fixes up publishtime, projectscope, budget and
// bidamount in place. now is the current unix time in seconds.
func normalizeBiddingBackDoc(doc map[string]interface{}, now int64) {
	ct := qutil.Int64All(doc["comeintime"])
	pt := qutil.Int64All(doc["publishtime"])
	if pt > ct+86400 || pt > now {
		// publishtime is in the future or more than a day after comeintime:
		// replace it with comeintime, itself capped at now.
		if ct > now {
			ct = now
		}
		doc["publishtime"] = ct
	}

	ps, _ := doc["projectscope"].(string)
	if ps == "" {
		// Also covers a missing or non-string projectscope.
		doc["projectscope"] = ""
	}
	if len(ps) > ESLEN {
		// len counts bytes; the cut is made on runes so that no multi-byte
		// character is split.
		if runes := []rune(ps); len(runes) > projectScopeMaxRunes {
			doc["projectscope"] = string(runes[:projectScopeMaxRunes])
		}
	}

	normalizeBiddingBackMoney(doc, "budget")
	normalizeBiddingBackMoney(doc, "bidamount")
}

// normalizeBiddingBackMoney sets doc[key] to nil when it prints as "" or
// "null", and converts any other string value through ObjToMoney. Non-string
// values are left as they are.
//
// NOTE(review): the original condition compared against "" twice; the second
// literal may have been meant to be "<nil>" (what fmt.Sprint yields for a nil
// value) — confirm. A nil value is unaffected either way, because it matches
// neither branch and stays nil.
func normalizeBiddingBackMoney(doc map[string]interface{}, key string) {
	if s := fmt.Sprint(doc[key]); s == "" || s == "null" {
		doc[key] = nil
		return
	}
	if str, ok := doc[key].(string); ok {
		doc[key] = ObjToMoney([]interface{}{str, str})[0]
	}
}

// projectBiddingBackDoc builds the document to index: only the keys listed in
// biddingIndexFields are copied, projectinfo is reduced to projectinfoFields,
// and detail is passed through FilterDetail. budget and bidamount are always
// present in the result, as nil when the source has no value for them.
func projectBiddingBackDoc(doc map[string]interface{}) map[string]interface{} {
	out := map[string]interface{}{}
	for _, field := range biddingIndexFields {
		val := doc[field]
		if val == nil {
			if field == "budget" || field == "bidamount" {
				out[field] = nil
			}
			continue
		}
		switch field {
		case "projectinfo":
			info, _ := val.(map[string]interface{})
			if info == nil {
				// Not a map: the key is left out of the result entirely.
				continue
			}
			sub := map[string]interface{}{}
			for _, k := range projectinfoFields {
				if info[k] != nil {
					sub[k] = info[k]
				}
			}
			out[field] = sub
		case "detail":
			detail, _ := val.(string)
			out[field] = FilterDetail(detail)
		default:
			out[field] = val
		}
	}
	return out
}

// saveBiddingBackBulk writes docs to the target(s) selected by mode: 0 is the
// index/itype pair, 1 is that pair plus the multiIndex pair, 2 is the
// multiIndex pair only. Modes 1 and 2 need multiIndex to hold exactly two
// entries; any other combination saves nothing.
func saveBiddingBackBulk(index, itype string, mode int, docs []map[string]interface{}) {
	hasMulti := len(multiIndex) == 2
	switch {
	case mode == 0:
		elastic.BulkSave(index, itype, &docs, true)
	case mode == 1 && hasMulti:
		elastic.BulkSave(index, itype, &docs, true)
		elastic.BulkSave(multiIndex[0], multiIndex[1], &docs, true)
	case mode == 2 && hasMulti:
		elastic.BulkSave(multiIndex[0], multiIndex[1], &docs, true)
	}
}

// filterReg matches tag-like spans: "<", one or more non-">" characters, ">".
var filterReg = regexp.MustCompile("<[^>]+>")

// FilterDetail returns text with every span matched by filterReg removed.
func FilterDetail(text string) string {
	return filterReg.ReplaceAllString(text, "")
}