123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- package main
- import (
- "log"
- "qfw/common/src/qfw/util"
- qu "qfw/util"
- "sync"
- "time"
- )
- //开始判重程序
- func fullRepeat(sid,eid string) {
- defer qu.Catch()
- //区间id-是否分段
- if IsFull && sec_gtid!="" && sec_lteid!=""{
- sid = sec_gtid
- eid = sec_lteid
- }
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(sid),
- "$lte": StringTOBsonId(eid),
- },
- }
- log.Println("开始全量数据判重~查询条件:",mgo.DbName, extract, q)
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
- n, isok ,repeatN:= 0,0,0
- dataAllDict := make(map[string][]map[string]interface{},0)
- for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
- if n%1000 == 0 {
- log.Println("index: ", n, isok)
- }
- if util.IntAll(tmp["repeat"]) == 1 {
- repeatN++
- tmp = make(map[string]interface{})
- continue
- }
- if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
- tmp = make(map[string]interface{})
- continue
- }
- //优化空间-相同天-划分一组(在分类别)
- isok++
- //数据分组-按照类别分组
- subtype := qu.ObjToString(tmp["subtype"])
- if subtype=="招标"||subtype=="邀标"||subtype=="询价"||
- subtype=="竞谈"||subtype=="竞价" {
- subtype = "招标"
- }
- dataArr := dataAllDict[subtype]
- if dataArr==nil {
- dataArr = []map[string]interface{}{}
- }
- dataArr = append(dataArr,tmp)
- dataAllDict[subtype] = dataArr
- tmp = make(map[string]interface{})
- }
- log.Println("类别组划分完毕:",len(dataAllDict),"组","~","需要判重:",isok,"条")
- pool := make(chan bool, threadNum)
- wg := &sync.WaitGroup{}
- for _,dataArr := range dataAllDict {
- pool <- true
- wg.Add(1)
- go func(dataArr []map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- num := 0
- for _,tmp := range dataArr{
- info := NewInfo(tmp)
- b, source, _ := DM.check(info)
- if b {
- num++
- var updateID = map[string]interface{}{} //记录更新判重的
- updateID["_id"] = StringTOBsonId(info.id)
- repeat_ids:=source.repeat_ids
- repeat_ids = append(repeat_ids,info.id)
- source.repeat_ids = repeat_ids
- DM.replacePoolData(source)//替换数据池-更新
- //Update.updatePool <- []map[string]interface{}{//重复数据打标签
- // updateID,
- // map[string]interface{}{
- // "$set": map[string]interface{}{
- // "repeat": 1,
- // "repeat_reason": reason,
- // "repeat_id": source.id,
- // "dataging": 0,
- // "updatetime_repeat" :util.Int64All(time.Now().Unix()),
- // },
- // },
- //}
- }
- }
- updatelock.Lock()
- repeatN+=num
- updatelock.Unlock()
- }(dataArr)
- }
- wg.Wait()
- log.Println("this full data is over.", n, "repeateN:", repeatN)
- time.Sleep(15 * time.Second)
- }
|