12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847 |
- package main
- /**
- 招标信息判重
- **/
- import (
- "encoding/json"
- "flag"
- "github.com/cron"
- "gopkg.in/mgo.v2/bson"
- "log"
- mu "mfw/util"
- "net"
- "os"
- "qfw/util"
- "regexp"
- "strconv"
- "sync"
- "time"
- )
- var (
- Sysconfig map[string]interface{} //配置文件
- mconf map[string]interface{} //mongodb配置信息
- mgo *MongodbSim //mongodb操作对象
- task_mgo *MongodbSim //mongodb操作对象
- task_collName string
- extract string
- extract_back string
- udpclient mu.UdpClient //udp对象
- nextNode []map[string]interface{} //下节点数组
- dupdays = 7 //初始化判重范围
- DM *datamap //
- Update *updateInfo
- //正则筛选相关
- FilterRegTitle = regexp.MustCompile("^_$")
- FilterRegTitle_0 = regexp.MustCompile("^_$")
- FilterRegTitle_1 = regexp.MustCompile("^_$")
- FilterRegTitle_2 = regexp.MustCompile("^_$")
- isMerger bool //是否合并
- threadNum int //线程数量
- SiteMap map[string]map[string]interface{} //站点map
- LowHeavy bool //低质量数据判重
- TimingTask bool //是否定时任务
- timingSpanDay int64 //时间跨度
- timingPubScope int64 //发布时间周期
- gtid,lastid,gtept,ltept string //命令输入
- lteid string //历史增量属性
- IsFull bool //是否全量
- updatelock sync.Mutex //锁4
- userName,passWord string //mongo -用户密码
- taskList []map[string]interface{} //任务池
- )
- //udp通道
- var udptask chan struct{} = make(chan struct{}, 1)
- func init() {
- flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
- flag.StringVar(>id, "gtid", "", "历史增量的起始id") //历史
- flag.StringVar(>ept, "gtept", "", "全量gte发布时间")//全量区间pt
- flag.StringVar(<ept, "ltept", "", "全量lte发布时间") //全量区间pt
- flag.Parse()
- util.ReadConfig(&Sysconfig)
- userName = util.ObjToString(Sysconfig["userName"])
- passWord = util.ObjToString(Sysconfig["passWord"])
- log.Println("集群用户密码:",userName,passWord)
- task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
- task_mgo = &MongodbSim{
- MongodbAddr: task_mconf["task_addrName"].(string),
- DbName: task_mconf["task_dbName"].(string),
- Size: util.IntAllDef(task_mconf["task_pool"], 10),
- UserName: userName,
- Password: passWord,
- }
- task_mgo.InitPool()
- task_collName = task_mconf["task_collName"].(string)
- nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
- mconf = Sysconfig["mongodb"].(map[string]interface{})
- mgo = &MongodbSim{
- MongodbAddr: mconf["addr"].(string),
- DbName: mconf["db"].(string),
- Size: util.IntAllDef(mconf["pool"], 10),
- }
- mgo.InitPool()
- extract = mconf["extract"].(string)
- extract_back = mconf["extract_back"].(string)
- dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
- //加载数据
- DM = NewDatamap(dupdays, lastid)
- //更新池
- Update = newUpdatePool()
- go Update.updateData()
- FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
- FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
- FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
- FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
- isMerger = Sysconfig["isMerger"].(bool)
- threadNum = util.IntAllDef(Sysconfig["threads"], 1)
- LowHeavy = Sysconfig["lowHeavy"].(bool)
- TimingTask = Sysconfig["timingTask"].(bool)
- timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
- timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
- //站点配置
- site := mconf["site"].(map[string]interface{})
- SiteMap = make(map[string]map[string]interface{}, 0)
- start := int(time.Now().Unix())
- sess_site := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess_site)
- res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
- for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
- data_map := map[string]interface{}{
- "area": util.ObjToString(site_dict["area"]),
- "city": util.ObjToString(site_dict["city"]),
- "district": util.ObjToString(site_dict["district"]),
- "sitetype": util.ObjToString(site_dict["sitetype"]),
- "level": util.ObjToString(site_dict["level"]),
- "weight": util.ObjToString(site_dict["weight"]),
- }
- SiteMap[util.ObjToString(site_dict["site"])] = data_map
- }
- log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
- }
- func mainT() {
- go checkMapJob()
- updport := Sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- if TimingTask {
- log.Println("正常历史部署")
- go historyTaskDay()
- }else {
- if gtept!=""&<ept!="" {
- log.Println("全量判重-准备开始")
- IsFull = true //全量判重
- sid := "1fffffffffffffffffffffff"
- eid := "9fffffffffffffffffffffff"
- mapinfo := map[string]interface{}{}
- if sid == "" || eid == "" {
- log.Println("sid,eid参数不能为空")
- os.Exit(0)
- }
- mapinfo["gtid"] = sid
- mapinfo["lteid"] = eid
- mapinfo["stop"] = "true"
- taskRepeat(mapinfo)
- time.Sleep(99999 * time.Hour)
- }else {
- //正常增量
- log.Println("正常增量部署,监听任务")
- go getRepeatTask()
- }
- }
- time.Sleep(99999 * time.Hour)
- }
- //测试组人员使用
- func main() {
- if TimingTask {
- go historyTaskDay()
- time.Sleep(99999 * time.Hour)
- } else {
- IsFull = true //全量判重
- sid := "1fffffffffffffffffffffff"
- eid := "9fffffffffffffffffffffff"
- mapinfo := map[string]interface{}{}
- if sid == "" || eid == "" {
- log.Println("sid,eid参数不能为空")
- os.Exit(0)
- }
- mapinfo["gtid"] = sid
- mapinfo["lteid"] = eid
- mapinfo["stop"] = "true"
- log.Println("测试:全量判重-准备开始")
- taskRepeat(mapinfo)
-
- time.Sleep(99999 * time.Hour)
- }
- }
- //udp接收
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- //插入任务-判断任务-是否存在
- updatelock.Lock()
- taskList = append(taskList,mapInfo)
- log.Println("udp收到任务...数量:",len(taskList),"具体任务:",taskList)
- updatelock.Unlock()
- }
- case mu.OP_NOOP: //下个节点回应
- ok := string(data)
- if ok != "" {
- log.Println("ok:", ok)
- udptaskmap.Delete(ok)
- }
- }
- }
- //upd接收
- //func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- // select {
- // case udptask <- struct{}{}:
- // log.Println("...接收段落,通道正常...")
- // switch act {
- // case mu.OP_TYPE_DATA: //上个节点的数据
- // var mapInfo map[string]interface{}
- // err := json.Unmarshal(data, &mapInfo)
- // if err != nil {
- // log.Println("error data:", err)
- // udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- // } else if mapInfo != nil {
- // key, _ := mapInfo["key"].(string)
- // if key == "" {
- // key = "udpok"
- // }
- // log.Println("当前段落,需要判重...",mapInfo)
- // udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- // task(data, mapInfo)
- // }
- // log.Println("此段任务结束...",err,mapInfo)
- // <-udptask
- // case mu.OP_NOOP: //下个节点回应
- // ok := string(data)
- // if ok != "" {
- // log.Println("下节点回应-ok:", ok)
- // udptaskmap.Delete(ok)
- // }
- // <-udptask
- // }
- // case <-time.After(2 * time.Second):
- // switch act {
- // case mu.OP_TYPE_DATA: //上个节点的数据
- // log.Println("通道堵塞中...上节点")
- // udpclient.WriteUdp([]byte("repeat_busy"), mu.OP_NOOP, ra)
- // case mu.OP_NOOP: //下个节点回应
- // log.Println("通道堵塞中...下节点")
- // ok := string(data)
- // if ok != "" {
- // log.Println("下节点回应-ok:", ok)
- // udptaskmap.Delete(ok)
- // }
- // }
- // }
- //
- // //udptask <- struct{}{}
- // //defer func() {
- // // <-udptask
- // //}()
- //}
- //监听-获取-分发判重任务
- func getRepeatTask() {
- for {
- if len(taskList)>0 {
- updatelock.Lock()
- //log.Println("准备执行判重任务...")
- mapInfo := taskList[0]
- if mapInfo != nil {
- taskRepeat(mapInfo) //判重方法
- }
- taskList = taskList[1:]
- log.Println("此段落结束当前任务池...",len(taskList),taskList)
- updatelock.Unlock()
- }else {
- //log.Println("无任务...睡眠15s")
- time.Sleep(15 * time.Second)
- }
- }
- }
- //开始判重程序
- func taskRepeat(mapInfo map[string]interface{}) {
- defer util.Catch()
- //区间id
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- //全量
- if IsFull && gtept!="" && ltept!=""{
- log.Println("执行全量分段模式")
- log.Println(gtept,"---",ltept)
- q = map[string]interface{}{
- "publishtime": map[string]interface{}{
- "$gte": util.Int64All(gtept),
- "$lte": util.Int64All(ltept),
- },
- }
- }
- //临时赋值
- log.Println("开始数据判重~查询条件:",mgo.DbName, extract, q)
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
- pool := make(chan bool, threadNum)
- wg := &sync.WaitGroup{}
- n, repeateN := 0, 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
- if n%1000 == 0 {
- log.Println("current:", n, tmp["_id"],tmp["publishtime"], "repeateN:", repeateN)
- }
- if util.IntAll(tmp["repeat"]) == 1 {
- repeateN++
- tmp = make(map[string]interface{})
- continue
- }
- if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
- tmp = make(map[string]interface{})
- continue
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- info := NewInfo(tmp)
- //正常判重
- b, source, reason := DM.check(info)
- if b {
- repeateN++
- var updateID = map[string]interface{}{} //记录更新判重的
- updateID["_id"] = StringTOBsonId(info.id)
- repeat_ids:=source.repeat_ids
- repeat_ids = append(repeat_ids,info.id)
- source.repeat_ids = repeat_ids
- //替换数据池-更新
- DM.replacePoolData(source)
- //Update.updatePool <- []map[string]interface{}{//原始数据打标签
- // map[string]interface{}{
- // "_id": StringTOBsonId(source.id),
- // },
- // map[string]interface{}{
- // "$set": map[string]interface{}{
- // "repeat_ids": repeat_ids,
- // },
- // },
- //}
- Update.updatePool <- []map[string]interface{}{//重复数据打标签
- updateID,
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat": 1,
- "repeat_reason": reason,
- "repeat_id": source.id,
- "dataging": 0,
- "updatetime_repeat" :util.Int64All(time.Now().Unix()),
- },
- },
- }
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Println("this current task over.", n, "repeateN:", repeateN, mapInfo["stop"])
- //log.Println("当前数据池的数量:",DM.currentTotalCount())
- //睡眠时间30s 目的是让数据池更新所有数据...
- time.Sleep(15 * time.Second)
- //更新Ocr的标记
- if !IsFull {
- updateOcrFileData(mapInfo["lteid"].(string))
- //任务完成,开始发送广播通知下面节点
- if n >= repeateN && mapInfo["stop"] == nil {
- log.Println("判重任务完成发送udp")
- for _, to := range nextNode {
- sid, _ := mapInfo["gtid"].(string)
- eid, _ := mapInfo["lteid"].(string)
- key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": util.ObjToString(to["stype"]),
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(to["addr"].(string)),
- Port: util.IntAll(to["port"]),
- }
- node := &udpNode{by, addr, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
- }
- }
- }
- func updateOcrFileData(cur_lteid string) {
- //更新ocr 分类表-判重的状态
- log.Println("开始更新Ocr表-标记",cur_lteid)
- task_sess := task_mgo.GetMgoConn()
- defer task_mgo.DestoryMongoConn(task_sess)
- q_task:=map[string]interface{}{}
- it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q_task).Sort("-_id").Iter()
- isUpdateOcr:=false
- updateOcrFile:=[][]map[string]interface{}{}
- for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
- cur_id := BsonTOStringId(tmp["_id"])
- lteid:=util.ObjToString(tmp["lteid"])
- if (lteid==cur_lteid) { //需要更新
- log.Println("找到该lteid数据",cur_lteid,cur_id)
- isUpdateOcr = true
- updateOcrFile = append(updateOcrFile, []map[string]interface{}{//重复数据打标签
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "is_repeat_status": 1,
- "is_repeat_time" : util.Int64All(time.Now().Unix()),
- },
- },
- })
- tmp = make(map[string]interface{})
- break
- }else {
- tmp = make(map[string]interface{})
- }
- }
- if !isUpdateOcr {
- log.Println("出现异常问题,查询不到ocr的lteid",cur_lteid)
- }else {
- if len(updateOcrFile) > 0 {
- task_mgo.UpSertBulk(task_collName, updateOcrFile...)
- }
- }
- }
- //历史判重
- func historyTaskDay() {
- defer util.Catch()
- for {
- start:=time.Now().Unix()
- if gtid=="" {
- log.Println("请传gtid,否则无法运行")
- os.Exit(0)
- return
- }
- if lteid!="" {
- //先进行数据迁移
- log.Println("开启一次迁移任务",gtid,lteid)
- moveHistoryData(gtid,lteid)
- gtid = lteid //替换数据
- }
- //查询表最后一个id
- task_sess := task_mgo.GetMgoConn()
- defer task_mgo.DestoryMongoConn(task_sess)
- q:=map[string]interface{}{}
- between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
- it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
- isRepeatStatus:=false
- for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
- is_repeat_status:=util.IntAll(tmp["is_repeat_status"])
- if is_repeat_status == 1 {
- lteid = util.ObjToString(tmp["lteid"])
- log.Println("查询的最后一个已标记的任务lteid:",lteid)
- isRepeatStatus = true
- tmp = make(map[string]interface{})
- break
- }else {
- tmp = make(map[string]interface{})
- }
- }
- if !isRepeatStatus {
- log.Println("查询不到有标记的lteid数据")
- log.Println("睡眠5分钟 gtid:",gtid,"lteid:",lteid)
- time.Sleep(5 * time.Minute)
- continue
- }
- log.Println("查询完毕-找到有标记的lteid-先睡眠5分钟",gtid,lteid)
- time.Sleep(5 * time.Minute)
- sess := mgo.GetMgoConn()//连接器
- defer mgo.DestoryMongoConn(sess)
- //开始判重
- q = map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(gtid),
- "$lte": StringTOBsonId(lteid),
- },
- }
- log.Println("历史判重查询条件:",q,"时间:", between_time)
- it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
- num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
- pendAllArr:=[][]map[string]interface{}{}//待处理数组
- dayArr := []map[string]interface{}{}
- for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
- if num%10000 == 0 {
- log.Println("正序遍历:", num)
- }
- //取-符合-发布时间X年内的数据
- if util.IntAll(tmp["dataging"]) == 1 {
- pubtime := util.Int64All(tmp["publishtime"])
- if pubtime > 0 && pubtime >= between_time {
- oknum++
- if deterTime==0 {
- log.Println("找到第一条符合条件的数据")
- deterTime = util.Int64All(tmp["publishtime"])
- dayArr = append(dayArr,tmp)
- }else {
- if pubtime-deterTime >timingSpanDay*86400 {
- //新数组重新构建,当前组数据加到全部组数据
- pendAllArr = append(pendAllArr,dayArr)
- dayArr = []map[string]interface{}{}
- deterTime = util.Int64All(tmp["publishtime"])
- dayArr = append(dayArr,tmp)
- }else {
- dayArr = append(dayArr,tmp)
- }
- }
- }else {
- outnum++
- //不在两年内的也清标记
- Update.updatePool <- []map[string]interface{}{//重复数据打标签
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "dataging": 0,
- "history_updatetime":util.Int64All(time.Now().Unix()),
- },
- },
- }
- }
- }
- tmp = make(map[string]interface{})
- }
- if len(dayArr)>0 {
- pendAllArr = append(pendAllArr,dayArr)
- dayArr = []map[string]interface{}{}
- }
- log.Println("查询数量:",num,"符合条件:",oknum,"未在两年内:",outnum)
- if len(pendAllArr) <= 0 {
- log.Println("没找到dataging==1的数据")
- }
- //测试分组数量是否正确
- testNum:=0
- for k,v:=range pendAllArr {
- log.Println("第",k,"组--","数量:",len(v))
- testNum = testNum+len(v)
- }
- log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
- n, repeateN := 0, 0
- log.Println("线程数:",threadNum)
- pool := make(chan bool, threadNum)
- wg := &sync.WaitGroup{}
- for k,v:=range pendAllArr { //每组结束更新一波数据
- pool <- true
- wg.Add(1)
- go func(k int, v []map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- //相关ids 跨表
- groupOtherExtract := [][]map[string]interface{}{}
- //构建当前组的数据池
- log.Println("构建第", k, "组---(数据池)")
- //当前组的第一个发布时间
- first_pt := util.Int64All(v[len(v)-1]["publishtime"])
- curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
- log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
- n = n + len(v)
- log.Println("统计目前总数量:", n, "重复数量:", repeateN)
- for _, tmp := range v {
- info := NewInfo(tmp)
- b, source, reason := curTM.check(info)
- if b { //有重复,生成更新语句,更新抽取和更新招标
- repeateN++
- //重复数据打标签
- repeat_ids:=source.repeat_ids
- repeat_ids = append(repeat_ids,info.id)
- source.repeat_ids = repeat_ids
- updatelock.Lock()
- //替换数据池-更新
- DM.replacePoolData(source)
- //更新数据源
- //判断是否在当前段落
- if judgeIsCurIds(gtid,lteid,source.id) {
- Update.updatePool <- []map[string]interface{}{//重复数据打标签
- map[string]interface{}{
- "_id": StringTOBsonId(source.id),
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat_ids": repeat_ids,
- },
- },
- }
- }else {
- groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
- map[string]interface{}{
- "_id": StringTOBsonId(source.id),
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat_ids": repeat_ids,
- },
- },
- })
- }
- Update.updatePool <- []map[string]interface{}{//重复数据打标签
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat": 1,
- "repeat_reason": reason,
- "repeat_id": source.id,
- "dataging": 0,
- "history_updatetime":util.Int64All(time.Now().Unix()),
- },
- },
- }
- if len(groupOtherExtract) >= 500 {
- mgo.UpSertBulk(extract_back, groupOtherExtract...)
- groupOtherExtract = [][]map[string]interface{}{}
- }
- updatelock.Unlock()
- } else {
- Update.updatePool <- []map[string]interface{}{//重复数据打标签
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "dataging": 0, //符合条件的都为dataging==0
- "history_updatetime":util.Int64All(time.Now().Unix()),
- },
- },
- }
- }
- }
- //每组数据结束-更新数据
- updatelock.Lock()
- if len(groupOtherExtract) > 0 {
- mgo.UpSertBulk(extract_back, groupOtherExtract...)
- }
- updatelock.Unlock()
- }(k, v)
- }
- wg.Wait()
- log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
- time.Sleep(30 * time.Second)
- //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
- if n >= repeateN && gtid!=lteid{
- for _, to := range nextNode {
- next_sid := util.BsonIdToSId(gtid)
- next_eid := util.BsonIdToSId(lteid)
- key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": next_sid,
- "lteid": next_eid,
- "stype": util.ObjToString(to["stype"]),
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(to["addr"].(string)),
- Port: util.IntAll(to["port"]),
- }
- node := &udpNode{by, addr, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
- }
- end:=time.Now().Unix()
- log.Println(gtid,lteid)
- if end-start<60*5 {
- log.Println("睡眠.............")
- time.Sleep(5 * time.Minute)
- }
- log.Println("继续下一段的历史判重")
- }
- }
// judgeIsCurIds reports whether curid belongs to the id segment bounded by
// gtid and lteid. Only the first 8 hex digits of each id (the timestamp
// seconds of an ObjectId hex string) are compared, and both bounds are
// inclusive. It returns false when any id is shorter than 8 characters or
// does not start with 8 hex digits.
func judgeIsCurIds(gtid string, lteid string, curid string) bool {
	gt_time, ok := idHexSeconds(gtid)
	if !ok {
		return false
	}
	lte_time, ok := idHexSeconds(lteid)
	if !ok {
		return false
	}
	cur_time, ok := idHexSeconds(curid)
	if !ok {
		return false
	}
	return cur_time >= gt_time && cur_time <= lte_time
}

