Repository files (all at commit b15bb7c2bd, "备份-判重src4" / "backup - dedup src4", 4 years ago):

README.md
config.json
dataMethod.go
dataMethodHeavy.go
dataMethodMerge.go
datamap.go
main.go
mgo.go
udptaskmap.go
updateMethod.go

README.md

{
    "udpport": ":19097",
    "dupdays": 7,
    "mongodb": {
        "addr": "172.17.4.85:27080",
        "pool": 10,
        "db": "qfw",
        "extract": "result_20200715",
        "extract_back": "result_20200714",
        "site": {
            "dbname": "qfw",
            "coll": "site"
        }
    },
    "task_mongodb": {
        "task_addrName": "172.17.4.187:27081",
        "task_dbName": "qfw",
        "task_collName": "ocr_flie_over",
        "pool": 5
    },
    "jkmail": {
        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
        "api": "http://10.171.112.160:19281/_send/_mail"
    },
    "nextNode": [
        {
            "addr": "172.17.4.194",
            "port": 1782,
            "stype": "project",
            "memo": "合并项目"
        },
        {
            "addr": "127.0.0.1",
            "port": 1783,
            "stype": "bidding",
            "memo": "创建招标数据索引new"
        }
    ],
    "threads": 1,
    "isMerger": false,
    "lowHeavy": true,
    "timingTask": false,
    "timingSpanDay": 5,
    "timingPubScope": 720,
    "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
}
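For reference, a minimal sketch of loading this config in Go. The struct and its field set are guesses reconstructed from the keys above, not the repo's actual types (main.go may well decode into a map[string]interface{} instead):

package main

import (
    "encoding/json"
    "log"
    "os"
)

// Config mirrors the keys shown above; hypothetical, for illustration only.
type Config struct {
    Udpport        string                   `json:"udpport"`
    Dupdays        int                      `json:"dupdays"`
    Mongodb        map[string]interface{}   `json:"mongodb"`
    TaskMongodb    map[string]interface{}   `json:"task_mongodb"`
    Jkmail         map[string]string        `json:"jkmail"`
    NextNode       []map[string]interface{} `json:"nextNode"`
    Threads        int                      `json:"threads"`
    IsMerger       bool                     `json:"isMerger"`
    LowHeavy       bool                     `json:"lowHeavy"`
    TimingTask     bool                     `json:"timingTask"`
    TimingSpanDay  int64                    `json:"timingSpanDay"`
    TimingPubScope int64                    `json:"timingPubScope"`
}

func loadConfig(path string) (*Config, error) {
    by, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    c := &Config{}
    if err := json.Unmarshal(by, c); err != nil {
        return nil, err
    }
    return c, nil
}

func main() {
    c, err := loadConfig("config.json")
    if err != nil {
        log.Fatal(err)
    }
    log.Println("udpport:", c.Udpport, "threads:", c.Threads)
}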

// Mongo connection-pool setup (excerpt, presumably from mgo.go):
mgo = &MongodbSim{
    MongodbAddr: "172.17.4.187:27083",
    DbName:      "qfw",
    Size:        10,
}

mgo.InitPool()

return

func moveTimeoutData() {

    log.Println("deploying the scheduled migration task")
    c := cron.New()
    // run moveOnceTimeOut daily at midnight (6-field spec with seconds)
    c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
    c.Start()

}
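A note on the schedule string: the 6-field spec above (leading seconds field, "?" treated like "*") matches robfig/cron v1, whose default parser accepts a seconds field. Under robfig/cron v3 the default parser is 5-field, so the same daily-at-midnight job would need the seconds-enabled parser - a sketch, assuming a v3 upgrade:

c := cron.New(cron.WithSeconds())
if _, err := c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() }); err != nil {
    log.Fatal(err)
}
c.Start()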

func moveOnceTimeOut() {

    log.Println("running one pass of migrating timed-out data")

    sess := mgo.GetMgoConn()
    defer mgo.DestoryMongoConn(sess)
    now := time.Now()
    // cutoff: anything that came in more than two years ago is moved out
    move_time := time.Date(now.Year()-2, now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local).Unix()
    q := map[string]interface{}{
        "comeintime": map[string]interface{}{
            "$lt": move_time,
        },
    }
    log.Println(q)
    it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
    index := 0
    for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
        if index%10000 == 0 {
            log.Println("index", index)
        }
        del_id := BsonTOStringId(tmp["_id"])
        mgo.Save("result_20200713", tmp)
        mgo.DeleteById("result_20200714", del_id)
        tmp = map[string]interface{}{}
    }
    log.Println("save and delete", " ok index", index)

}
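moveOnceTimeOut issues one Save and one DeleteById round trip per document, which is slow for a two-year backlog. A batched variant using mgo's Bulk API is sketched below; it assumes the same MongodbSim wrapper and mgo-style session as above, and the batch size of 1000 is an arbitrary choice:

func moveOnceTimeOutBatched() {
    sess := mgo.GetMgoConn()
    defer mgo.DestoryMongoConn(sess)
    src := sess.DB(mgo.DbName).C("result_20200714")
    dst := sess.DB(mgo.DbName).C("result_20200713")

    now := time.Now()
    move_time := time.Date(now.Year()-2, now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local).Unix()
    q := map[string]interface{}{"comeintime": map[string]interface{}{"$lt": move_time}}

    docs := []interface{}{}
    ids := []interface{}{}
    flush := func() {
        if len(docs) == 0 {
            return
        }
        ins := dst.Bulk()
        ins.Insert(docs...)
        if _, err := ins.Run(); err != nil {
            log.Println("bulk insert:", err)
            return // do not delete what we failed to copy
        }
        del := src.Bulk()
        for _, id := range ids {
            del.Remove(map[string]interface{}{"_id": id})
        }
        if _, err := del.Run(); err != nil {
            log.Println("bulk remove:", err)
        }
        docs, ids = []interface{}{}, []interface{}{}
    }
    it := src.Find(&q).Iter()
    for tmp := make(map[string]interface{}); it.Next(&tmp); {
        docs = append(docs, tmp)
        ids = append(ids, tmp["_id"])
        if len(docs) >= 1000 {
            flush()
        }
        tmp = make(map[string]interface{})
    }
    flush()
}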

A second config variant (different udpport and mail endpoint, no task_mongodb block):

{
    "udpport": ":1785",
    "dupdays": 7,
    "mongodb": {
        "addr": "172.17.4.85:27080",
        "pool": 10,
        "db": "qfw",
        "extract": "result_20200715",
        "extract_back": "result_20200714",
        "site": {
            "dbname": "qfw",
            "coll": "site"
        }
    },
    "jkmail": {
        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
        "api": "http://172.17.145.179:19281/_send/_mail"
    },
    "nextNode": [
        {
            "addr": "127.0.0.1",
            "port": 1783,
            "stype": "bidding",
            "memo": "创建招标数据索引new"
        }
    ],
    "threads": 1,
    "isMerger": false,
    "lowHeavy": true,
    "timingTask": false,
    "timingSpanDay": 5,
    "timingPubScope": 720,
    "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
}

func historyTaskDay() {

defer util.Catch()

for {
    start := time.Now().Unix()

    if gtid == "" {
        log.Println("gtid must be provided, otherwise the task cannot run")
        os.Exit(0)
    }
    if lteid != "" {
        //migrate the previous segment's data first
        log.Println("starting one migration task", gtid, lteid)
        moveHistoryData(gtid, lteid)
        gtid = lteid //advance the segment cursor
    }

    //look up the last id in the task table
    task_sess := task_mgo.GetMgoConn()
    defer task_mgo.DestoryMongoConn(task_sess)
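    // NOTE: this defer (and the mgo one below) sits inside an infinite
    // for loop, so sessions accumulate and are only released when the
    // process exits; releasing them explicitly at the end of each
    // iteration would avoid the leak.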
    q := map[string]interface{}{
        "isused": true,
    }
    between_time := time.Now().Unix() - (86400 * timingPubScope) //publish-time window (timingPubScope = 720 days, roughly two years)
    it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
    for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
        lteid = util.ObjToString(tmp["gtid"])
        log.Println("last task id found:", lteid)
        break
    }

    log.Println("query done - sleeping 5 minutes first", gtid, lteid)
    time.Sleep(5 * time.Minute)

    sess := mgo.GetMgoConn() //connection
    defer mgo.DestoryMongoConn(sess)
    //start dedup over the (gtid, lteid] id range
    q = map[string]interface{}{
        "_id": map[string]interface{}{
            "$gt":  StringTOBsonId(gtid),
            "$lte": StringTOBsonId(lteid),
        },
    }
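    // StringTOBsonId/BsonTOStringId are presumably thin wrappers over the
    // mgo/bson ObjectId helpers (bson.ObjectIdHex / ObjectId.Hex); the
    // (gtid, lteid] ObjectId range works as a resumable cursor over the
    // extract collection.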
    log.Println("历史判重查询条件:",q,"时间:", between_time)
    it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
    num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
    updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
    pendAllArr:=[][]map[string]interface{}{}//待处理数组
    dayArr := []map[string]interface{}{}
    for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
        if num%10000 == 0 {
            log.Println("正序遍历:", num)
        }
        source := util.ObjToMap(tmp["jsondata"])
        if util.IntAll((*source)["sourcewebsite"]) == 1 {
            outnum++
            updatelock.Lock()
            updateExtract = append(updateExtract, []map[string]interface{}{
                map[string]interface{}{
                    "_id": tmp["_id"],
                },
                map[string]interface{}{
                    "$set": map[string]interface{}{
                        "repeat": 1,
                        "dataging": 0,
                        "repeat_reason": "sourcewebsite为1 重复",
                    },
                },
            })
            if len(updateExtract) >= 200 {
                log.Println("sourcewebsite,批量更新")
                mgo.UpSertBulk(extract, updateExtract...)
                updateExtract = [][]map[string]interface{}{}
            }

            updatelock.Unlock()


            tmp = make(map[string]interface{})
            continue
        }

        //keep only records published within the window
        updatelock.Lock()
        if util.IntAll(tmp["dataging"]) == 1 {
            pubtime := util.Int64All(tmp["publishtime"])
            if pubtime > 0 && pubtime >= between_time {
                oknum++
                if deterTime == 0 {
                    log.Println("found the first qualifying record")
                    deterTime = util.Int64All(tmp["publishtime"])
                    dayArr = append(dayArr, tmp)
                } else {
                    if pubtime-deterTime > timingSpanDay*86400 {
                        //window rolled over: flush the current group into the full set and start a new one
                        pendAllArr = append(pendAllArr, dayArr)
                        dayArr = []map[string]interface{}{}
                        deterTime = util.Int64All(tmp["publishtime"])
                        dayArr = append(dayArr, tmp)
                    } else {
                        dayArr = append(dayArr, tmp)
                    }
                }
            } else {
                outnum++
                //records outside the window also get their flag cleared
                updateExtract = append(updateExtract, []map[string]interface{}{
                    map[string]interface{}{
                        "_id": tmp["_id"],
                    },
                    map[string]interface{}{
                        "$set": map[string]interface{}{
                            "dataging": 0,
                        },
                    },
                })
                if len(updateExtract) >= 200 {
                    log.Println("dataging==1 but outside the window, bulk update")
                    mgo.UpSertBulk(extract, updateExtract...)
                    updateExtract = [][]map[string]interface{}{}
                }

            }
        }

        updatelock.Unlock()

        tmp = make(map[string]interface{})
    }


    //flush the remaining batched flag updates
    updatelock.Lock()

    if len(updateExtract) > 0 {
        log.Println("after grouping, final update for records excluded from dedup:", len(updateExtract), oknum+outnum)
        mgo.UpSertBulk(extract, updateExtract...)
        updateExtract = [][]map[string]interface{}{}
    }

    updatelock.Unlock()


    if len(dayArr) > 0 {
        pendAllArr = append(pendAllArr, dayArr)
        dayArr = []map[string]interface{}{}
    }

    log.Println("查询数量:",num,"符合条件:",oknum)

    if len(pendAllArr) <= 0 {
        log.Println("没找到dataging==1的数据")
    }

    //测试分组数量是否正确
    testNum:=0
    for k,v:=range pendAllArr {
        log.Println("第",k,"组--","数量:",len(v))
        testNum = testNum+len(v)
    }
    log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)

    n, repeateN := 0, 0
    log.Println("threads:", threadNum)
    pool := make(chan bool, threadNum)
    wg := &sync.WaitGroup{}
    for k, v := range pendAllArr { //flush updates at the end of each group
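        // pool is a buffered channel used as a counting semaphore:
        // the send below blocks once threadNum group-workers are in flight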
        pool <- true
        wg.Add(1)
        go func(k int, v []map[string]interface{}) {
            defer func() {
                <-pool
                wg.Done()
            }()
            //per-group staging arrays - isolated from other groups
            groupUpdateExtract := [][]map[string]interface{}{}
            groupOtherExtract := [][]map[string]interface{}{}

            //build the current group's data pool
            log.Println("building group", k, "--- (data pool)")
            //publishtime of the group's last (most recent) record
            first_pt := util.Int64All(v[len(v)-1]["publishtime"])
            curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
            log.Println("start dedup scan of group", k, " total:", len(v))
            n = n + len(v)
            log.Println("running total:", n, "duplicates:", repeateN)
            for _, tmp := range v {
                info := NewInfo(tmp)
                b, source, reason := curTM.check(info)
                if b { //duplicate: build updates for the extract data and the bidding side
                    repeateN++
                    //tag the retained source record with the duplicate's id
                    repeat_ids := source.repeat_ids
                    repeat_ids = append(repeat_ids, info.id)
                    source.repeat_ids = repeat_ids
                    //write the updated record back into the data pool
                    DM.replacePoolData(source)
                    updatelock.Lock()

                    //update the backing collection - result_20200714 or result_20200715 -
                    //depending on whether source.id falls inside the current id segment
                    if judgeIsCurIds(gtid, lteid, source.id) {
                        groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{ //tag the duplicate
                            map[string]interface{}{
                                "_id": StringTOBsonId(source.id),
                            },
                            map[string]interface{}{
                                "$set": map[string]interface{}{
                                    "repeat_ids": repeat_ids,
                                },
                            },
                        })
                    } else {
                        groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{ //tag the duplicate
                            map[string]interface{}{
                                "_id": StringTOBsonId(source.id),
                            },
                            map[string]interface{}{
                                "$set": map[string]interface{}{
                                    "repeat_ids": repeat_ids,
                                },
                            },
                        })
                    }
                    groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
                        map[string]interface{}{
                            "_id": tmp["_id"],
                        },
                        map[string]interface{}{
                            "$set": map[string]interface{}{
                                "repeat":        1,
                                "repeat_reason": reason,
                                "repeat_id":     source.id,
                                "dataging":      0,
                            },
                        },
                    })

                    if len(groupUpdateExtract) >= 500 {
                        mgo.UpSertBulk(extract, groupUpdateExtract...)
                        groupUpdateExtract = [][]map[string]interface{}{}
                    }

                    if len(groupOtherExtract) >= 500 {
                        mgo.UpSertBulk(extract_back, groupOtherExtract...)
                        groupOtherExtract = [][]map[string]interface{}{}
                    }

                    updatelock.Unlock()


                } else {
                    updatelock.Lock()

                    groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
                        map[string]interface{}{
                            "_id": tmp["_id"],
                        },
                        map[string]interface{}{
                            "$set": map[string]interface{}{
                                "dataging": 0, //符合条件的都为dataging==0
                            },
                        },
                    })

                    if len(groupUpdateExtract) >= 500 {
                        mgo.UpSertBulk(extract, groupUpdateExtract...)
                        groupUpdateExtract = [][]map[string]interface{}{}
                    }
                    updatelock.Unlock()
                }
            }
            //group finished - flush any remaining updates
            updatelock.Lock()
            if len(groupUpdateExtract) > 0 {
                mgo.UpSertBulk(extract, groupUpdateExtract...)
            }

            if len(groupOtherExtract) > 0 {
                mgo.UpSertBulk(extract_back, groupOtherExtract...)
            }
            updatelock.Unlock()

        }(k, v)

    }

    wg.Wait()


    //task finished: broadcast UDP notifications to downstream nodes (index rebuild pending + merge)
    if n >= repeateN && gtid != lteid {
        for _, to := range nextNode {
            next_sid := util.BsonIdToSId(gtid)
            next_eid := util.BsonIdToSId(lteid)
            key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
            by, _ := json.Marshal(map[string]interface{}{
                "gtid":  next_sid,
                "lteid": next_eid,
                "stype": util.ObjToString(to["stype"]),
                "key":   key,
            })
            addr := &net.UDPAddr{
                IP:   net.ParseIP(to["addr"].(string)),
                Port: util.IntAll(to["port"]),
            }
            node := &udpNode{by, addr, time.Now().Unix(), 0}
            udptaskmap.Store(key, node)
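            // the stored node carries the payload, send time and a zero
            // retry counter, which suggests unacknowledged notifications
            // are re-sent elsewhere (presumably in udptaskmap.go)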
            udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
        }
    }

    end := time.Now().Unix()

    log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid)
    log.Println(gtid, lteid)
    if end-start < 60*5 {
        log.Println("sleeping.............")
        time.Sleep(5 * time.Minute)
    }
    log.Println("continuing with the next history-dedup segment")
}

}
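The window-grouping step above (cutting the ascending publishtime stream into buckets no wider than timingSpanDay days) is the piece most worth unit-testing in isolation. A minimal standalone sketch of the same logic, with illustrative names that are not from the repo:

// groupByWindow splits publish times that are already sorted ascending
// into consecutive groups: a new group starts whenever a record falls
// more than spanDays*86400 seconds after the first record of the
// current group. This mirrors the deterTime/dayArr/pendAllArr logic in
// historyTaskDay.
func groupByWindow(pubtimes []int64, spanDays int64) [][]int64 {
    groups := [][]int64{}
    cur := []int64{}
    anchor := int64(0)
    for _, pt := range pubtimes {
        if len(cur) == 0 {
            anchor = pt
        } else if pt-anchor > spanDays*86400 {
            groups = append(groups, cur)
            cur = []int64{}
            anchor = pt
        }
        cur = append(cur, pt)
    }
    if len(cur) > 0 {
        groups = append(groups, cur)
    }
    return groups
}

// e.g. with day = int64(86400) and spanDays = 5:
// groupByWindow([]int64{0, 2 * day, 5 * day, 6 * day, 12 * day}, 5)
// returns [[0 2*day 5*day] [6*day] [12*day]]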