package main

import (
	"log"
	"sync"
	"time"

	"qfw/common/src/qfw/util"
	qu "qfw/util"
)

// fullRepeat runs one full de-duplication pass over the documents of the
// `extract` collection whose _id lies in the half-open range (sid, eid].
//
// When IsFull is set and both sec_gtid and sec_lteid are non-empty, those
// package-level bounds replace sid and eid.
//
// The pass has two phases:
//
//  1. Scan: documents are read ordered by "publishtime". Documents already
//     flagged with repeat == 1 are skipped and counted as repeats; documents
//     with dataging == 1 are skipped unless IsFull is set. The remaining
//     documents are grouped by their "subtype" field, with the five
//     tender-like subtypes merged into the single group "招标".
//  2. Check: every group is processed in its own goroutine, with at most
//     threadNum goroutines running at once. Each document is passed to
//     DM.check; when it is reported as a duplicate, its id is appended to the
//     matched record's repeat_ids and that record is written back with
//     DM.replacePoolData.
//
// Writing the "repeat" tag back to the database is currently disabled; see the
// commented-out snippet inside the worker.
func fullRepeat(sid, eid string) {
	defer qu.Catch()

	// Optional segment override of the id range.
	if IsFull && sec_gtid != "" && sec_lteid != "" {
		sid = sec_gtid
		eid = sec_lteid
	}
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  StringTOBsonId(sid),
			"$lte": StringTOBsonId(eid),
		},
	}
	log.Println("开始全量数据判重~查询条件:", data_mgo.DbName, extract, q)

	sess := data_mgo.GetMgoConn()
	defer data_mgo.DestoryMongoConn(sess)
	it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()

	n, isok, repeatN := 0, 0, 0
	dataAllDict := make(map[string][]map[string]interface{})
	for ; ; n++ {
		// A fresh map per document: accepted documents are retained in
		// dataAllDict, so the decode target must never be reused.
		doc := make(map[string]interface{})
		if !it.Next(&doc) {
			break
		}
		if n%1000 == 0 {
			log.Println("index: ", n, isok)
		}
		// Already flagged as a duplicate: count it and move on.
		if util.IntAll(doc["repeat"]) == 1 {
			repeatN++
			continue
		}
		// dataging documents are only processed in full mode.
		if util.IntAll(doc["dataging"]) == 1 && !IsFull {
			continue
		}
		// Possible optimisation: group by publish day first, then by subtype.
		isok++

		// Group by subtype; the tender-like subtypes share one group.
		subtype := qu.ObjToString(doc["subtype"])
		switch subtype {
		case "招标", "邀标", "询价", "竞谈", "竞价":
			subtype = "招标"
		}
		dataAllDict[subtype] = append(dataAllDict[subtype], doc)
	}
	// Release the cursor and surface any error that ended the scan early, so a
	// partial run is visible in the log instead of passing silently.
	if err := it.Close(); err != nil {
		log.Println("full repeat: scanning", extract, "stopped with error:", err)
	}
	log.Println("类别组划分完毕:", len(dataAllDict), "组", "~", "需要判重:", isok, "条")

	// pool is a counting semaphore that bounds the number of running workers.
	pool := make(chan bool, threadNum)
	wg := &sync.WaitGroup{}
	for _, dataArr := range dataAllDict {
		pool <- true
		wg.Add(1)
		go func(group []map[string]interface{}) {
			found := 0
			// The function-level qu.Catch does not cover this goroutine, so
			// guard it the same way (qu.Catch is assumed to recover and log
			// panics — confirm against qfw/util).
			defer qu.Catch()
			defer func() {
				// Runs on panic as well, so a partially processed group still
				// contributes its count and always frees its semaphore slot.
				numberlock.Lock()
				repeatN += found
				numberlock.Unlock()
				<-pool
				wg.Done()
			}()
			for _, doc := range group {
				info := NewInfo(doc)
				// The third result is only needed by the disabled tagging
				// update below (where it is named `reason`).
				isDup, source, _ := DM.check(info)
				if !isDup {
					continue
				}
				found++
				source.repeat_ids = append(source.repeat_ids, info.id)
				DM.replacePoolData(source) // refresh the matched record in the data pool

				// Disabled: tag the duplicate document in the database.
				//updateID := map[string]interface{}{"_id": StringTOBsonId(info.id)}
				//Update.updatePool <- []map[string]interface{}{
				//	updateID,
				//	map[string]interface{}{
				//		"$set": map[string]interface{}{
				//			"repeat":            1,
				//			"repeat_reason":     reason,
				//			"repeat_id":         source.id,
				//			"dataging":          0,
				//			"updatetime_repeat": util.Int64All(time.Now().Unix()),
				//		},
				//	},
				//}
			}
		}(dataArr)
	}
	wg.Wait()

	log.Println("this full data is over.", n, "repeateN:", repeatN)
	// NOTE(review): fixed pause kept from the original; presumably it lets
	// asynchronous work elsewhere drain — confirm before changing.
	time.Sleep(15 * time.Second)
}