// Package main deduplicates bid/tender information records.
package main

import (
	"encoding/json"
	"flag"
	"log"
	"net"
	"os"
	"regexp"
	"strconv"
	"sync"
	"time"

	"github.com/cron"
	"gopkg.in/mgo.v2/bson"

	mu "mfw/util"
	"qfw/util"
)

var (
	Sysconfig     map[string]interface{} // parsed configuration file
	mconf         map[string]interface{} // mongodb configuration section
	mgo           *MongodbSim            // mongodb client (data)
	task_mgo      *MongodbSim            // mongodb client (task bookkeeping)
	task_collName string                 // task collection name
	extract       string                 // extract collection name
	extract_back  string                 // extract backup collection name
	udpclient     mu.UdpClient           // udp endpoint
	nextNode      []map[string]interface{} // downstream nodes to notify
	dupdays       = 7                    // default dedup window in days

	DM     *datamap    // in-memory dedup data pool
	Update *updateInfo // async update pool

	// Regex filters; placeholders here, real patterns are loaded from config in init().
	FilterRegTitle   = regexp.MustCompile("^_$")
	FilterRegTitle_0 = regexp.MustCompile("^_$")
	FilterRegTitle_1 = regexp.MustCompile("^_$")
	FilterRegTitle_2 = regexp.MustCompile("^_$")

	isMerger  bool // whether to merge
	threadNum int  // worker goroutine count

	SiteMap map[string]map[string]interface{} // site metadata map

	LowHeavy   bool // dedup low-quality data
	TimingTask bool // run as a scheduled (history) task

	timingSpanDay  int64 // grouping span in days
	timingPubScope int64 // publish-time window (days back)

	gtid, lastid, gtept, ltept string // command-line inputs
	lteid                      string // history incremental cursor
	IsFull                     bool   // full-scan dedup mode

	updatelock sync.Mutex // guards shared counters / cross-goroutine updates

	userName, passWord string // mongo credentials
)

// udptask serializes UDP message handling: capacity 1 means only one
// segment is processed at a time; a second sender gets "repeat_busy".
var udptask chan struct{} = make(chan struct{}, 1)

// init parses flags, loads configuration, initializes both mongo pools,
// the dedup data pool, the update pool, and the site metadata map.
func init() {
	// NOTE: the original file had these receivers corrupted to ">id" / ">ept" /
	// "<ept" (stripped HTML entities); restored to the declared globals.
	flag.StringVar(&lastid, "id", "", "增量加载的lastid")    // incremental
	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")       // history
	flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")     // full-scan publishtime lower bound
	flag.StringVar(&ltept, "ltept", "", "全量lte发布时间")     // full-scan publishtime upper bound
	flag.Parse()

	util.ReadConfig(&Sysconfig)
	userName = util.ObjToString(Sysconfig["userName"])
	passWord = util.ObjToString(Sysconfig["passWord"])
	log.Println("集群用户密码:", userName, passWord)

	// Task-tracking mongo pool.
	task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
	task_mgo = &MongodbSim{
		MongodbAddr: task_mconf["task_addrName"].(string),
		DbName:      task_mconf["task_dbName"].(string),
		Size:        util.IntAllDef(task_mconf["task_pool"], 10),
		UserName:    userName,
		Password:    passWord,
	}
	task_mgo.InitPool()
	task_collName = task_mconf["task_collName"].(string)

	nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))

	// Data mongo pool.
	mconf = Sysconfig["mongodb"].(map[string]interface{})
	mgo = &MongodbSim{
		MongodbAddr: mconf["addr"].(string),
		DbName:      mconf["db"].(string),
		Size:        util.IntAllDef(mconf["pool"], 10),
	}
	mgo.InitPool()
	extract = mconf["extract"].(string)
	extract_back = mconf["extract_back"].(string)

	dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
	// Load the dedup data pool.
	DM = NewDatamap(dupdays, lastid)
	// Start the async update pool.
	Update = newUpdatePool()
	go Update.updateData()

	FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
	FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
	FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
	FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))

	isMerger = Sysconfig["isMerger"].(bool)
	threadNum = util.IntAllDef(Sysconfig["threads"], 1)
	LowHeavy = Sysconfig["lowHeavy"].(bool)
	TimingTask = Sysconfig["timingTask"].(bool)
	timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
	timingPubScope = util.Int64All(Sysconfig["timingPubScope"])

	// Site configuration: load every site document into SiteMap.
	site := mconf["site"].(map[string]interface{})
	SiteMap = make(map[string]map[string]interface{}, 0)
	start := int(time.Now().Unix())
	sess_site := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess_site)
	res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).
		Find(map[string]interface{}{}).Sort("_id").Iter()
	// BUGFIX: the decode map must be reset each iteration, otherwise keys from a
	// previous document can leak into the next one.
	for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); site_dict = make(map[string]interface{}) {
		data_map := map[string]interface{}{
			"area":     util.ObjToString(site_dict["area"]),
			"city":     util.ObjToString(site_dict["city"]),
			"district": util.ObjToString(site_dict["district"]),
			"sitetype": util.ObjToString(site_dict["sitetype"]),
			"level":    util.ObjToString(site_dict["level"]),
			"weight":   util.ObjToString(site_dict["weight"]),
		}
		SiteMap[util.ObjToString(site_dict["site"])] = data_map
	}
	log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
}

