123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- package main
- import (
- "fmt"
- qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "log"
- "sync"
- "time"
- )
// timeLayout is the reference layout passed to time.Time.Format when a day
// is rendered for logging (year-month-day only).
// An alternative layout that also carries the time of day would be
// "2006-01-02 15:04:05".
var (
	timeLayout = "2006-01-02"
)
- // 划分时间段落
- func initModelArr() []map[string]interface{} {
- modelArr := make([]map[string]interface{}, 0)
- start := time.Date(2021, 12, 15, 0, 0, 0, 0, time.Local).Unix()
- end := time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local).Unix()
- gte_time := start
- lt_time := start + 86400
- log.Println("开始构建数据池...一周...")
- FullDM = TimedTaskDatamap(dupdays, start, 1)
- log.Println("......")
- log.Println("开启...全量判重...", start, "~", end)
- for {
- modelArr = append(modelArr, map[string]interface{}{
- "publishtime": map[string]interface{}{
- "$gte": gte_time,
- "$lt": lt_time,
- },
- })
- gte_time = lt_time
- lt_time = gte_time + 86400
- if lt_time > end {
- break
- }
- }
- return modelArr
- }
- // 全量数据处理
- func fullDataRepeat() {
- modelArr := initModelArr()
- for _, query := range modelArr {
- pt := *qu.ObjToMap(query["publishtime"])
- time_str := time.Unix(qu.Int64All(pt["$gte"]), 0).Format(timeLayout)
- dealWithfullData(query, time_str)
- }
- }
- // 多线程~处理数据
- func dealWithfullData(query map[string]interface{}, time_str string) {
- log.Println("开始处理~", time_str, "~", query)
- sess := data_mgo.GetMgoConn()
- defer data_mgo.DestoryMongoConn(sess)
- it := sess.DB(data_mgo.DbName).C(extract).Find(&query).Sort("publishtime").Iter()
- total, isok, repeatN := 0, 0, 0
- dataAllDict := make(map[string][]map[string]interface{}, 0)
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
- if qu.IntAll(tmp["repeat"]) == 1 || qu.IntAll(tmp["repeat"]) == -1 ||
- qu.ObjToString(tmp["subtype"]) == "拟建" || qu.ObjToString(tmp["subtype"]) == "产权" ||
- qu.ObjToString(tmp["spidercode"]) == "sdxzbiddingsjzypc" {
- tmp = make(map[string]interface{})
- continue
- }
- isok++
- subtype := qu.ObjToString(tmp["subtype"])
- if subtype == "招标" || subtype == "邀标" || subtype == "询价" ||
- subtype == "竞谈" || subtype == "竞价" {
- subtype = "招标"
- }
- dataArr := dataAllDict[subtype]
- if dataArr == nil {
- dataArr = []map[string]interface{}{}
- }
- dataArr = append(dataArr, tmp)
- dataAllDict[subtype] = dataArr
- tmp = make(map[string]interface{})
- }
- pool := make(chan bool, threadNum)
- wg := &sync.WaitGroup{}
- for _, dataArr := range dataAllDict {
- fmt.Print("...")
- pool <- true
- wg.Add(1)
- go func(dataArr []map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- num := 0
- for _, tmp := range dataArr {
- info := NewInfo(tmp)
- b, source, reason := FullDM.check(info)
- if b {
- num++
- AddGroupPool.pool <- map[string]interface{}{
- "_id": StringTOBsonId(info.id),
- "repeat_id": source.id,
- "reason": reason,
- "update_time": qu.Int64All(time.Now().Unix()),
- }
- }
- }
- numlock.Lock()
- repeatN += num
- numlock.Unlock()
- }(dataArr)
- }
- wg.Wait()
- log.Println("处理结束~", time_str, "总计需判重~", isok, "~重复量", repeatN)
- }
|