123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 |
- package main
- import (
- "encoding/json"
- "github.com/cron"
- "gopkg.in/mgo.v2/bson"
- "log"
- mu "mfw/util"
- "net"
- "os"
- qutil "qfw/util"
- "strconv"
- "sync"
- "time"
- )
- func getHistoryInfoId(keystatus string) bool {
- //查询表最后一个id
- qfw_sess := qfw_mgo.GetMgoConn()
- defer qfw_mgo.DestoryMongoConn(qfw_sess)
- q := map[string]interface{}{}
- it_last := qfw_sess.DB(qfw_mgo.DbName).C(qfw_coll).Find(&q).Sort("-_id").Iter()
- isRepeatStatus := false
- for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
- is_repeat_status := qutil.IntAll(tmp[keystatus])
- if is_repeat_status == 1 {
- lteid = qutil.ObjToString(tmp["lteid"])
- log.Println("查询的最后一个已标记的任务lteid:", lteid)
- isRepeatStatus = true
- tmp = make(map[string]interface{})
- break
- } else {
- tmp = make(map[string]interface{})
- }
- }
- return isRepeatStatus
- }
- // 历史判重
- func historyRepeat() {
- defer qutil.Catch()
- for {
- start := time.Now().Unix()
- if gtid == "" {
- log.Println("请传gtid,否则无法运行")
- os.Exit(0)
- return
- }
- if lteid != "" && !IsFull { //先进行数据迁移
- log.Println("开启一次迁移任务", gtid, lteid)
- moveHistoryData(gtid, lteid)
- gtid = lteid //替换数据
- }
- //查询历史数据id
- isRepeatStatus := false
- if !update_ai {
- isRepeatStatus = getHistoryInfoId("repeat_status")
- } else {
- isRepeatStatus = getHistoryInfoId("repeat_status_ai")
- }
- if !isRepeatStatus {
- log.Println("查询不到有标记的lteid数据......睡眠......")
- time.Sleep(30 * time.Second)
- continue
- }
- log.Println("查询找到有标记的lteid......睡眠......", gtid, lteid)
- if isUpdateSite {
- initSite()
- }
- time.Sleep(30 * time.Second)
- sess := data_mgo.GetMgoConn() //连接器
- defer data_mgo.DestoryMongoConn(sess)
- between_time := time.Now().Unix() - (86400 * timingPubScope) //两年周期
- ////临时-补齐差额
- //log.Println("临时···66c58769b25c3e1deb79107c,66f617bdb25c3e1deb3ee999")
- //gtid = "66c58769b25c3e1deb79107c"
- //lteid = "66f617bdb25c3e1deb3ee999"
- //开始判重
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(gtid),
- "$lte": StringTOBsonId(lteid),
- },
- }
- log.Println("历史判重查询条件:", q, "时间:", between_time)
- it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
- num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0) //计数
- pendAllArr := [][]map[string]interface{}{} //待处理数组
- dayArr := []map[string]interface{}{}
- for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
- if num%10000 == 0 {
- log.Println("正序遍历:", num, "~", oknum)
- }
- delete(tmp, "field_source")
- delete(tmp, "regions_log")
- delete(tmp, "kvtext")
- //取-符合-发布时间X年内的数据
- if qutil.IntAll(tmp["dataging"]) == 1 {
- pubtime := qutil.Int64All(tmp["publishtime"])
- if pubtime > 0 && pubtime >= between_time && qutil.ObjToString(tmp["subtype"]) != "拟建" && qutil.ObjToString(tmp["subtype"]) != "产权" &&
- qutil.ObjToString(tmp["spidercode"]) != "sdxzbiddingsjzypc" {
- oknum++
- if deterTime == 0 {
- log.Println("找到第一条符合条件的数据")
- deterTime = qutil.Int64All(tmp["publishtime"])
- dayArr = append(dayArr, tmp)
- } else {
- if pubtime-deterTime > timingSpanDay*86400 {
- //新数组重新构建,当前组数据加到全部组数据
- pendAllArr = append(pendAllArr, dayArr)
- dayArr = []map[string]interface{}{}
- deterTime = qutil.Int64All(tmp["publishtime"])
- dayArr = append(dayArr, tmp)
- } else {
- dayArr = append(dayArr, tmp)
- }
- }
- } else {
- outnum++
- //不在两年内的也清标记
- Update.updatePool <- []map[string]interface{}{ //重复数据打标签
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "dataging": 0,
- "history_updatetime": qutil.Int64All(time.Now().Unix()),
- },
- },
- }
- }
- }
- tmp = make(map[string]interface{})
- }
- if len(dayArr) > 0 {
- pendAllArr = append(pendAllArr, dayArr)
- dayArr = []map[string]interface{}{}
- }
- log.Println("查询数量:", num, "符合条件:", oknum, "未在两年内:", outnum)
- if len(pendAllArr) <= 0 {
- log.Println("没找到dataging==1的数据")
- }
- //测试分组数量是否正确
- testNum := 0
- for k, v := range pendAllArr {
- log.Println("第", k, "组--", "数量:", len(v))
- testNum = testNum + len(v)
- }
- log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum)
- n, repeateN := 0, 0
- log.Println("线程数:", threadNum)
- pool := make(chan bool, threadNum)
- wg := &sync.WaitGroup{}
- for k, v := range pendAllArr { //每组结束更新一波数据
- pool <- true
- wg.Add(1)
- go func(k int, v []map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- log.Println("构建第", k, "组---(数据池)")
- //当前组的第一个发布时间
- first_pt := qutil.Int64All(v[len(v)-1]["publishtime"])
- curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
- log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
- n = n + len(v)
- log.Println("统计目前总数量:", n, "重复数量:", repeateN)
- for _, tmp := range v {
- info := NewInfo(tmp)
- b, source, reason := curTM.check(info)
- if b { //有重复,更新
- repeateN++
- Update.updatePool <- []map[string]interface{}{ //重复数据打标签
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat": 1,
- "repeat_reason": reason,
- "repeat_id": source.id,
- "dataging": 0,
- "history_updatetime": qutil.Int64All(time.Now().Unix()),
- },
- },
- }
- } else {
- Update.updatePool <- []map[string]interface{}{ //重复数据打标签
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "dataging": 0, //符合条件的都为dataging==0
- "history_updatetime": qutil.Int64All(time.Now().Unix()),
- },
- },
- }
- }
- }
- }(k, v)
- }
- wg.Wait()
- log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid)
- time.Sleep(30 * time.Second)
- //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
- if gtid != lteid {
- for _, to := range nextNode {
- next_sid := qutil.BsonIdToSId(gtid)
- next_eid := qutil.BsonIdToSId(lteid)
- key := next_sid + "-" + next_eid + "-" + qutil.ObjToString(to["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": next_sid,
- "lteid": next_eid,
- "stype": qutil.ObjToString(to["stype"]),
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(to["addr"].(string)),
- Port: qutil.IntAll(to["port"]),
- }
- node := &udpNode{by, addr, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
- }
- end := time.Now().Unix()
- log.Println(gtid, lteid)
- if end-start < 60*5 {
- log.Println("睡眠.............")
- time.Sleep(5 * time.Minute)
- }
- log.Println("继续下一段的历史判重")
- }
- }
// judgeIsCurIds reports whether curid lies inside the id segment bounded by
// gtid and lteid. Only the first 8 hex characters of each id are compared
// (for a hex ObjectId this is its leading timestamp), and both bounds are
// inclusive.
//
// An id that is shorter than 8 characters or whose prefix is not valid hex is
// treated as outside every segment, so the function returns false instead of
// panicking or silently comparing against 0.
func judgeIsCurIds(gtid string, lteid string, curid string) bool {
	gt_time, ok := idPrefixSeconds(gtid)
	if !ok {
		return false
	}
	lte_time, ok := idPrefixSeconds(lteid)
	if !ok {
		return false
	}
	cur_time, ok := idPrefixSeconds(curid)
	if !ok {
		return false
	}
	return cur_time >= gt_time && cur_time <= lte_time
}

// idPrefixSeconds parses the first 8 hex characters of id as an integer.
// The boolean is false when id is too short or the prefix is not hex.
func idPrefixSeconds(id string) (int64, bool) {
	if len(id) < 8 {
		return 0, false
	}
	sec, err := strconv.ParseInt(id[:8], 16, 64)
	if err != nil {
		return 0, false
	}
	return sec, true
}
- // 迁移上一段数据
- func moveHistoryData(startid string, endid string) {
- sess := data_mgo.GetMgoConn()
- defer data_mgo.DestoryMongoConn(sess)
- year, month, day := time.Now().Date()
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(startid),
- "$lte": StringTOBsonId(endid),
- },
- }
- log.Println(q)
- it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Iter()
- index := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
- data_mgo.Save(extract_back, tmp)
- tmp = map[string]interface{}{}
- if index%1000 == 0 {
- log.Println("index", index)
- }
- }
- log.Println("save to", extract_back, " ok index", index)
- qv := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour * 2).Unix(),
- },
- }
- delnum := data_mgo.Delete(extract, qv)
- log.Println("remove from ", extract, delnum)
- }
- // 暂时弃用
- func moveTimeoutData() {
- log.Println("部署迁移定时任务")
- c := cron.New()
- c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
- c.Start()
- }
- func moveOnceTimeOut() {
- log.Println("执行一次迁移超时数据")
- sess := data_mgo.GetMgoConn()
- defer data_mgo.DestoryMongoConn(sess)
- now := time.Now()
- move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
- task_id := qutil.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$lt": StringTOBsonId(task_id),
- },
- }
- it := sess.DB(data_mgo.DbName).C("result_20200714").Find(&q).Iter()
- index := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
- if index%10000 == 0 {
- log.Println("index", index)
- }
- del_id := BsonTOStringId(tmp["_id"])
- data_mgo.Save("result_20200713", tmp)
- data_mgo.DeleteById("result_20200714", del_id)
- tmp = map[string]interface{}{}
- }
- log.Println("save and delete", " ok index", index)
- }
|