// main starts the UDP listener and either the scheduled history task or a
// one-shot full dedup pass, depending on configuration and flags.
func main() {
	go checkMapJob()
	updport := Sysconfig["udpport"].(string)
	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
	udpclient.Listen(processUdpMsg)
	log.Println("Udp服务监听", updport)
	if TimingTask {
		log.Println("正常历史部署")
		go historyTaskDay()
	} else {
		// BUGFIX: condition was corrupted to `gtept!=""&<ept!=""`; restored to
		// "both full-scan publishtime bounds supplied".
		if gtept != "" && ltept != "" {
			log.Println("全量判重-准备开始")
			IsFull = true // full dedup
			sid := "1fffffffffffffffffffffff"
			eid := "9fffffffffffffffffffffff"
			mapinfo := map[string]interface{}{}
			if sid == "" || eid == "" {
				log.Println("sid,eid参数不能为空")
				os.Exit(0)
			}
			mapinfo["gtid"] = sid
			mapinfo["lteid"] = eid
			mapinfo["stop"] = "true"
			task([]byte{}, mapinfo)
			time.Sleep(99999 * time.Hour)
		} else {
			// Normal incremental deployment: work arrives via UDP.
			log.Println("正常增量部署")
		}
	}
	time.Sleep(99999 * time.Hour)
}

// mainT is a manual entry point used by the test team: runs either the
// scheduled history task or a one-shot full dedup over a fixed id range.
func mainT() {
	if TimingTask {
		go historyTaskDay()
		time.Sleep(99999 * time.Hour)
	} else {
		IsFull = true // full dedup
		sid := "1fffffffffffffffffffffff"
		eid := "9fffffffffffffffffffffff"
		mapinfo := map[string]interface{}{}
		if sid == "" || eid == "" {
			log.Println("sid,eid参数不能为空")
			os.Exit(0)
		}
		mapinfo["gtid"] = sid
		mapinfo["lteid"] = eid
		mapinfo["stop"] = "true"
		log.Println("测试:全量判重-准备开始")
		task([]byte{}, mapinfo)
		time.Sleep(99999 * time.Hour)
	}
}

// processUdpMsg handles one UDP message. The udptask channel (capacity 1)
// admits a single in-flight segment; if it is busy for 2s the sender is told
// "repeat_busy" so the upstream node can retry later.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	select {
	case udptask <- struct{}{}:
		log.Println("...接收段落,通道正常...")
		switch act {
		case mu.OP_TYPE_DATA: // data from the upstream node
			var mapInfo map[string]interface{}
			err := json.Unmarshal(data, &mapInfo)
			if err != nil {
				log.Println("error data:", err)
				udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
			} else if mapInfo != nil {
				key, _ := mapInfo["key"].(string)
				if key == "" {
					key = "udpok"
				}
				log.Println("当前段落,需要判重...", mapInfo)
				// Ack first, then run the (long) dedup task synchronously.
				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
				task(data, mapInfo)
			}
			log.Println("此段任务结束...", err, mapInfo)
			<-udptask
		case mu.OP_NOOP: // ack from the downstream node
			ok := string(data)
			if ok != "" {
				log.Println("下节点回应-ok:", ok)
				udptaskmap.Delete(ok)
			}
			<-udptask
		}
	case <-time.After(2 * time.Second):
		switch act {
		case mu.OP_TYPE_DATA: // upstream node: report busy
			log.Println("通道堵塞中...上节点")
			udpclient.WriteUdp([]byte("repeat_busy"), mu.OP_NOOP, ra)
		case mu.OP_NOOP: // downstream ack can still be recorded while busy
			log.Println("通道堵塞中...下节点")
			ok := string(data)
			if ok != "" {
				log.Println("下节点回应-ok:", ok)
				udptaskmap.Delete(ok)
			}
		}
	}
	//udptask <- struct{}{}
	//defer func() {
	//	<-udptask
	//}()
}

