package main import ( "go.mongodb.org/mongo-driver/bson" util "utils" "utils/mongodb" ) func (t *TaskInfo) biddingDataTask(data []byte, mapInfo map[string]interface{}) { defer util.Catch() q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": bson.M{ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), }, } } else { idMap := q["_id"].(map[string]interface{}) tmpQ := map[string]interface{}{} for c, id := range idMap { if idStr, ok := id.(string); ok && id != "" { tmpQ[c] = mongodb.StringTOBsonId(idStr) } } q["_id"] = tmpQ } //bidding库 biddingConn := biddingMgo.GetMgoConn() defer biddingMgo.DestoryMongoConn(biddingConn) //连接信息 c, _ := mapInfo["coll"].(string) if c == "" { c, _ = bidding["collect"].(string) } else { currentColl = c } count, _ := biddingConn.DB(biddingMgo.DbName).C(c).Find(&q).Count() util.Debug("查询语句:", q, "同步总数:", count, "elastic库:") //查询招标数据 query := biddingConn.DB(biddingMgo.DbName).C(c).Find(q).Select(bson.M{ "projectname": 1, "title": 1, "site": 1, "href": 1, "publishtime": 1, "spidercode": 1, "extracttype": 1, }).Sort("_id").Iter() n := 0 //更新数组 var mpool = make(chan bool, t.thread) for tmp := make(map[string]interface{}); query.Next(tmp); n++ { if n%20000 == 0 { util.Debug("current:", n, tmp["_id"]) } mpool <- true go func(tmp map[string]interface{}) { defer func() { <-mpool }() newTmp := GetEsField(tmp, nil, t.stype) newTmp["extracttype"] = util.IntAll(tmp["extracttype"]) saveEsAllPool <- newTmp }(tmp) tmp = make(map[string]interface{}) } util.Debug(mapInfo, "create biddingdata index...over", n) }