12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- package main
- import (
- "go.mongodb.org/mongo-driver/bson"
- util "utils"
- "utils/mongodb"
- )
- func (t *TaskInfo) biddingBackTask(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- }
- //bidding库
- biddingConn := biddingMgo.GetMgoConn()
- defer biddingMgo.DestoryMongoConn(biddingConn)
- c := util.ObjToString(mapInfo["coll"])
- if c == "" {
- c = bidding["collect"].(string)
- } else {
- currentColl = c
- }
- count, _ := biddingConn.DB(biddingMgo.DbName).C(c).Find(&q).Count()
- util.Debug("查询语句:", q, "同步总数:", count)
- //查询招标数据
- query := biddingConn.DB(biddingMgo.DbName).C(c).Find(q).Select(bson.M{
- "projectinfo.attachment": 0,
- "contenthtml": 0,
- "publishdept": 0,
- }).Sort("_id").Iter()
- //查询抽取结果
- n := 0
- var mpool = make(chan bool, t.thread)
- for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
- if n%20000 == 0 {
- util.Debug("current:", n, tmp["_id"])
- }
- if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
- tmp = make(map[string]interface{})
- continue
- }
- if util.IntAll(tmp["extracttype"]) == -1 {
- tmp = make(map[string]interface{})
- continue
- }
- mpool <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-mpool
- }()
- newTmp := GetEsField(tmp, nil, t.stype)
- saveEsElsePool <- newTmp
- }(tmp)
- tmp = make(map[string]interface{})
- }
- util.Debug(mapInfo, "create biddingback index...over", n)
- }
|