package main import ( "esindex/config" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" ) func biddingDataTask(data []byte, mapInfo map[string]interface{}) { defer util.Catch() stype := util.ObjToString(mapInfo["stype"]) q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": bson.M{ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), }, } } else { idMap := q["_id"].(map[string]interface{}) tmpQ := map[string]interface{}{} for c, id := range idMap { if idStr, ok := id.(string); ok && id != "" { tmpQ[c] = mongodb.StringTOBsonId(idStr) } } q["_id"] = tmpQ } //bidding库 biddingConn := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(biddingConn) count, _ := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(&q).Count() log.Info("biddingDataTask", zap.Any("查询语句", q), zap.Any("同步总数:", count)) //查询招标数据 query := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(bson.M{ "projectname": 1, "title": 1, "site": 1, "href": 1, "publishtime": 1, "spidercode": 1, "extracttype": 1, "comeintime": 1, }).Sort("_id").Iter() n := 0 //更新数组 var mpool = make(chan bool, 30) for tmp := make(map[string]interface{}); query.Next(tmp); n++ { if n%20000 == 0 { log.Info("current", zap.Int("count", n), zap.Any("current id", tmp["_id"])) } mpool <- true go func(tmp map[string]interface{}) { defer func() { <-mpool }() newTmp, _ := GetEsField(tmp, stype) newTmp["extracttype"] = util.IntAll(tmp["extracttype"]) saveEsAllPool <- newTmp }(tmp) tmp = make(map[string]interface{}) } log.Info("create biddingdata index over", zap.Any("mapInfo", mapInfo), zap.Int("count", n)) }