123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- package util
- import (
- mu "mfw/util"
- "mongodb"
- qu "qfw/util"
- "sync"
- )
- var (
- Config map[string]interface{}
- Mgo *mongodb.MongodbSim
- MgoB *mongodb.MongodbSim // handle used by GetIdInterval and UpdateBiddingData
- Coll string // name of the collection GetIdInterval queries for ID intervals
- StartID string // ID in the bidding_processing_ids table; GetIdInterval stores the _id of the interval document it found here
- // UDP
- Udpclient mu.UdpClient // UDP client object
- UdpPort string
- NextAddr string
- NextPort int
- NextStype string
- // mail
- UdptaskMap = &sync.Map{}
- Tomail string
- Api string
- // OCR
- OcrServerAddr string // OCR service governance center
- )
- func GetIdInterval(id string) (gtid, lteid string) {
- defer qu.Catch()
- qu.Debug("获取id段...")
- query := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": mongodb.StringTOBsonId(id),
- },
- "dataprocess": 8,
- }
- list, _ := MgoB.Find(Coll, query, map[string]interface{}{"_id": 1}, nil, false, 0, 1) //查找一个id区间
- if len(*list) > 0 {
- gtid = qu.ObjToString((*list)[0]["gtid"]) //起始id
- lteid = qu.ObjToString((*list)[0]["lteid"]) //结束id
- StartID = mongodb.BsonIdToSId((*list)[0]["_id"])
- qu.Debug("当前轮ID区间:", gtid, lteid, "表ID:", StartID)
- return
- }
- return "", ""
- }
- func UpdateBiddingData(gtid, lteid string) {
- defer qu.Catch()
- //查询数据
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- ch := make(chan bool, 10)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- query := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": mongodb.StringTOBsonId(gtid),
- "$lte": mongodb.StringTOBsonId(lteid),
- },
- }
- fields := map[string]interface{}{
- "file_add_log": 0,
- }
- it := sess.DB(MgoB.DbName).C("bidding_downloadfile_log").Find(&query).Select(&fields).Iter()
- n := 0
- arr := [][]map[string]interface{}{}
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- update := []map[string]interface{}{
- {"_id": tmp["_id"]},
- {"$set": tmp},
- }
- lock.Lock()
- arr = append(arr, update)
- if len(arr) > 100 {
- MgoB.UpdateBulk("bidding", arr...)
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- }(tmp)
- if n%100 == 0 {
- qu.Debug("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- if len(arr) > 0 {
- MgoB.UpdateBulk("bidding", arr...)
- arr = [][]map[string]interface{}{}
- }
- qu.Debug("更新数据完毕:", gtid, lteid)
- }
|