// task runs one dedup pass over an id range (or, in full mode, a publishtime
// range), marks duplicates via the Update pool, then notifies downstream
// nodes unless mapInfo["stop"] is set.
func task(data []byte, mapInfo map[string]interface{}) {
	log.Println("开始数据判重")
	defer util.Catch()
	// Range query by id.
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
		},
	}
	// Full mode: query by publishtime window instead.
	if IsFull && gtept != "" && ltept != "" {
		log.Println("执行全量分段模式")
		log.Println(gtept, "---", ltept)
		q = map[string]interface{}{
			"publishtime": map[string]interface{}{
				"$gte": util.Int64All(gtept),
				"$lte": util.Int64All(ltept),
			},
		}
	}
	log.Println("查询条件:", mgo.DbName, extract, q)
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()

	pool := make(chan bool, threadNum)
	wg := &sync.WaitGroup{}
	// BUGFIX: repeateN was incremented both here and inside worker goroutines
	// without synchronization (data race); cntMu now guards it.
	var cntMu sync.Mutex
	n, repeateN := 0, 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		if n%1000 == 0 {
			cntMu.Lock()
			log.Println("current:", n, tmp["_id"], tmp["publishtime"], "repeateN:", repeateN)
			cntMu.Unlock()
		}
		// Already marked duplicate: count and skip.
		if util.IntAll(tmp["repeat"]) == 1 {
			cntMu.Lock()
			repeateN++
			cntMu.Unlock()
			tmp = make(map[string]interface{})
			continue
		}
		// Aging data is handled by the history task, except in full mode.
		if util.IntAll(tmp["dataging"]) == 1 && !IsFull {
			tmp = make(map[string]interface{})
			continue
		}
		pool <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			info := NewInfo(tmp)
			// Normal dedup check against the data pool.
			b, source, reason := DM.check(info)
			if b {
				cntMu.Lock()
				repeateN++
				cntMu.Unlock()
				updateID := map[string]interface{}{"_id": StringTOBsonId(info.id)}
				repeat_ids := source.repeat_ids
				repeat_ids = append(repeat_ids, info.id)
				source.repeat_ids = repeat_ids
				// Push the updated source back into the data pool.
				DM.replacePoolData(source)
				// Tag the source document with its duplicate ids.
				Update.updatePool <- []map[string]interface{}{
					{"_id": StringTOBsonId(source.id)},
					{"$set": map[string]interface{}{
						"repeat_ids": repeat_ids,
					}},
				}
				// Tag the duplicate document.
				Update.updatePool <- []map[string]interface{}{
					updateID,
					{"$set": map[string]interface{}{
						"repeat":            1,
						"repeat_reason":     reason,
						"repeat_id":         source.id,
						"dataging":          0,
						"updatetime_repeat": util.Int64All(time.Now().Unix()),
					}},
				}
			}
		}(tmp)
		tmp = make(map[string]interface{})
	}
	wg.Wait()
	log.Println("this current task over.", n, "repeateN:", repeateN, mapInfo["stop"])
	//log.Println("当前数据池的数量:",DM.currentTotalCount())
	// Sleep so the update pool can flush everything before we move on.
	time.Sleep(15 * time.Second)
	// Mark the OCR task record for this segment as deduplicated.
	updateOcrFileData(mapInfo["lteid"].(string))
	// Task done: broadcast to downstream nodes (unless "stop" is set).
	if n >= repeateN && mapInfo["stop"] == nil {
		log.Println("判重任务完成发送udp")
		for _, to := range nextNode {
			sid, _ := mapInfo["gtid"].(string)
			eid, _ := mapInfo["lteid"].(string)
			key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
			by, _ := json.Marshal(map[string]interface{}{
				"gtid":  sid,
				"lteid": eid,
				"stype": util.ObjToString(to["stype"]),
				"key":   key,
			})
			addr := &net.UDPAddr{
				IP:   net.ParseIP(to["addr"].(string)),
				Port: util.IntAll(to["port"]),
			}
			node := &udpNode{by, addr, time.Now().Unix(), 0}
			udptaskmap.Store(key, node)
			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
		}
	}
}

