apple 795b26a030 备份 5 年之前
..
README.md 795b26a030 备份 5 年之前
config.json 795b26a030 备份 5 年之前
dataMethod.go d40d0f3d74 判重 合并 5 年之前
dataMethodHeavy.go d40d0f3d74 判重 合并 5 年之前
dataMethodMerge.go da2c483229 数据合并-相关功能 5 年之前
datamap.go 795b26a030 备份 5 年之前
main.go 795b26a030 备份 5 年之前
mgo.go f60e2722b1 优化 5 年之前
udptaskmap.go 083ce1e3e3 项目迁移 6 年之前

README.md

// moveTimeoutData registers a cron job that runs moveOnceTimeOut on the spec
// "0 0 0 * * ?" and then starts the scheduler.
//
// NOTE(review): the 6-field spec (leading seconds field, "?" day-of-week)
// assumes a seconds-aware parser such as robfig/cron v1; under that reading the
// job fires daily at 00:00:00 local time — confirm against the vendored cron.
func moveTimeoutData() {
	log.Println("部署迁移定时任务")
	c := cron.New()
	// AddFunc reports an unparsable spec. Previously this error was discarded,
	// so a bad spec meant the migration silently never ran.
	if err := c.AddFunc("0 0 0 * * ?", moveOnceTimeOut); err != nil {
		log.Println("部署迁移定时任务失败:", err)
		return
	}
	c.Start()
}

// moveOnceTimeOut runs one migration pass.
//
// Every document in result_20200714 whose comeintime is earlier than
// "now minus two years" is copied into result_20200713 and then deleted from
// result_20200714. The cutoff is truncated to the current hour, local time.
//
// Iteration errors are now detected via Iter.Close. Previously a cursor failure
// ended the loop exactly like normal exhaustion and was still logged as "ok".
func moveOnceTimeOut() {
	log.Println("执行一次迁移超时数据")

	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)

	now := time.Now()
	moveTime := time.Date(now.Year()-2, now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local).Unix()
	q := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$lt": moveTime,
		},
	}
	log.Println(q)

	it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
	index := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
		if index%10000 == 0 {
			log.Println("index", index)
		}
		delID := BsonTOStringId(tmp["_id"])
		// NOTE(review): mgo.Save's result is not checked, so the delete below runs
		// even if the save failed — confirm how Save reports failure.
		mgo.Save("result_20200713", tmp)
		mgo.DeleteById("result_20200714", delID)
		// Decode the next document into a fresh map so no stale keys carry over.
		tmp = map[string]interface{}{}
	}
	// Next returns false both when the cursor is exhausted and on error;
	// Close distinguishes the two.
	if err := it.Close(); err != nil {
		log.Println("迁移超时数据遍历出错:", err, "index", index)
		return
	}
	log.Println("save and delete", " ok index", index)
}

