package main

import (
	"encoding/json"
	"github.com/robfig/cron"
	"gopkg.in/mgo.v2/bson"
	"log"
	mu "mfw/util"
	"net"
	"os"
	qu "qfw/common/src/qfw/util"
	"qfw/util"
	"strconv"
	"sync"
	"sync/atomic"
	"time"
)

// historyRepeat runs the historical deduplication loop.
func historyRepeat() {
	defer util.Catch()
	for {
		start := time.Now().Unix()
		if gtid == "" {
			log.Println("gtid must be supplied, otherwise the task cannot run")
			os.Exit(0)
			return
		}
		if lteid != "" && !IsFull {
			// migrate the previous segment first
			log.Println("starting one migration task", gtid, lteid)
			moveHistoryData(gtid, lteid)
			gtid = lteid // replace gtid with lteid
		}
		// look up the last marked id in the task collection
		task_sess := task_mgo.GetMgoConn()
		q := map[string]interface{}{}
		it_last := task_sess.DB(task_mgo.DbName).C(task_coll).Find(&q).Sort("-_id").Iter()
		isRepeatStatus := false
		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
			is_repeat_status := util.IntAll(tmp["repeat_status"])
			if is_repeat_status == 1 {
				lteid = util.ObjToString(tmp["lteid"])
				log.Println("last marked task lteid:", lteid)
				isRepeatStatus = true
				tmp = make(map[string]interface{})
				break
			} else {
				tmp = make(map[string]interface{})
			}
		}
		// release the session explicitly; a defer inside this endless loop would never run
		task_mgo.DestoryMongoConn(task_sess)
		if !isRepeatStatus {
			log.Println("no marked lteid found ... sleeping ...")
			time.Sleep(30 * time.Second)
			continue
		}
		log.Println("found a marked lteid ... sleeping ...", gtid, lteid)
		if isUpdateSite {
			initSite()
		}
		time.Sleep(30 * time.Second)
		sess := data_mgo.GetMgoConn()                                 // data connection
		between_time := time.Now().Unix() - (86400 * timingPubScope) // publishtime window (e.g. two years)
		// start deduplication
		q = map[string]interface{}{
			"_id": map[string]interface{}{
				"$gt":  StringTOBsonId(gtid),
				"$lte": StringTOBsonId(lteid),
			},
		}
		log.Println("historical dedup query:", q, "window start:", between_time)
		it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
		num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0) // counters
		pendAllArr := [][]map[string]interface{}{}                              // all pending groups
		dayArr := []map[string]interface{}{}
		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
			if num%10000 == 0 {
				log.Println("forward scan:", num)
			}
			// keep only data whose publishtime falls inside the window
			if util.IntAll(tmp["dataging"]) == 1 {
				pubtime := util.Int64All(tmp["publishtime"])
				if pubtime > 0 && pubtime >= between_time &&
					qu.ObjToString(tmp["subtype"]) != "拟建" &&
					qu.ObjToString(tmp["subtype"]) != "产权" &&
					qu.ObjToString(tmp["spidercode"]) != "sdxzbiddingsjzypc" {
					oknum++
					if deterTime == 0 {
						log.Println("found the first qualifying record")
						deterTime = util.Int64All(tmp["publishtime"])
						dayArr = append(dayArr, tmp)
					} else {
						if pubtime-deterTime > timingSpanDay*86400 {
							// start a new group and append the finished one to the full list
							pendAllArr = append(pendAllArr, dayArr)
							dayArr = []map[string]interface{}{}
							deterTime = util.Int64All(tmp["publishtime"])
							dayArr = append(dayArr, tmp)
						} else {
							dayArr = append(dayArr, tmp)
						}
					}
				} else {
					outnum++
					// outside the window: still clear the dataging flag
					Update.updatePool <- []map[string]interface{}{
						map[string]interface{}{
							"_id": tmp["_id"],
						},
						map[string]interface{}{
							"$set": map[string]interface{}{
								"dataging":           0,
								"history_updatetime": util.Int64All(time.Now().Unix()),
							},
						},
					}
				}
			}
			tmp = make(map[string]interface{})
		}
		// the data session is only used by the iterator above; release it explicitly
		data_mgo.DestoryMongoConn(sess)
		if len(dayArr) > 0 {
			pendAllArr = append(pendAllArr, dayArr)
			dayArr = []map[string]interface{}{}
		}
		log.Println("scanned:", num, "qualified:", oknum, "outside window:", outnum)
		if len(pendAllArr) <= 0 {
			log.Println("no data with dataging==1 found")
		}
		// sanity-check the group sizes
		testNum := 0
		for k, v := range pendAllArr {
			log.Println("group", k, "--", "size:", len(v))
			testNum = testNum + len(v)
		}
		log.Println("local grouping done:", len(pendAllArr), "groups", "check total:", testNum)
		// shared counters; updated atomically because the worker goroutines run concurrently
		var n, repeateN int64
		log.Println("threads:", threadNum)
		pool := make(chan bool, threadNum)
		wg := &sync.WaitGroup{}
		for k, v := range pendAllArr { // flush a batch of updates after each group
			pool <- true
			wg.Add(1)
			go func(k int, v []map[string]interface{}) {
				defer func() {
					<-pool
					wg.Done()
				}()
				log.Println("building group", k, "--- (data pool)")
				// publishtime boundary of the current group (taken from its last element)
				first_pt := util.Int64All(v[len(v)-1]["publishtime"])
				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
				log.Println("start dedup scan of group", k, "size:", len(v))
				atomic.AddInt64(&n, int64(len(v)))
				log.Println("running total:", atomic.LoadInt64(&n), "duplicates:", atomic.LoadInt64(&repeateN))
				for _, tmp := range v {
					info := NewInfo(tmp)
					b, source, reason := curTM.check(info)
					if b { // duplicate found, update
						atomic.AddInt64(&repeateN, 1)
						Update.updatePool <- []map[string]interface{}{ // tag the duplicate record
							map[string]interface{}{
								"_id": tmp["_id"],
							},
							map[string]interface{}{
								"$set": map[string]interface{}{
									"repeat":             1,
									"repeat_reason":      reason,
									"repeat_id":          source.id,
									"dataging":           0,
									"history_updatetime": util.Int64All(time.Now().Unix()),
								},
							},
						}
						// data replacement feature disabled
						//if judgeIsReplaceInfo(source.href, info.href) && !IsFull {
						//	datalock.Lock()
						//	temp_source_id := source.id
						//	temp_info_id := info.id
						//	temp_source := info
						//	temp_source.id = temp_source_id
						//	curTM.replacePoolData(temp_source)
						//	// replace the data in the extract collection
						//	is_log, is_exists, ext_s_data, ext_i_data := confrimHistoryExtractData(temp_source_id, temp_info_id)
						//	is_bid, bid_s_data, bid_i_data := confrimBiddingData(temp_source_id, temp_info_id)
						//
						//	if is_log && is_bid {
						//		data_mgo.Save(extract_log, map[string]interface{}{
						//			"_id":        StringTOBsonId(temp_info_id),
						//			"replace_id": temp_source_id,
						//			"is_history": 1,
						//		})
						//		ext_s_data["repeat"] = 0
						//		ext_s_data["dataging"] = 0
						//		ext_i_data["repeat"] = 1
						//		ext_i_data["repeat_id"] = temp_source_id
						//		ext_i_data["repeat_reason"] = reason
						//		ext_i_data["dataging"] = 0
						//		ext_i_data["history_updatetime"] = qu.Int64All(time.Now().Unix())
						//		if is_exists {
						//			data_mgo.DeleteById(extract, temp_source_id)
						//			data_mgo.Save(extract, ext_s_data)
						//		} else {
						//			data_mgo.DeleteById(extract_back, temp_source_id)
						//			data_mgo.Save(extract_back, ext_s_data)
						//			is_del := data_mgo.DeleteById(extract, temp_source_id)
						//			if is_del > 0 {
						//				data_mgo.Save(extract, ext_s_data)
						//			}
						//		}
						//		data_mgo.DeleteById(extract, temp_info_id)
						//		data_mgo.Save(extract, ext_i_data)
						//
						//		task_mgo.DeleteById(task_bidding, temp_source_id)
						//		task_mgo.Save(task_bidding, bid_s_data)
						//		task_mgo.DeleteById(task_bidding, temp_info_id)
						//		task_mgo.Save(task_bidding, bid_i_data)
						//
						//		// push the id into the downstream channels
						//		msg := "id=" + temp_source_id
						//		_ = nspdata_1.Publish(msg)
						//		_ = nspdata_2.Publish(msg)
						//
						//	} else {
						//		log.Println("replace: no data found in the related collections:", temp_source_id, "~", temp_info_id)
						//	}
						//
						//	datalock.Unlock()
						//} else {
						//	Update.updatePool <- []map[string]interface{}{ // tag the duplicate record
						//		map[string]interface{}{
						//			"_id": tmp["_id"],
						//		},
						//		map[string]interface{}{
						//			"$set": map[string]interface{}{
						//				"repeat":             1,
						//				"repeat_reason":      reason,
						//				"repeat_id":          source.id,
						//				"dataging":           0,
						//				"history_updatetime": util.Int64All(time.Now().Unix()),
						//			},
						//		},
						//	}
						//}
					} else {
						Update.updatePool <- []map[string]interface{}{ // tag as processed
							map[string]interface{}{
								"_id": tmp["_id"],
							},
							map[string]interface{}{
								"$set": map[string]interface{}{
									"dataging":           0, // every qualifying record ends up with dataging==0
									"history_updatetime": util.Int64All(time.Now().Unix()),
								},
							},
						}
					}
				}
			}(k, v)
		}
		wg.Wait()
		log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid)
		time.Sleep(30 * time.Second)
		// task finished: broadcast over UDP to notify the downstream nodes (index update pending + merge)
		if gtid != lteid {
			for _, to := range nextNode {
				next_sid := util.BsonIdToSId(gtid)
				next_eid := util.BsonIdToSId(lteid)
				key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
				by, _ := json.Marshal(map[string]interface{}{
					"gtid":  next_sid,
					"lteid": next_eid,
					"stype": util.ObjToString(to["stype"]),
					"key":   key,
				})
				addr := &net.UDPAddr{
					IP:   net.ParseIP(to["addr"].(string)),
					Port: util.IntAll(to["port"]),
				}
				node := &udpNode{by, addr, time.Now().Unix(), 0}
				udptaskmap.Store(key, node)
				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
			}
		}
		end := time.Now().Unix()
		log.Println(gtid, lteid)
		if end-start < 60*5 {
			log.Println("sleeping .............")
			time.Sleep(5 * time.Minute)
		}
		log.Println("continuing with the next historical dedup segment")
	}
}

// judgeIsCurIds reports whether curid falls inside the (gtid, lteid] id segment,
// comparing the creation timestamps encoded in the first 8 hex characters of each ObjectId.
func judgeIsCurIds(gtid string, lteid string, curid string) bool {
	gt_time, _ := strconv.ParseInt(gtid[:8], 16, 64)
	lte_time, _ := strconv.ParseInt(lteid[:8], 16, 64)
	cur_time, _ := strconv.ParseInt(curid[:8], 16, 64)
	if cur_time >= gt_time && cur_time <= lte_time {
		return true
	}
	return false
}

// moveHistoryData migrates the previous id segment into the backup collection
// and removes aged records from the extract collection.
func moveHistoryData(startid string, endid string) {
	sess := data_mgo.GetMgoConn()
	defer data_mgo.DestoryMongoConn(sess)
	year, month, day := time.Now().Date()
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  StringTOBsonId(startid),
			"$lte": StringTOBsonId(endid),
		},
	}
	log.Println(q)
	it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Iter()
	index := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
		data_mgo.Save(extract_back, tmp)
		tmp = map[string]interface{}{}
		if index%1000 == 0 {
			log.Println("index", index)
		}
	}
	log.Println("save to", extract_back, " ok index", index)
	qv := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour * 2).Unix(),
		},
	}
	delnum := data_mgo.Delete(extract, qv)
	log.Println("remove from ", extract, delnum)
}

// moveTimeoutData deploys the daily migration cron job (currently unused).
func moveTimeoutData() {
	log.Println("deploying the migration cron job")
	c := cron.New()
	c.AddFunc("0 0 0 * * ?", func() {
		moveOnceTimeOut()
	})
	c.Start()
}

// moveOnceTimeOut runs one pass that moves records older than two years out of result_20200714.
func moveOnceTimeOut() {
	log.Println("running one pass of timed-out data migration")
	sess := data_mgo.GetMgoConn()
	defer data_mgo.DestoryMongoConn(sess)
	now := time.Now()
	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
	task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$lt": StringTOBsonId(task_id),
		},
	}
	it := sess.DB(data_mgo.DbName).C("result_20200714").Find(&q).Iter()
	index := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
		if index%10000 == 0 {
			log.Println("index", index)
		}
		del_id := BsonTOStringId(tmp["_id"])
		data_mgo.Save("result_20200713", tmp)
		data_mgo.DeleteById("result_20200714", del_id)
		tmp = map[string]interface{}{}
	}
	log.Println("save and delete", " ok index", index)
}
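
// Illustrative sketch, not part of the original flow: judgeIsCurIds and moveOnceTimeOut
// above rely on the first 8 hex characters of a MongoDB ObjectId encoding its creation
// time in Unix seconds. The function name and sample id below are hypothetical and only
// demonstrate that assumption using the packages already imported by this file.
func exampleObjectIdTimestamp() {
	sampleId := "5f0d2a40aabbccddeeff0011" // hypothetical 24-character ObjectId hex string
	secs, err := strconv.ParseInt(sampleId[:8], 16, 64)
	if err != nil {
		log.Println("not a valid ObjectId hex prefix:", err)
		return
	}
	log.Println("ObjectId creation time:", time.Unix(secs, 0))
}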