// updateOcrFileData marks the OCR task record whose lteid equals cur_lteid
// as deduplicated (is_repeat_status = 1).
func updateOcrFileData(cur_lteid string) {
	log.Println("开始更新Ocr表-标记", cur_lteid)
	task_sess := task_mgo.GetMgoConn()
	defer task_mgo.DestoryMongoConn(task_sess)
	q_task := map[string]interface{}{}
	it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q_task).Sort("-_id").Iter()
	isUpdateOcr := false
	updateOcrFile := [][]map[string]interface{}{}
	for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
		cur_id := BsonTOStringId(tmp["_id"])
		lteid := util.ObjToString(tmp["lteid"])
		if lteid == cur_lteid {
			log.Println("找到该lteid数据", cur_lteid, cur_id)
			isUpdateOcr = true
			updateOcrFile = append(updateOcrFile, []map[string]interface{}{
				{"_id": tmp["_id"]},
				{"$set": map[string]interface{}{
					"is_repeat_status": 1,
					"is_repeat_time":   util.Int64All(time.Now().Unix()),
				}},
			})
			tmp = make(map[string]interface{})
			break
		} else {
			tmp = make(map[string]interface{})
		}
	}
	if !isUpdateOcr {
		log.Println("出现异常问题,查询不到ocr的lteid", cur_lteid)
	} else if len(updateOcrFile) > 0 {
		task_mgo.UpSertBulk(task_collName, updateOcrFile...)
	}
}

// historyTaskDay runs history dedup passes forever. Each pass is delegated to
// historyTaskOnce so that per-pass deferred cleanup (mongo connections)
// actually runs; a panic still unwinds to util.Catch here and stops the loop,
// matching the original behavior.
func historyTaskDay() {
	defer util.Catch()
	for {
		// BUGFIX: the original held `defer ...DestoryMongoConn(...)` inside
		// this infinite loop, so the connections were never released.
		historyTaskOnce()
	}
}

