1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- package main
- import (
- "esindex/config"
- "go.mongodb.org/mongo-driver/bson"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- )
- func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- stype := util.ObjToString(mapInfo["stype"])
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- } else {
- idMap := q["_id"].(map[string]interface{})
- tmpQ := map[string]interface{}{}
- for c, id := range idMap {
- if idStr, ok := id.(string); ok && id != "" {
- tmpQ[c] = mongodb.StringTOBsonId(idStr)
- }
- }
- q["_id"] = tmpQ
- }
- //bidding库
- biddingConn := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(biddingConn)
- count, _ := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(&q).Count()
- log.Info("biddingDataTask", zap.Any("查询语句", q), zap.Any("同步总数:", count))
- //查询招标数据
- query := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(bson.M{
- "projectname": 1,
- "title": 1,
- "site": 1,
- "href": 1,
- "publishtime": 1,
- "spidercode": 1,
- "extracttype": 1,
- "comeintime": 1,
- }).Sort("_id").Iter()
- n := 0
- //更新数组
- var mpool = make(chan bool, 30)
- for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
- if n%20000 == 0 {
- log.Info("current", zap.Int("count", n), zap.Any("current id", tmp["_id"]))
- }
- mpool <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-mpool
- }()
- newTmp, _ := GetEsField(tmp, stype)
- newTmp["extracttype"] = util.IntAll(tmp["extracttype"])
- saveEsAllPool <- newTmp
- }(tmp)
- tmp = make(map[string]interface{})
- }
- log.Info("create biddingdata index over", zap.Any("mapInfo", mapInfo), zap.Int("count", n))
- }
|