// idHexSeconds parses the leading 8 hex digits of id; ok is false when id is
// too short or the prefix is not valid hexadecimal.
func idHexSeconds(id string) (seconds int64, ok bool) {
	if len(id) < 8 {
		return 0, false
	}
	seconds, err := strconv.ParseInt(id[:8], 16, 64)
	if err != nil {
		return 0, false
	}
	return seconds, true
}
- //迁移上一段数据
- func moveHistoryData(startid string,endid string) {
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- year, month, day := time.Now().Date()
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(startid),
- "$lte": StringTOBsonId(endid),
- },
- }
- log.Println(q)
- it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
- index := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
- mgo.Save(extract_back, tmp)
- tmp = map[string]interface{}{}
- if index%1000 == 0 {
- log.Println("index", index)
- }
- }
- log.Println("save to", extract_back, " ok index", index)
- qv := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour*2).Unix(),
- },
- }
- delnum := mgo.Delete(extract, qv)
- log.Println("remove from ", extract, delnum)
- }
- func moveTimeoutData() {
- log.Println("部署迁移定时任务")
- c := cron.New()
- c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
- c.Start()
- }
- func moveOnceTimeOut() {
- log.Println("执行一次迁移超时数据")
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- now:=time.Now()
- move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
- task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$lt": StringTOBsonId(task_id),
- },
- }
- it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
- index := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
- if index%10000 == 0 {
- log.Println("index", index)
- }
- del_id:=BsonTOStringId(tmp["_id"])
- mgo.Save("result_20200713", tmp)
- mgo.DeleteById("result_20200714",del_id)
- tmp = map[string]interface{}{}
- }
- log.Println("save and delete", " ok index", index)
- }
|