// historyTaskOnce performs a single history-dedup pass: optionally migrates
// the previous segment, finds the latest marked lteid, groups aging documents
// by publishtime span, dedups each group concurrently, then notifies
// downstream nodes.
func historyTaskOnce() {
	start := time.Now().Unix()
	if gtid == "" {
		log.Println("请传gtid,否则无法运行")
		os.Exit(0)
		return
	}
	if lteid != "" {
		// Migrate the previous segment first, then advance the cursor.
		log.Println("开启一次迁移任务", gtid, lteid)
		moveHistoryData(gtid, lteid)
		gtid = lteid
	}
	// Find the latest task record that was already marked.
	task_sess := task_mgo.GetMgoConn()
	defer task_mgo.DestoryMongoConn(task_sess)
	q := map[string]interface{}{}
	between_time := time.Now().Unix() - (86400 * timingPubScope) // publish-time window
	it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
	isRepeatStatus := false
	for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
		is_repeat_status := util.IntAll(tmp["is_repeat_status"])
		if is_repeat_status == 1 {
			lteid = util.ObjToString(tmp["lteid"])
			log.Println("查询的最后一个已标记的任务lteid:", lteid)
			isRepeatStatus = true
			tmp = make(map[string]interface{})
			break
		} else {
			tmp = make(map[string]interface{})
		}
	}
	if !isRepeatStatus {
		log.Println("查询不到有标记的lteid数据")
		log.Println("睡眠5分钟 gtid:", gtid, "lteid:", lteid)
		time.Sleep(5 * time.Minute)
		return
	}
	log.Println("查询完毕-找到有标记的lteid-先睡眠5分钟", gtid, lteid)
	time.Sleep(5 * time.Minute)

	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	// Dedup range for this pass.
	q = map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  StringTOBsonId(gtid),
			"$lte": StringTOBsonId(lteid),
		},
	}
	log.Println("历史判重查询条件:", q, "时间:", between_time)
	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()

	num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0)
	pendAllArr := [][]map[string]interface{}{} // all groups
	dayArr := []map[string]interface{}{}       // current group
	for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
		if num%10000 == 0 {
			log.Println("正序遍历:", num)
		}
		// Only documents flagged dataging==1 and published inside the window.
		if util.IntAll(tmp["dataging"]) == 1 {
			pubtime := util.Int64All(tmp["publishtime"])
			if pubtime > 0 && pubtime >= between_time {
				oknum++
				if deterTime == 0 {
					log.Println("找到第一条符合条件的数据")
					deterTime = util.Int64All(tmp["publishtime"])
					dayArr = append(dayArr, tmp)
				} else {
					if pubtime-deterTime > timingSpanDay*86400 {
						// Span exceeded: close the current group, start a new one.
						pendAllArr = append(pendAllArr, dayArr)
						dayArr = []map[string]interface{}{}
						deterTime = util.Int64All(tmp["publishtime"])
						dayArr = append(dayArr, tmp)
					} else {
						dayArr = append(dayArr, tmp)
					}
				}
			} else {
				outnum++
				// Outside the window: just clear the dataging flag.
				Update.updatePool <- []map[string]interface{}{
					{"_id": tmp["_id"]},
					{"$set": map[string]interface{}{
						"dataging":           0,
						"history_updatetime": util.Int64All(time.Now().Unix()),
					}},
				}
			}
		}
		tmp = make(map[string]interface{})
	}
	if len(dayArr) > 0 {
		pendAllArr = append(pendAllArr, dayArr)
		dayArr = []map[string]interface{}{}
	}
	log.Println("查询数量:", num, "符合条件:", oknum, "未在两年内:", outnum)
	if len(pendAllArr) <= 0 {
		log.Println("没找到dataging==1的数据")
	}
	// Sanity-check the grouping counts.
	testNum := 0
	for k, v := range pendAllArr {
		log.Println("第", k, "组--", "数量:", len(v))
		testNum = testNum + len(v)
	}
	log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum)

	n, repeateN := 0, 0
	log.Println("线程数:", threadNum)
	pool := make(chan bool, threadNum)
	wg := &sync.WaitGroup{}
	for k, v := range pendAllArr {
		pool <- true
		wg.Add(1)
		go func(k int, v []map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			// Updates for documents outside the current id segment (other table).
			groupOtherExtract := [][]map[string]interface{}{}
			log.Println("构建第", k, "组---(数据池)")
			// Last (latest) publishtime of the group seeds the group's pool.
			first_pt := util.Int64All(v[len(v)-1]["publishtime"])
			curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
			log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
			// BUGFIX: n/repeateN are shared across goroutines; guard with updatelock.
			updatelock.Lock()
			n = n + len(v)
			log.Println("统计目前总数量:", n, "重复数量:", repeateN)
			updatelock.Unlock()
			for _, tmp := range v {
				info := NewInfo(tmp)
				b, source, reason := curTM.check(info)
				if b {
					// Duplicate found: tag both source and duplicate.
					repeat_ids := source.repeat_ids
					repeat_ids = append(repeat_ids, info.id)
					source.repeat_ids = repeat_ids
					updatelock.Lock()
					repeateN++
					// NOTE(review): check used curTM but the replacement goes to the
					// global DM pool, as in the original — confirm this is intended.
					DM.replacePoolData(source)
					// Source inside the current segment → same table; otherwise
					// buffer the update for the backup table.
					if judgeIsCurIds(gtid, lteid, source.id) {
						Update.updatePool <- []map[string]interface{}{
							{"_id": StringTOBsonId(source.id)},
							{"$set": map[string]interface{}{
								"repeat_ids": repeat_ids,
							}},
						}
					} else {
						groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{
							{"_id": StringTOBsonId(source.id)},
							{"$set": map[string]interface{}{
								"repeat_ids": repeat_ids,
							}},
						})
					}
					// Tag the duplicate document.
					Update.updatePool <- []map[string]interface{}{
						{"_id": tmp["_id"]},
						{"$set": map[string]interface{}{
							"repeat":             1,
							"repeat_reason":      reason,
							"repeat_id":          source.id,
							"dataging":           0,
							"history_updatetime": util.Int64All(time.Now().Unix()),
						}},
					}
					if len(groupOtherExtract) >= 500 {
						mgo.UpSertBulk(extract_back, groupOtherExtract...)
						groupOtherExtract = [][]map[string]interface{}{}
					}
					updatelock.Unlock()
				} else {
					// Not a duplicate: clear the dataging flag.
					Update.updatePool <- []map[string]interface{}{
						{"_id": tmp["_id"]},
						{"$set": map[string]interface{}{
							"dataging":           0, // processed docs end with dataging==0
							"history_updatetime": util.Int64All(time.Now().Unix()),
						}},
					}
				}
			}
			// Flush the remaining cross-table updates for this group.
			updatelock.Lock()
			if len(groupOtherExtract) > 0 {
				mgo.UpSertBulk(extract_back, groupOtherExtract...)
			}
			updatelock.Unlock()
		}(k, v)
	}
	wg.Wait()
	log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid)
	time.Sleep(30 * time.Second)
	// Pass done: broadcast to downstream nodes (indexing/merge).
	if n >= repeateN && gtid != lteid {
		for _, to := range nextNode {
			next_sid := util.BsonIdToSId(gtid)
			next_eid := util.BsonIdToSId(lteid)
			key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
			by, _ := json.Marshal(map[string]interface{}{
				"gtid":  next_sid,
				"lteid": next_eid,
				"stype": util.ObjToString(to["stype"]),
				"key":   key,
			})
			addr := &net.UDPAddr{
				IP:   net.ParseIP(to["addr"].(string)),
				Port: util.IntAll(to["port"]),
			}
			node := &udpNode{by, addr, time.Now().Unix(), 0}
			udptaskmap.Store(key, node)
			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
		}
	}
	end := time.Now().Unix()
	log.Println(gtid, lteid)
	// Pace the loop: at least 5 minutes between passes.
	if end-start < 60*5 {
		log.Println("睡眠.............")
		time.Sleep(5 * time.Minute)
	}
	log.Println("继续下一段的历史判重")
}

