package main

import (
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	qutil "qfw/util"
	elastic "qfw/util/elastic"
	// elastic "qfw/util/elastic_v5" // alternative client, currently disabled

	"gopkg.in/mgo.v2/bson"
)

// biddingBackTask2 re-indexes bidding documents from MongoDB into Elasticsearch.
//
// mapInfo keys read here:
//   - "query":  optional Mongo filter (map). When absent, an _id range filter
//     ($gt gtid, $lte lteid) is built from "gtid" and "lteid".
//   - "gtid", "lteid": hex object ids used only when "query" is absent.
//   - "coll":   comma-separated collection names; falls back to
//     biddingback["collect"] when empty.
//   - "thread": number of concurrent bulk writers (defaults to 3 when < 1).
//
// data is not used by this function. Panics are handled by the deferred
// qutil.Catch call.
func biddingBackTask2(data []byte, mapInfo map[string]interface{}) {
	defer qutil.Catch()

	q, _ := mapInfo["query"].(map[string]interface{})
	if q == nil {
		// Without an explicit query the id range is mandatory; bail out
		// with a clear message instead of panicking on a failed assertion.
		gtid, okGt := mapInfo["gtid"].(string)
		lteid, okLte := mapInfo["lteid"].(string)
		if !okGt || !okLte {
			log.Println("biddingBackTask2: no query and no gtid/lteid given, nothing to sync:", mapInfo)
			return
		}
		q = map[string]interface{}{
			"_id": bson.M{
				"$gt":  qutil.StringTOBsonId(gtid),
				"$lte": qutil.StringTOBsonId(lteid),
			},
		}
	}

	c, _ := mapInfo["coll"].(string)
	if c == "" {
		c, _ = biddingback["collect"].(string)
	}
	for _, coll := range strings.Split(c, ",") {
		biddingBackTask2Coll(coll, q, mapInfo)
	}
}

// biddingBackTask2Coll streams every document of one collection that matches
// q, shapes it for the index and bulk-saves it in batches of BulkSizeBack.
//
// It lives in its own function so that the Mongo session and the iterator are
// released as soon as this collection is finished rather than when the whole
// task returns. It only returns after every bulk write it started has
// finished.
func biddingBackTask2Coll(coll string, q map[string]interface{}, mapInfo map[string]interface{}) {
	// NOTE(review): 86400 is presumably a timeout in seconds - confirm against GetMgoConn.
	session := mgo.GetMgoConn(86400)
	defer mgo.DestoryMongoConn(session)

	// Connection / target index settings.
	db, _ := biddingback["db"].(string)
	index, _ := biddingback["index"].(string)
	itype, _ := biddingback["type"].(string)

	count, err := session.DB(db).C(coll).Find(q).Count()
	if err != nil {
		log.Println("biddingBackTask2: count failed:", coll, err)
	}
	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)

	// The two excluded fields are not copied to the index, so skip fetching them.
	iter := session.DB(db).C(coll).Find(q).Select(bson.M{
		"contenthtml": 0,
		"s_sha":       0,
	}).Sort("_id").Iter()
	defer func() {
		// Close reports any error that made Next stop early.
		if err := iter.Close(); err != nil {
			log.Println("biddingBackTask2: iterator error:", coll, err)
		}
	}()

	thread := qutil.IntAll(mapInfo["thread"])
	if thread < 1 {
		thread = 3
	}
	log.Println("es线程数:", thread)

	// espool bounds the number of bulk writes in flight; wg lets us wait for
	// all of them before reporting completion.
	espool := make(chan bool, thread)
	var wg sync.WaitGroup
	saveAsync := func(batch []map[string]interface{}) {
		espool <- true
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-espool }()
			elastic.BulkSave(index, itype, &batch, true)
		}()
	}

	now := time.Now().Unix()
	var pending []map[string]interface{}
	n := 0
	for ; ; n++ {
		// A fresh map per document: values are handed over to the batch.
		doc := make(map[string]interface{})
		if !iter.Next(doc) {
			break
		}
		if qutil.IntAll(doc["extracttype"]) == -1 {
			continue
		}

		pending = append(pending, biddingBackTask2Doc(doc, now))
		if len(pending) >= BulkSizeBack {
			saveAsync(pending)
			pending = nil
		}
		if n%1000 == 0 {
			log.Println("current:", n, qutil.BsonIdToSId(doc["_id"]))
		}
	}

	// Flush the remainder synchronously, then wait for in-flight batches.
	if len(pending) > 0 {
		elastic.BulkSave(index, itype, &pending, true)
	}
	wg.Wait()
	log.Println(mapInfo, "create biddingback2 index...over", coll, n)
}

// biddingBackTask2Doc normalizes one Mongo document in place and returns the
// subset of fields (per biddingIndexFields) that is sent to the index.
// now is the current Unix time in seconds.
func biddingBackTask2Doc(tmp map[string]interface{}, now int64) map[string]interface{} {
	// A publish time more than a day after the ingest time, or in the
	// future, is replaced by the ingest time (capped at now).
	ct := qutil.Int64All(tmp["comeintime"])
	pt := qutil.Int64All(tmp["publishtime"])
	if pt > ct+86400 || pt > now {
		if ct > now {
			ct = now
		}
		tmp["publishtime"] = ct
	}

	ps, _ := tmp["projectscope"].(string)
	if ps == "" {
		tmp["projectscope"] = ""
	}
	if len(ps) > ESLEN {
		// len(ps) counts bytes while the cut is in runes; clamp so a short
		// multi-byte string cannot slice out of range.
		runes := []rune(ps)
		if len(runes) > 4000 {
			runes = runes[:4000]
		}
		tmp["projectscope"] = string(runes)
	}

	// Money fields: empty / "null" become nil, strings go through ObjToMoney,
	// any other type is left untouched.
	for _, key := range []string{"budget", "bidamount"} {
		switch fmt.Sprint(tmp[key]) {
		case "", "null":
			tmp[key] = nil
		default:
			if s, ok := tmp[key].(string); ok {
				tmp[key] = ObjToMoney([]interface{}{s, s})[0]
			}
		}
	}

	out := map[string]interface{}{}
	for _, field := range biddingIndexFields {
		val := tmp[field]
		if val == nil {
			// Money fields are written explicitly as nil.
			if field == "budget" || field == "bidamount" {
				out[field] = nil
			}
			continue
		}
		switch field {
		case "projectinfo":
			info, _ := val.(map[string]interface{})
			if info == nil {
				continue
			}
			picked := map[string]interface{}{}
			for _, k := range projectinfoFields {
				if info[k] != nil {
					picked[k] = info[k]
				}
			}
			if len(picked) > 0 {
				out[field] = picked
			}
			// Concatenate the text content of all attachments.
			content := ""
			if atts, _ := info["attachments"].(map[string]interface{}); atts != nil {
				for _, a := range atts {
					if att, _ := a.(map[string]interface{}); att != nil {
						text, _ := att["content"].(string)
						content += text
					}
				}
			}
			if content != "" {
				out["attachments"] = FilterDetailSpace(content)
			}
		case "detail":
			detail, _ := val.(string)
			out[field] = FilterDetail(detail)
		default:
			out[field] = val
		}
	}
	return out
}