12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- package main
- import (
- "go.mongodb.org/mongo-driver/bson"
- util "utils"
- "utils/mongodb"
- )
- func (t *TaskInfo) biddingDataTask(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- } else {
- idMap := q["_id"].(map[string]interface{})
- tmpQ := map[string]interface{}{}
- for c, id := range idMap {
- if idStr, ok := id.(string); ok && id != "" {
- tmpQ[c] = mongodb.StringTOBsonId(idStr)
- }
- }
- q["_id"] = tmpQ
- }
- //bidding库
- biddingConn := biddingMgo.GetMgoConn()
- defer biddingMgo.DestoryMongoConn(biddingConn)
- //连接信息
- c, _ := mapInfo["coll"].(string)
- if c == "" {
- c, _ = bidding["collect"].(string)
- } else {
- currentColl = c
- }
- count, _ := biddingConn.DB(biddingMgo.DbName).C(c).Find(&q).Count()
- util.Debug("查询语句:", q, "同步总数:", count, "elastic库:")
- //查询招标数据
- query := biddingConn.DB(biddingMgo.DbName).C(c).Find(q).Select(bson.M{
- "projectname": 1,
- "title": 1,
- "site": 1,
- "href": 1,
- "publishtime": 1,
- "spidercode": 1,
- "extracttype": 1,
- }).Sort("_id").Iter()
- n := 0
- //更新数组
- var mpool = make(chan bool, t.thread)
- for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
- if n%20000 == 0 {
- util.Debug("current:", n, tmp["_id"])
- }
- mpool <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-mpool
- }()
- newTmp := GetEsField(tmp, nil, t.stype)
- newTmp["extracttype"] = util.IntAll(tmp["extracttype"])
- saveEsAllPool <- newTmp
- }(tmp)
- tmp = make(map[string]interface{})
- }
- util.Debug(mapInfo, "create biddingdata index...over", n)
- }
|