// judgeIsCurIds reports whether curid's embedded timestamp (first 8 hex chars
// of an ObjectId) lies within [gtid, lteid]'s timestamps.
func judgeIsCurIds(gtid string, lteid string, curid string) bool {
	gt_time, _ := strconv.ParseInt(gtid[:8], 16, 64)
	lte_time, _ := strconv.ParseInt(lteid[:8], 16, 64)
	cur_time, _ := strconv.ParseInt(curid[:8], 16, 64)
	return cur_time >= gt_time && cur_time <= lte_time
}

// moveHistoryData copies the (startid, endid] segment from the extract
// collection into the backup collection, then deletes old documents from
// extract by comeintime.
func moveHistoryData(startid string, endid string) {
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	year, month, day := time.Now().Date()
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  StringTOBsonId(startid),
			"$lte": StringTOBsonId(endid),
		},
	}
	log.Println(q)
	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
	index := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
		mgo.Save(extract_back, tmp)
		tmp = map[string]interface{}{}
		if index%1000 == 0 {
			log.Println("index", index)
		}
	}
	log.Println("save to", extract_back, " ok index", index)
	// Delete anything older than 2*(dupdays+1) days before today's midnight.
	qv := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).
				Add(-time.Duration(dupdays+1) * 24 * time.Hour * 2).Unix(),
		},
	}
	delnum := mgo.Delete(extract, qv)
	log.Println("remove from ", extract, delnum)
}

// moveTimeoutData schedules moveOnceTimeOut to run daily at midnight.
func moveTimeoutData() {
	log.Println("部署迁移定时任务")
	c := cron.New()
	c.AddFunc("0 0 0 * * ?", func() {
		moveOnceTimeOut()
	})
	c.Start()
}

// moveOnceTimeOut migrates documents older than two years from
// result_20200714 to result_20200713 (copy, then delete by id).
func moveOnceTimeOut() {
	log.Println("执行一次迁移超时数据")
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	now := time.Now()
	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
	task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$lt": StringTOBsonId(task_id),
		},
	}
	it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
	index := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
		if index%10000 == 0 {
			log.Println("index", index)
		}
		del_id := BsonTOStringId(tmp["_id"])
		mgo.Save("result_20200713", tmp)
		mgo.DeleteById("result_20200714", del_id)
		tmp = map[string]interface{}{}
	}
	log.Println("save and delete", " ok index", index)
}