package main

import (
	"fmt"
	"log"
	"sync"
	"time"

	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
)

// timeLayout formats the day label printed in the per-window log lines.
var timeLayout = "2006-01-02"

//var timeLayout = "2006-01-02 15:04:05"

// initModelArr splits the full-deduplication time range into one-day
// publishtime queries.
//
// As a side effect it (re)initialises the global FullDM data pool via
// TimedTaskDatamap(dupdays, start, 1), which dealWithfullData checks against.
func initModelArr() []map[string]interface{} {
	start := time.Date(2021, 12, 15, 0, 0, 0, 0, time.Local).Unix()
	end := time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local).Unix()

	log.Println("开始构建数据池...一周...")
	FullDM = TimedTaskDatamap(dupdays, start, 1)
	log.Println("......")
	log.Println("开启...全量判重...", start, "~", end)

	return buildDailyPublishtimeQueries(start, end)
}

// buildDailyPublishtimeQueries splits [start, end) (Unix seconds) into
// consecutive windows of at most one day. Each window is expressed as a
// {"publishtime": {"$gte": gte, "$lt": lt}} filter.
//
// The last window is clipped to end, so a trailing partial day is still
// covered. An empty or inverted range yields no windows.
// NOTE(review): a "day" is a fixed 86400 s; this assumes time.Local has no
// DST transition inside the range.
func buildDailyPublishtimeQueries(start, end int64) []map[string]interface{} {
	const day int64 = 86400
	queries := make([]map[string]interface{}, 0)
	for gte := start; gte < end; gte += day {
		lt := gte + day
		if lt > end {
			lt = end
		}
		queries = append(queries, map[string]interface{}{
			"publishtime": map[string]interface{}{
				"$gte": gte,
				"$lt":  lt,
			},
		})
	}
	return queries
}

// fullDataRepeat runs full deduplication over the whole configured range,
// one publishtime window at a time, in chronological order.
func fullDataRepeat() {
	for _, query := range initModelArr() {
		window := *qu.ObjToMap(query["publishtime"])
		dayLabel := time.Unix(qu.Int64All(window["$gte"]), 0).Format(timeLayout)
		dealWithfullData(query, dayLabel)
	}
}

// dealWithfullData deduplicates the documents matched by query.
//
// It streams matching documents from the extract collection, sorted by
// publishtime. It skips documents that:
//   - already have repeat == 1 or repeat == -1;
//   - have subtype 拟建 or 产权;
//   - come from spidercode sdxzbiddingsjzypc.
//
// The remaining documents are grouped by subtype, with 招标/邀标/询价/竞谈/竞价
// merged into 招标. Each group is checked against FullDM in its own goroutine,
// with at most threadNum goroutines in flight at once. Every duplicate found is
// sent to AddGroupPool.pool as an update record.
//
// time_str is used only for logging.
// NOTE(review): assumes FullDM.check is safe for concurrent use across groups.
func dealWithfullData(query map[string]interface{}, time_str string) {
	log.Println("开始处理~", time_str, "~", query)
	sess := data_mgo.GetMgoConn()
	defer data_mgo.DestoryMongoConn(sess)

	it := sess.DB(data_mgo.DbName).C(extract).Find(&query).Sort("publishtime").Iter()
	total, isok, repeatN := 0, 0, 0
	groups := make(map[string][]map[string]interface{})
	// A fresh map is needed for every Next: the previous one may already be
	// stored in groups, and decoding into a reused map would mutate it.
	for doc := make(map[string]interface{}); it.Next(&doc); doc = make(map[string]interface{}) {
		total++
		repeat := qu.IntAll(doc["repeat"])
		subtype := qu.ObjToString(doc["subtype"])
		if repeat == 1 || repeat == -1 || subtype == "拟建" || subtype == "产权" ||
			qu.ObjToString(doc["spidercode"]) == "sdxzbiddingsjzypc" {
			continue
		}
		isok++
		switch subtype {
		case "招标", "邀标", "询价", "竞谈", "竞价":
			subtype = "招标"
		}
		groups[subtype] = append(groups[subtype], doc)
	}
	// Next only reports false. Without Close, a cursor or network error would
	// silently truncate this window's data and the run would still look
	// successful. Continue best-effort with what was read, but make it visible.
	if err := it.Close(); err != nil {
		log.Println("遍历数据出错~", time_str, "已读取~", total, "~", err)
	}

	pool := make(chan bool, threadNum)
	wg := &sync.WaitGroup{}
	for _, group := range groups {
		fmt.Print("...")
		pool <- true
		wg.Add(1)
		go func(group []map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			num := 0
			for _, doc := range group {
				info := NewInfo(doc)
				b, source, reason := FullDM.check(info)
				if !b {
					continue
				}
				num++
				AddGroupPool.pool <- map[string]interface{}{
					"_id":         StringTOBsonId(info.id),
					"repeat_id":   source.id,
					"reason":      reason,
					"update_time": time.Now().Unix(),
				}
			}
			numlock.Lock()
			repeatN += num
			numlock.Unlock()
		}(group)
	}
	wg.Wait()
	log.Println("处理结束~", time_str, "总计需判重~", isok, "~重复量", repeatN)
}