package util

import (
	mu "mfw/util"
	"mongodb"
	qu "qfw/util"
	"sync"
)

// Package-level configuration and shared handles.
var (
	Config  map[string]interface{}
	Mgo     *mongodb.MongodbSim
	MgoB    *mongodb.MongodbSim
	Coll    string
	StartID string // ID in the bidding_processing_ids table

	// UDP
	Udpclient mu.UdpClient // UDP client object
	UdpPort   string
	NextAddr  string
	NextPort  int
	NextStype string

	// Mail
	UdptaskMap = &sync.Map{}
	Tomail     string
	Api        string

	// OCR
	OcrServerAddr string // OCR service governance center
)

// GetIdInterval queries collection Coll through MgoB for one record whose _id
// is greater than id and whose "dataprocess" field equals 8, and returns that
// record's "gtid" and "lteid" values as strings.
//
// On success it also stores the record's _id (as a string) in the package
// variable StartID. When no record is found, or the lookup yields a nil
// result, it returns two empty strings and leaves StartID untouched.
func GetIdInterval(id string) (gtid, lteid string) {
	defer qu.Catch()
	qu.Debug("获取id段...")

	query := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt": mongodb.StringTOBsonId(id),
		},
		"dataprocess": 8,
	}

	// Look up a single ID-interval record.
	// NOTE(review): the trailing arguments are presumably sort-by-_id and
	// limit 1; the second return value is presumably a success flag —
	// confirm against the mongodb package.
	list, _ := MgoB.Find(Coll, query, map[string]interface{}{"_id": 1}, nil, false, 0, 1)

	// Guard against a nil result instead of relying on qu.Catch to recover
	// from a nil-pointer dereference.
	if list == nil || len(*list) == 0 {
		return "", ""
	}

	rec := (*list)[0]
	gtid = qu.ObjToString(rec["gtid"])   // start of the interval
	lteid = qu.ObjToString(rec["lteid"]) // end of the interval
	StartID = mongodb.BsonIdToSId(rec["_id"])
	qu.Debug("当前轮ID区间:", gtid, lteid, "表ID:", StartID)
	return gtid, lteid
}

// UpdateBiddingData iterates over the documents in "bidding_downloadfile_log"
// whose _id lies in the range (gtid, lteid] and, for each one, queues an
// update of the form [{"_id": <id>}, {"$set": <document>}] that is sent to the
// "bidding" collection via MgoB.UpdateBulk.
//
// Updates are accumulated in a batch that is flushed whenever it grows beyond
// 100 entries, and once more after iteration finishes if anything remains.
func UpdateBiddingData(gtid, lteid string) {
	defer qu.Catch()

	// Query the source data.
	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	ch := make(chan bool, 10) // caps the number of in-flight goroutines at 10
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{} // guards arr

	query := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  mongodb.StringTOBsonId(gtid),
			"$lte": mongodb.StringTOBsonId(lteid),
		},
	}
	// NOTE(review): presumably a projection that excludes file_add_log from
	// the returned documents — confirm against the mongodb package.
	fields := map[string]interface{}{
		"file_add_log": 0,
	}

	it := sess.DB(MgoB.DbName).C("bidding_downloadfile_log").Find(&query).Select(&fields).Iter()
	n := 0
	arr := [][]map[string]interface{}{}
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			update := []map[string]interface{}{
				{"_id": tmp["_id"]},
				{"$set": tmp},
			}
			lock.Lock()
			arr = append(arr, update)
			if len(arr) > 100 {
				MgoB.UpdateBulk("bidding", arr...)
				// Start a fresh batch rather than truncating, so the slice
				// handed to UpdateBulk is never written to again.
				arr = [][]map[string]interface{}{}
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			qu.Debug("current:", n)
		}
		// Give the next iteration its own map; the previous one is now owned
		// by the goroutine started above.
		tmp = map[string]interface{}{}
	}
	wg.Wait()

	// Flush whatever is left over from the last partial batch.
	if len(arr) > 0 {
		MgoB.UpdateBulk("bidding", arr...)
	}
	qu.Debug("更新数据完毕:", gtid, lteid)
}