package main import ( "encoding/json" qu "jygit.jydev.jianyu360.cn/data_processing/common_utils" mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "log" "net" "os" "sync" "time" ) // 支持流式结构的历史判重... func historyFlowRepeat() { defer qu.Catch() //刚启动根据起始id...查询到至今的...往前推1小时... for { if gtid == "" { log.Println("请传gtid,否则无法运行") os.Exit(0) return } if lteid != "" && !IsFull { //先进行数据迁移 log.Println("开启一次迁移任务", gtid, lteid) moveHistoryData(gtid, lteid) gtid = lteid //替换数据 } //查询表最后一个id... isRepeatStatus := false lteid = FindOneLteid() if lteid > gtid { isRepeatStatus = true } if !isRepeatStatus { log.Println("查询不到最新lteid数据...睡眠...") time.Sleep(30 * time.Second) continue } log.Println("查询找到最新的lteid...", gtid, lteid) if isUpdateSite { initSite() } sess := data_mgo.GetMgoConn() //连接器 defer data_mgo.DestoryMongoConn(sess) between_time := time.Now().Unix() - (86400 * timingPubScope) //周期 //开始判重 q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": StringTOBsonId(gtid), "$lte": StringTOBsonId(lteid), }, } log.Println("历史判重查询条件:", q, "时间:", between_time) it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter() num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0) //计数 pendAllArr := [][]map[string]interface{}{} //待处理数组 dayArr := []map[string]interface{}{} for tmp := make(map[string]interface{}); it.Next(&tmp); num++ { if num%10000 == 0 { log.Println("正序遍历:", num) } //取-符合-发布时间X年内的数据 if qu.IntAll(tmp["dataging"]) == 1 { pubtime := qu.Int64All(tmp["publishtime"]) if pubtime > 0 && pubtime >= between_time && qu.ObjToString(tmp["subtype"]) != "拟建" && qu.ObjToString(tmp["subtype"]) != "产权" && qu.ObjToString(tmp["spidercode"]) != "sdxzbiddingsjzypc" { oknum++ if deterTime == 0 { log.Println("找到第一条符合条件的数据") deterTime = qu.Int64All(tmp["publishtime"]) dayArr = append(dayArr, tmp) } else { if pubtime-deterTime > timingSpanDay*86400 { //新数组重新构建,当前组数据加到全部组数据 pendAllArr = append(pendAllArr, dayArr) dayArr = []map[string]interface{}{} deterTime = qu.Int64All(tmp["publishtime"]) dayArr = append(dayArr, tmp) } else { dayArr = append(dayArr, tmp) } } } else { outnum++ update := map[string]interface{}{ "dataging": 0, "history_updatetime": qu.Int64All(time.Now().Unix()), } //不在两年内的也清标记 Update.updatePool <- []map[string]interface{}{ //重复数据打标签 map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": update, }, } //发送消息告知...需要进行更新操作... sendFlowRepeatInfo(update, BsonTOStringId(tmp["_id"])) } } tmp = make(map[string]interface{}) } if len(dayArr) > 0 { pendAllArr = append(pendAllArr, dayArr) dayArr = []map[string]interface{}{} } log.Println("查询数量:", num, "符合条件:", oknum, "未在两年内:", outnum) if len(pendAllArr) <= 0 { log.Println("没找到dataging==1的数据") } //测试分组数量是否正确 testNum := 0 for k, v := range pendAllArr { log.Println("第", k, "组--", "数量:", len(v)) testNum = testNum + len(v) } log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum) n, repeateN := 0, 0 log.Println("线程数:", threadNum) pool := make(chan bool, threadNum) wg := &sync.WaitGroup{} for k, v := range pendAllArr { //每组结束更新一波数据 pool <- true wg.Add(1) go func(k int, v []map[string]interface{}) { defer func() { <-pool wg.Done() }() log.Println("构建第", k, "组---(数据池)") //当前组的第一个发布时间 first_pt := qu.Int64All(v[len(v)-1]["publishtime"]) curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k)) log.Println("开始遍历判重第", k, "组 共计数量:", len(v)) n = n + len(v) log.Println("统计目前总数量:", n, "重复数量:", repeateN) for _, tmp := range v { info := NewInfo(tmp) b, source, reason := curTM.check(info) if b { //有重复,更新 repeateN++ update := map[string]interface{}{ "repeat": 1, "repeat_reason": reason, "repeat_id": source.id, "dataging": 0, "history_updatetime": qu.Int64All(time.Now().Unix()), } Update.updatePool <- []map[string]interface{}{ //重复数据打标签 map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": update, }, } //发送消息告知...需要进行更新操作... sendFlowRepeatInfo(update, BsonTOStringId(tmp["_id"])) } else { update := map[string]interface{}{ "dataging": 0, //符合条件的都为dataging==0 "history_updatetime": qu.Int64All(time.Now().Unix()), } Update.updatePool <- []map[string]interface{}{ //重复数据打标签 map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": update, }, } //发送消息告知...需要进行更新操作... sendFlowRepeatInfo(update, BsonTOStringId(tmp["_id"])) } } }(k, v) } wg.Wait() log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid) time.Sleep(10 * time.Second) //发送upd支持第二阶段流程... if gtid != lteid { for _, to := range nextNode { next_sid := qu.BsonIdToSId(gtid) next_eid := qu.BsonIdToSId(lteid) key := next_sid + "-" + next_eid + "-" + qu.ObjToString(to["stype"]) by, _ := json.Marshal(map[string]interface{}{ "gtid": next_sid, "lteid": next_eid, "stype": qu.ObjToString(to["stype"]), "key": key, }) addr := &net.UDPAddr{ IP: net.ParseIP(to["addr"].(string)), Port: qu.IntAll(to["port"]), } node := &udpNode{by, addr, time.Now().Unix(), 0} udptaskmap.Store(key, node) udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) } } log.Println("继续下一段的历史判重") } } // 查询最后一个id...倒推...100条? func FindOneLteid() string { task_sess := task_mgo.GetMgoConn() defer task_mgo.DestoryMongoConn(task_sess) q, total := map[string]interface{}{}, 0 it_last := task_sess.DB(task_mgo.DbName).C(task_bidding).Find(&q).Sort("-_id").Iter() for tmp := make(map[string]interface{}); it_last.Next(&tmp); total++ { if total >= 100 { lteid = qu.ObjToString(tmp["lteid"]) break } tmp = make(map[string]interface{}) } return lteid } // 发送消息...流结构... func sendFlowRepeatInfo(update map[string]interface{}, tmpid string) { msgInfo := MsgInfo{} msgInfo.Id = tmpid msgInfo.Data = update bs, err := json.Marshal(msgInfo) if err == nil { jn.PubReqZip("repeat_task", bs, time.Second) } else { log.Println("异常发送流数据...", tmpid) //需要保存记录 tmpid ... } }