// repairHistory re-runs duplicate detection over a fixed, hard-coded _id window
// of the extract collection.
//
// Phase 1 scans the window in ascending publishtime order:
//   - documents whose jsondata.sourcewebsite == 1 are marked repeat=1, dataging=0;
//   - documents with dataging == 1 whose publishtime is older than
//     timingPubScope days get dataging cleared;
//   - the remaining dataging == 1 documents are bucketed into groups. A new
//     group starts once a document's publishtime exceeds the group's first
//     publishtime by more than timingSpanDay days.
//
// Phase 2 checks each group against its own datamap, at most two groups
// concurrently. It writes repeat/dataging updates back to extract in bulk.
func repairHistory() {
	defer util.Catch()
	log.Println("执行修复程序")
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)

	// Lower bound on publishtime; the original comment describes this as a
	// two-year window (depends on the value of timingPubScope).
	betweenTime := time.Now().Unix() - (86400 * timingPubScope)
	// Start duplicate detection over a hard-coded _id window.
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  StringTOBsonId("5f15bf800000000000000000"),
			"$lte": StringTOBsonId("5f2375b2a120e23754be1039"),
		},
	}
	log.Println("历史判重查询条件:", q, "时间:", betweenTime)

	// flushIfFull writes batch to extract once it holds more than 50 updates and
	// returns the batch to keep appending to. It touches no shared state, so
	// worker goroutines may call it concurrently.
	flushIfFull := func(batch [][]map[string]interface{}) [][]map[string]interface{} {
		if len(batch) > 50 {
			mgo.UpSertBulk(extract, batch...)
			return [][]map[string]interface{}{}
		}
		return batch
	}

	// ---- Phase 1: scan and group ----
	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
	num, oknum := int64(0), int64(0)
	groupStart := int64(0)                        // publishtime of the current group's first document (0 = none yet)
	updateExtract := [][]map[string]interface{}{} // pending bulk updates for extract
	pendAllArr := [][]map[string]interface{}{}    // all groups awaiting the duplicate check
	dayArr := []map[string]interface{}{}          // group currently being filled
	for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
		if num%10000 == 0 {
			log.Println("正序遍历:", num)
		}
		source := util.ObjToMap(tmp["jsondata"])
		if util.IntAll((*source)["sourcewebsite"]) == 1 {
			updateExtract = append(updateExtract, []map[string]interface{}{
				{"_id": tmp["_id"]},
				{"$set": map[string]interface{}{
					"repeat":        1,
					"dataging":      0,
					"repeat_reason": "sourcewebsite为1 重复",
				}},
			})
			updateExtract = flushIfFull(updateExtract)
			tmp = make(map[string]interface{})
			continue
		}

		// Only dataging == 1 documents within the publish-time window are grouped.
		if util.IntAll(tmp["dataging"]) == 1 {
			pubtime := util.Int64All(tmp["publishtime"])
			if pubtime > 0 && pubtime >= betweenTime {
				oknum++
				if groupStart == 0 {
					log.Println("找到第一条符合条件的数据")
					groupStart = pubtime
				} else if pubtime-groupStart > timingSpanDay*86400 {
					// Span exceeded: close the current group and start a new one.
					pendAllArr = append(pendAllArr, dayArr)
					dayArr = []map[string]interface{}{}
					groupStart = pubtime
				}
				dayArr = append(dayArr, tmp)
			} else {
				// Out-of-window documents also get their flag cleared.
				updateExtract = append(updateExtract, []map[string]interface{}{
					{"_id": tmp["_id"]},
					{"$set": map[string]interface{}{
						"dataging": 0,
					}},
				})
				updateExtract = flushIfFull(updateExtract)
			}
		}
		tmp = make(map[string]interface{})
	}
	// Next returns false on both exhaustion and error; surface the latter.
	if err := it.Close(); err != nil {
		log.Println("历史判重遍历异常:", err)
	}

	// Flush remaining flag updates.
	if len(updateExtract) > 0 {
		mgo.UpSertBulk(extract, updateExtract...)
	}
	if len(dayArr) > 0 {
		pendAllArr = append(pendAllArr, dayArr)
	}

	log.Println("查询数量:", num, "符合条件:", oknum)
	if len(pendAllArr) == 0 {
		log.Println("没找到dataging==1的数据")
	}

	// Sanity check: group sizes should add up to the matched count.
	testNum := 0
	for k, v := range pendAllArr {
		log.Println("第", k, "组--", "数量:", len(v))
		testNum += len(v)
	}
	log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum)

	// ---- Phase 2: per-group duplicate check ----
	n, repeateN := 0, 0
	var statMu sync.Mutex // guards n and repeateN, which worker goroutines update
	pool := make(chan bool, 2) // limits concurrency to two groups
	wg := &sync.WaitGroup{}
	for k, v := range pendAllArr {
		pool <- true
		wg.Add(1)
		go func(k int, v []map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			// Per-group batch; groups do not share update state.
			groupUpdateExtract := [][]map[string]interface{}{}
			log.Println("构建第", k, "组---(数据池)")
			// v is in ascending publishtime order, so its last element is the newest.
			// NOTE(review): TimedTaskDatamap presumably builds a pool covering the
			// given number of days ending at the given timestamp — confirm.
			lastPubTime := util.Int64All(v[len(v)-1]["publishtime"])
			curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, lastPubTime+86400, k)
			log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
			statMu.Lock()
			n += len(v)
			total, repeats := n, repeateN
			statMu.Unlock()
			log.Println("统计目前总数量:", total, "重复数量:", repeats)

			for _, tmp := range v {
				info := NewInfo(tmp)
				// Unless LowHeavy is set, low-quality records are tagged invalid and skipped.
				if !LowHeavy && invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
					log.Println("无效数据")
					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
						{"_id": tmp["_id"]},
						{"$set": map[string]interface{}{
							"repeat":   -1, // invalid-data tag
							"dataging": 0,
						}},
					})
					groupUpdateExtract = flushIfFull(groupUpdateExtract)
					// Skip just this record. This used to be `return`, which abandoned
					// the rest of the group and dropped the unflushed batch.
					continue
				}
				b, source, reason := curTM.check(info)
				if b { // duplicate found: tag it with the reason and the matched id
					statMu.Lock()
					repeateN++
					statMu.Unlock()
					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
						{"_id": tmp["_id"]},
						{"$set": map[string]interface{}{
							"repeat":        1,
							"repeat_reason": reason,
							"repeat_id":     source.id,
							"dataging":      0,
						}},
					})
				} else {
					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
						{"_id": tmp["_id"]},
						{"$set": map[string]interface{}{
							"dataging": 0, // every checked record leaves with dataging == 0
						}},
					})
				}
				groupUpdateExtract = flushIfFull(groupUpdateExtract)
			}
			// End of group: flush whatever is left.
			if len(groupUpdateExtract) > 0 {
				mgo.UpSertBulk(extract, groupUpdateExtract...)
			}
		}(k, v)
	}
	wg.Wait() // also orders the reads of n/repeateN below after all worker writes

	// Fixed pause kept from the original before the final summary.
	time.Sleep(30 * time.Second)
	log.Println("this repair over.", n, "repeateN:", repeateN, gtid, lteid)
	log.Println("修复结束")
}