mgo = &MongodbSim{ MongodbAddr: "172.17.4.187:27083", DbName: "qfw", Size: 10, } mgo.InitPool() return func moveTimeoutData() { log.Println("部署迁移定时任务") c := cron.New() c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() }) c.Start() } func moveOnceTimeOut() { log.Println("执行一次迁移超时数据") sess := mgo.GetMgoConn() defer mgo.DestoryMongoConn(sess) now:=time.Now() move_time := time.Date(now.Year()-2, now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local).Unix() q := map[string]interface{}{ "comeintime": map[string]interface{}{ "$lt": move_time, }, } log.Println(q) it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter() index := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); index++ { if index%10000 == 0 { log.Println("index", index) } del_id:=BsonTOStringId(tmp["_id"]) mgo.Save("result_20200713", tmp) mgo.DeleteById("result_20200714",del_id) tmp = map[string]interface{}{} } log.Println("save and delete", " ok index", index) } func repairHistory() { defer util.Catch() log.Println("执行修复程序") sess := mgo.GetMgoConn() defer mgo.DestoryMongoConn(sess) q:=map[string]interface{}{} between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期 //开始判重 q = map[string]interface{}{ "_id": map[string]interface{}{ "$gt": StringTOBsonId("5f15bf800000000000000000"), "$lte": StringTOBsonId("5f2375b2a120e23754be1039"), }, } log.Println("历史判重查询条件:",q,"时间:", between_time) it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter() num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数 updateExtract := [][]map[string]interface{}{}//批量更新mongo数组 pendAllArr:=[][]map[string]interface{}{}//待处理数组 dayArr := []map[string]interface{}{} for tmp := make(map[string]interface{}); it.Next(&tmp); num++ { if num%10000 == 0 { log.Println("正序遍历:", num) } source := util.ObjToMap(tmp["jsondata"]) if util.IntAll((*source)["sourcewebsite"]) == 1 { updateExtract = append(updateExtract, []map[string]interface{}{ map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": map[string]interface{}{ "repeat": 1, "dataging": 0, "repeat_reason": "sourcewebsite为1 重复", }, }, }) if len(updateExtract) > 50 { mgo.UpSertBulk(extract, updateExtract...) updateExtract = [][]map[string]interface{}{} } tmp = make(map[string]interface{}) continue } //取-符合-发布时间X年内的数据 if util.IntAll(tmp["dataging"]) == 1 { pubtime := util.Int64All(tmp["publishtime"]) if pubtime > 0 && pubtime >= between_time { oknum++ if deterTime==0 { log.Println("找到第一条符合条件的数据") deterTime = util.Int64All(tmp["publishtime"]) dayArr = append(dayArr,tmp) }else { if pubtime-deterTime >timingSpanDay*86400 { //新数组重新构建,当前组数据加到全部组数据 pendAllArr = append(pendAllArr,dayArr) dayArr = []map[string]interface{}{} deterTime = util.Int64All(tmp["publishtime"]) dayArr = append(dayArr,tmp) }else { dayArr = append(dayArr,tmp) } } }else { //不在两年内的也清标记 updateExtract = append(updateExtract, []map[string]interface{}{ map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": map[string]interface{}{ "dataging": 0, }, }, }) if len(updateExtract) > 50 { mgo.UpSertBulk(extract, updateExtract...) updateExtract = [][]map[string]interface{}{} } } } tmp = make(map[string]interface{}) } //批量更新标记 if len(updateExtract) > 0 { mgo.UpSertBulk(extract, updateExtract...) updateExtract = [][]map[string]interface{}{} } if len(dayArr)>0 { pendAllArr = append(pendAllArr,dayArr) dayArr = []map[string]interface{}{} } log.Println("查询数量:",num,"符合条件:",oknum) if len(pendAllArr) <= 0 { log.Println("没找到dataging==1的数据") } //测试分组数量是否正确 testNum:=0 for k,v:=range pendAllArr { log.Println("第",k,"组--","数量:",len(v)) testNum = testNum+len(v) } log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum) n, repeateN := 0, 0 pool := make(chan bool, 2) wg := &sync.WaitGroup{} for k,v:=range pendAllArr { //每组结束更新一波数据 pool <- true wg.Add(1) go func(k int, v []map[string]interface{}) { defer func() { <-pool wg.Done() }() //每组临时数组 - 互不干扰 groupUpdateExtract := [][]map[string]interface{}{} //构建当前组的数据池 log.Println("构建第", k, "组---(数据池)") //当前组的第一个发布时间 first_pt := util.Int64All(v[len(v)-1]["publishtime"]) curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k)) log.Println("开始遍历判重第", k, "组 共计数量:", len(v)) n = n + len(v) log.Println("统计目前总数量:", n, "重复数量:", repeateN) for _, tmp := range v { info := NewInfo(tmp) if !LowHeavy { //是否进行低质量数据判重 if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) { log.Println("无效数据") groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{ map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": map[string]interface{}{ "repeat": -1, //无效数据标签 "dataging": 0, }, }, }) if len(groupUpdateExtract) > 50 { mgo.UpSertBulk(extract, groupUpdateExtract...) groupUpdateExtract = [][]map[string]interface{}{} } return } } b, source, reason := curTM.check(info) if b { //有重复,生成更新语句,更新抽取和更新招标 repeateN++ //重复数据打标签 groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{ map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": map[string]interface{}{ "repeat": 1, "repeat_reason": reason, "repeat_id": source.id, "dataging": 0, }, }, }) } else { groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{ map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": map[string]interface{}{ "dataging": 0, //符合条件的都为dataging==0 }, }, }) } if len(groupUpdateExtract) > 50 { mgo.UpSertBulk(extract, groupUpdateExtract...) groupUpdateExtract = [][]map[string]interface{}{} } } //每组数据结束-更新数据 if len(groupUpdateExtract) > 0 { mgo.UpSertBulk(extract, groupUpdateExtract...) } }(k, v) } wg.Wait() time.Sleep(30 * time.Second) log.Println("this repair over.", n, "repeateN:", repeateN,gtid,lteid) log.Println("修复结束") }