package main import ( "esindex/config" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "time" ) func biddingDataTask(data []byte, mapInfo map[string]interface{}) { defer util.Catch() stype := util.ObjToString(mapInfo["stype"]) q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": bson.M{ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), }, } } else { idMap := q["_id"].(map[string]interface{}) tmpQ := map[string]interface{}{} for c, id := range idMap { if idStr, ok := id.(string); ok && id != "" { tmpQ[c] = mongodb.StringTOBsonId(idStr) } } q["_id"] = tmpQ } //bidding库 biddingConn := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(biddingConn) count, _ := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(&q).Count() log.Info("biddingDataTask", zap.Any("查询语句", q), zap.Any("同步总数:", count)) //查询招标数据 query := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(bson.M{ "projectname": 1, "title": 1, "site": 1, "href": 1, "publishtime": 1, "spidercode": 1, "extracttype": 1, "comeintime": 1, "buyer": 1, //采购单位 "s_winner": 1, //中标单位 "projectcode": 1, //项目标号 "agency": 1, //代理机构 "budget": 1, //预算 "area": 1, //省份 "city": 1, //城市 "district": 1, //区县 "bidamount": 1, //中标金额 "toptype": 1, //一级分类 "subtype": 1, //二级分类 }).Sort("_id").Iter() n := 0 //更新数组 var mpool = make(chan bool, 30) for tmp := make(map[string]interface{}); query.Next(tmp); n++ { if n%20000 == 0 { log.Info("current", zap.Int("count", n), zap.Any("current id", tmp["_id"])) } mpool <- true go func(tmp map[string]interface{}) { defer func() { <-mpool }() newTmp, _ := GetEsField(tmp, stype) newTmp["extracttype"] = util.IntAll(tmp["extracttype"]) newTmp["pici"] = time.Now().Unix() saveEsAllPool <- newTmp }(tmp) tmp = make(map[string]interface{}) } log.Info("create biddingdata index over", zap.Any("mapInfo", mapInfo), zap.Int("count", n)) }