浏览代码

备份~替换数据~通道填充

zhengkun 3 年之前
父节点
当前提交
fd2dc418b1
共有 8 个文件被更改,包括 350 次插入和 221 次删除
  1. 1 1
      src/config.json
  2. 23 0
      src/dataMethod.go
  3. 24 38
      src/datamap.go
  4. 116 127
      src/historyRepeat.go
  5. 28 49
      src/increaseRepeat.go
  6. 17 6
      src/main.go
  7. 91 0
      src/nsqdata/consumer.go
  8. 50 0
      src/nsqdata/producer.go

+ 1 - 1
src/config.json

@@ -6,7 +6,7 @@
         "pool": 10,
         "db": "zhengkun",
         "extract": "repeat_test",
-        "extract_back": "repeat_test",
+        "extract_back": "repeat_test_back",
         "extract_log": "result_replace_log",
         "site": {
             "dbname": "zhengkun",

+ 23 - 0
src/dataMethod.go

@@ -382,6 +382,29 @@ func confrimExtractData(source_id string, info_id string) (bool, map[string]inte
 	return isvalid, info_data, source_data
 }
 
+// confrimHistoryExtractData loads the source and info documents for a history-mode replacement and swaps their "_id" values; the source is read from extract when judgeIsCurIds places source_id in the current gtid/lteid range and from extract_back otherwise, while the info document is always read from extract.
+func confrimHistoryExtractData(source_id string, info_id string) (bool, bool, map[string]interface{}, map[string]interface{}) { // returns (isvalid, isexists, info_data, source_data): note that info_data precedes source_data
+	source_data := map[string]interface{}{}
+	info_data := map[string]interface{}{}
+	isvalid := false // becomes true only when both lookups returned more than two fields
+	isexists := false // becomes true when source_id is in the current id range, i.e. the source was looked up in extract
+	if judgeIsCurIds(gtid, lteid, source_id) {
+		isexists = true
+		source_data = data_mgo.FindById(extract, source_id)
+	} else {
+		source_data = data_mgo.FindById(extract_back, source_id)
+	}
+	info_data = data_mgo.FindById(extract, info_id)
+	if len(source_data) > 2 && len(info_data) > 2 { // NOTE(review): presumably "more than two fields" separates a real document from an empty/not-found result — confirm against FindById
+		isvalid = true
+		ts_id := source_data["_id"]
+		ti_id := info_data["_id"]
+		source_data["_id"] = ti_id // exchange the ids: each document now carries the other's _id
+		info_data["_id"] = ts_id
+	}
+	return isvalid, isexists, info_data, source_data
+}
+
 //查询bidding表数据
 func confrimBiddingData(source_id string, info_id string) (bool, map[string]interface{}, map[string]interface{}) {
 	source_data := map[string]interface{}{}

+ 24 - 38
src/datamap.go

@@ -2,7 +2,6 @@ package main
 
 import (
 	"fmt"
-	"go.mongodb.org/mongo-driver/bson/primitive"
 	"log"
 	qutil "qfw/util"
 	"reflect"
@@ -13,32 +12,30 @@ import (
 )
 
 type Info struct { // Info carries the per-record fields read by the duplicate check; this hunk removes the repeat_ids field
-	id               string   // record id
-	title            string   // title
-	spidercode       string   // spider (crawler) code
-	area             string   // province
-	city             string   // city
-	subtype          string   // information type
-	buyer            string   // purchasing entity
-	agency           string   // agency
-	winner           string   // winning bidder
-	budget           float64  // budget amount
-	bidamount        float64  // winning bid amount
-	projectname      string   // project name
-	projectcode      string   // project code
-	contractnumber   string   // contract number
-	publishtime      int64    // publish time
-	comeintime       int64    // ingestion time
-	bidopentime      int64    // bid opening time
-	bidopenaddress   string   // bid opening location
-	site             string   // site
-	href             string   // URL of the body text
-	repeatid         string   // id of the record this one duplicates
-	titleSpecialWord bool     // title contains a special word
-	specialWord      bool     // special word used for the second-pass check
-	is_site          bool     // whether the city is a site city
-	repeat_ids       []string // all ids recorded as duplicates of this record
-
+	id               string  // record id
+	title            string  // title
+	spidercode       string  // spider (crawler) code
+	area             string  // province
+	city             string  // city
+	subtype          string  // information type
+	buyer            string  // purchasing entity
+	agency           string  // agency
+	winner           string  // winning bidder
+	budget           float64 // budget amount
+	bidamount        float64 // winning bid amount
+	projectname      string  // project name
+	projectcode      string  // project code
+	contractnumber   string  // contract number
+	publishtime      int64   // publish time
+	comeintime       int64   // ingestion time
+	bidopentime      int64   // bid opening time
+	bidopenaddress   string  // bid opening location
+	site             string  // site
+	href             string  // URL of the body text
+	repeatid         string  // id of the record this one duplicates
+	titleSpecialWord bool    // title contains a special word
+	specialWord      bool    // special word used for the second-pass check
+	is_site          bool    // whether the city is a site city
 }
 
 var datelimit = float64(432000) //五天
@@ -233,17 +230,6 @@ func NewInfo(tmp map[string]interface{}) *Info {
 	info.repeatid = qutil.ObjToString(tmp["repeatid"])
 	info.specialWord = FilterRegTitle.MatchString(info.title)
 	info.titleSpecialWord = FilterRegTitle_0.MatchString(info.title) || FilterRegTitle_1.MatchString(info.title) || FilterRegTitle_2.MatchString(info.title)
-
-	//加载repeat_ids数据
-	repeat_ids := []string{}
-	if ids_1, ok := tmp["repeat_ids"].([]interface{}); ok {
-		repeat_ids = qutil.ObjArrToStringArr(ids_1)
-	}
-	if ids_2, ok := tmp["repeat_ids"].(primitive.A); ok {
-		repeat_ids = qutil.ObjArrToStringArr(ids_2)
-	}
-	info.repeat_ids = repeat_ids
-
 	info.is_site = false
 
 	return info

+ 116 - 127
src/historyRepeat.go

@@ -8,6 +8,7 @@ import (
 	mu "mfw/util"
 	"net"
 	"os"
+	qu "qfw/common/src/qfw/util"
 	"qfw/util"
 	"strconv"
 	"sync"
@@ -18,64 +19,61 @@ import (
 func historyRepeat() {
 	defer util.Catch()
 	for {
-		start:=time.Now().Unix()
-		if gtid=="" {
+		start := time.Now().Unix()
+		if gtid == "" {
 			log.Println("请传gtid,否则无法运行")
 			os.Exit(0)
 			return
 		}
-		if lteid!="" {
-			//先进行数据迁移
-			log.Println("开启一次迁移任务",gtid,lteid)
-			moveHistoryData(gtid,lteid)
+		if lteid != "" && !IsFull { //先进行数据迁移
+			log.Println("开启一次迁移任务", gtid, lteid)
+			moveHistoryData(gtid, lteid)
 			gtid = lteid //替换数据
 		}
 		//查询表最后一个id
 		task_sess := task_mgo.GetMgoConn()
 		defer task_mgo.DestoryMongoConn(task_sess)
-		q:=map[string]interface{}{}
-		between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
+		q := map[string]interface{}{}
 		it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
-
-		isRepeatStatus:=false
+		isRepeatStatus := false
 		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
-			is_repeat_status:=util.IntAll(tmp["is_repeat_status"])
+			is_repeat_status := util.IntAll(tmp["is_repeat_status"])
 			if is_repeat_status == 1 {
 				lteid = util.ObjToString(tmp["lteid"])
-				log.Println("查询的最后一个已标记的任务lteid:",lteid)
+				log.Println("查询的最后一个已标记的任务lteid:", lteid)
 				isRepeatStatus = true
 				tmp = make(map[string]interface{})
 				break
-			}else  {
+			} else {
 				tmp = make(map[string]interface{})
 			}
 		}
-
 		if !isRepeatStatus {
 			log.Println("查询不到有标记的lteid数据")
-			log.Println("睡眠5分钟 gtid:",gtid,"lteid:",lteid)
+			log.Println("睡眠5分钟 gtid:", gtid, "lteid:", lteid)
 			time.Sleep(5 * time.Minute)
 			continue
 		}
-		log.Println("查询完毕-找到有标记的lteid-先睡眠5分钟",gtid,lteid)
-		if isUpdateSite{
+		log.Println("查询完毕-找到有标记的lteid-先睡眠5分钟", gtid, lteid)
+		if isUpdateSite {
 			initSite()
 		}
 		time.Sleep(5 * time.Minute)
 
-		sess := data_mgo.GetMgoConn()//连接器
+		sess := data_mgo.GetMgoConn() //连接器
 		defer data_mgo.DestoryMongoConn(sess)
+		between_time := time.Now().Unix() - (86400 * timingPubScope) //两年周期
 		//开始判重
 		q = map[string]interface{}{
 			"_id": map[string]interface{}{
-				"$gt": StringTOBsonId(gtid),
+				"$gt":  StringTOBsonId(gtid),
 				"$lte": StringTOBsonId(lteid),
 			},
 		}
-		log.Println("历史判重查询条件:",q,"时间:", between_time)
+		log.Println("历史判重查询条件:", q, "时间:", between_time)
 		it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
-		num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
-		pendAllArr:=[][]map[string]interface{}{}//待处理数组
+		num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0) //计数
+		pendAllArr := [][]map[string]interface{}{}                              //待处理数组
 		dayArr := []map[string]interface{}{}
 		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
 			if num%10000 == 0 {
@@ -86,32 +84,32 @@ func historyRepeat() {
 				pubtime := util.Int64All(tmp["publishtime"])
 				if pubtime > 0 && pubtime >= between_time {
 					oknum++
-					if deterTime==0 {
+					if deterTime == 0 {
 						log.Println("找到第一条符合条件的数据")
 						deterTime = util.Int64All(tmp["publishtime"])
-						dayArr = append(dayArr,tmp)
-					}else {
-						if pubtime-deterTime >timingSpanDay*86400 {
+						dayArr = append(dayArr, tmp)
+					} else {
+						if pubtime-deterTime > timingSpanDay*86400 {
 							//新数组重新构建,当前组数据加到全部组数据
-							pendAllArr = append(pendAllArr,dayArr)
+							pendAllArr = append(pendAllArr, dayArr)
 							dayArr = []map[string]interface{}{}
 							deterTime = util.Int64All(tmp["publishtime"])
-							dayArr = append(dayArr,tmp)
-						}else {
-							dayArr = append(dayArr,tmp)
+							dayArr = append(dayArr, tmp)
+						} else {
+							dayArr = append(dayArr, tmp)
 						}
 					}
-				}else {
+				} else {
 					outnum++
 					//不在两年内的也清标记
-					Update.updatePool <- []map[string]interface{}{//重复数据打标签
+					Update.updatePool <- []map[string]interface{}{ //重复数据打标签
 						map[string]interface{}{
 							"_id": tmp["_id"],
 						},
 						map[string]interface{}{
 							"$set": map[string]interface{}{
-								"dataging": 0,
-								"history_updatetime":util.Int64All(time.Now().Unix()),
+								"dataging":           0,
+								"history_updatetime": util.Int64All(time.Now().Unix()),
 							},
 						},
 					}
@@ -120,30 +118,30 @@ func historyRepeat() {
 			tmp = make(map[string]interface{})
 		}
 
-		if len(dayArr)>0 {
-			pendAllArr = append(pendAllArr,dayArr)
+		if len(dayArr) > 0 {
+			pendAllArr = append(pendAllArr, dayArr)
 			dayArr = []map[string]interface{}{}
 		}
 
-		log.Println("查询数量:",num,"符合条件:",oknum,"未在两年内:",outnum)
+		log.Println("查询数量:", num, "符合条件:", oknum, "未在两年内:", outnum)
 
 		if len(pendAllArr) <= 0 {
 			log.Println("没找到dataging==1的数据")
 		}
 
 		//测试分组数量是否正确
-		testNum:=0
-		for k,v:=range pendAllArr {
-			log.Println("第",k,"组--","数量:",len(v))
-			testNum = testNum+len(v)
+		testNum := 0
+		for k, v := range pendAllArr {
+			log.Println("第", k, "组--", "数量:", len(v))
+			testNum = testNum + len(v)
 		}
-		log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
+		log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum)
 
 		n, repeateN := 0, 0
-		log.Println("线程数:",threadNum)
+		log.Println("线程数:", threadNum)
 		pool := make(chan bool, threadNum)
 		wg := &sync.WaitGroup{}
-		for k,v:=range pendAllArr { //每组结束更新一波数据
+		for k, v := range pendAllArr { //每组结束更新一波数据
 			pool <- true
 			wg.Add(1)
 			go func(k int, v []map[string]interface{}) {
@@ -151,10 +149,6 @@ func historyRepeat() {
 					<-pool
 					wg.Done()
 				}()
-				//相关ids 跨表
-				groupOtherExtract := [][]map[string]interface{}{}
-
-				//构建当前组的数据池
 				log.Println("构建第", k, "组---(数据池)")
 				//当前组的第一个发布时间
 				first_pt := util.Int64All(v[len(v)-1]["publishtime"])
@@ -165,94 +159,94 @@ func historyRepeat() {
 				for _, tmp := range v {
 					info := NewInfo(tmp)
 					b, source, reason := curTM.check(info)
-					if b { //有重复,生成更新语句,更新抽取和更新招标
+					if b { //有重复,更新
 						repeateN++
-						//重复数据打标签
-						repeat_ids:=source.repeat_ids
-						repeat_ids =  append(repeat_ids,info.id)
-						source.repeat_ids = repeat_ids
-
 						updatelock.Lock()
-						//替换数据池-更新
-						curTM.replacePoolData(source)
-						//更新数据源
-						//判断是否在当前段落
-						if judgeIsCurIds(gtid,lteid,source.id) {
-							Update.updatePool <- []map[string]interface{}{//重复数据打标签
-								map[string]interface{}{
-									"_id": StringTOBsonId(source.id),
-								},
-								map[string]interface{}{
-									"$set": map[string]interface{}{
-										"repeat_ids": repeat_ids,
-									},
-								},
+						if judgeIsReplaceInfo(source.href, info.href) {
+							temp_source_id := source.id
+							temp_info_id := info.id
+							temp_source := info
+							temp_source.id = temp_source_id
+							curTM.replacePoolData(temp_source)
+							//替换抽取表数据
+							is_log, is_exists, ext_s_data, ext_i_data := confrimHistoryExtractData(temp_source_id, temp_info_id)
+							is_bid, bid_s_data, bid_i_data := confrimBiddingData(temp_source_id, temp_info_id)
+
+							if is_log && is_bid {
+								data_mgo.Save(extract_log, map[string]interface{}{
+									"_id":        StringTOBsonId(temp_info_id),
+									"replace_id": temp_source_id,
+									"is_history": 1,
+								})
+								ext_s_data["repeat"] = 0
+								ext_s_data["dataging"] = 0
+								ext_i_data["repeat"] = 1
+								ext_i_data["repeat_id"] = temp_source_id
+								ext_i_data["repeat_reason"] = reason
+								ext_i_data["dataging"] = 0
+								ext_i_data["history_updatetime"] = qu.Int64All(time.Now().Unix())
+								if is_exists {
+									data_mgo.DeleteById(extract, temp_source_id)
+									data_mgo.Save(extract, ext_s_data)
+								} else {
+									data_mgo.DeleteById(extract_back, temp_source_id)
+									data_mgo.Save(extract_back, ext_s_data)
+								}
+								data_mgo.DeleteById(extract, temp_info_id)
+								data_mgo.Save(extract, ext_i_data)
+
+								task_mgo.DeleteById(task_bidding, temp_source_id)
+								task_mgo.Save(task_bidding, bid_s_data)
+								task_mgo.DeleteById(task_bidding, temp_info_id)
+								task_mgo.Save(task_bidding, bid_i_data)
+
+								//通道填充数据
+								msg := "id=" + temp_source_id
+								_ = MP.Publish(msg)
+
+							} else {
+								log.Println("替换~相关表~未查询到数据~", temp_source_id, "~", temp_info_id)
 							}
-						}else {
-							groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
+						} else {
+							Update.updatePool <- []map[string]interface{}{ //重复数据打标签
 								map[string]interface{}{
-									"_id": StringTOBsonId(source.id),
+									"_id": tmp["_id"],
 								},
 								map[string]interface{}{
 									"$set": map[string]interface{}{
-										"repeat_ids": repeat_ids,
+										"repeat":             1,
+										"repeat_reason":      reason,
+										"repeat_id":          source.id,
+										"dataging":           0,
+										"history_updatetime": util.Int64All(time.Now().Unix()),
 									},
 								},
-							})
-						}
-						Update.updatePool <- []map[string]interface{}{//重复数据打标签
-							map[string]interface{}{
-								"_id": tmp["_id"],
-							},
-							map[string]interface{}{
-								"$set": map[string]interface{}{
-									"repeat":        1,
-									"repeat_reason": reason,
-									"repeat_id":     source.id,
-									"dataging":      0,
-									"history_updatetime":util.Int64All(time.Now().Unix()),
-								},
-							},
-						}
-						if len(groupOtherExtract) >= 500 {
-							data_mgo.UpSertBulk(extract_back, groupOtherExtract...)
-							groupOtherExtract = [][]map[string]interface{}{}
+							}
 						}
-
 						updatelock.Unlock()
-
 					} else {
-						Update.updatePool <- []map[string]interface{}{//重复数据打标签
+						Update.updatePool <- []map[string]interface{}{ //重复数据打标签
 							map[string]interface{}{
 								"_id": tmp["_id"],
 							},
 							map[string]interface{}{
 								"$set": map[string]interface{}{
-									"dataging": 0, //符合条件的都为dataging==0
-									"history_updatetime":util.Int64All(time.Now().Unix()),
+									"dataging":           0, //符合条件的都为dataging==0
+									"history_updatetime": util.Int64All(time.Now().Unix()),
 								},
 							},
 						}
 					}
 				}
-				//每组数据结束-更新数据
-				updatelock.Lock()
-				if len(groupOtherExtract) > 0 {
-					data_mgo.UpSertBulk(extract_back, groupOtherExtract...)
-				}
-				updatelock.Unlock()
-
 			}(k, v)
-
 		}
-
 		wg.Wait()
 
-		log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
+		log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid)
 
 		time.Sleep(30 * time.Second)
 		//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
-		if gtid!=lteid{
+		if gtid != lteid {
 			for _, to := range nextNode {
 				next_sid := util.BsonIdToSId(gtid)
 				next_eid := util.BsonIdToSId(lteid)
@@ -273,36 +267,38 @@ func historyRepeat() {
 			}
 		}
 
-		end:=time.Now().Unix()
+		end := time.Now().Unix()
 
-		log.Println(gtid,lteid)
+		log.Println(gtid, lteid)
 
-		if end-start<60*5 {
+		if end-start < 60*5 {
 			log.Println("睡眠.............")
 			time.Sleep(5 * time.Minute)
 		}
 		log.Println("继续下一段的历史判重")
 	}
 }
+
 // judgeIsCurIds reports whether the first 8 hex characters of curid, parsed as a base-16 integer, lie between the same prefix of gtid and of lteid (both bounds inclusive); presumably this prefix is the ObjectId timestamp — confirm.
-func judgeIsCurIds (gtid string,lteid string,curid string) bool {
+func judgeIsCurIds(gtid string, lteid string, curid string) bool {
 
 	gt_time, _ := strconv.ParseInt(gtid[:8], 16, 64) // NOTE: an id shorter than 8 bytes makes the [:8] slice panic; parse errors are discarded, so a non-hex prefix yields 0
 	lte_time, _ := strconv.ParseInt(lteid[:8], 16, 64)
 	cur_time, _ := strconv.ParseInt(curid[:8], 16, 64)
-	if cur_time>=gt_time&&cur_time<=lte_time {
+	if cur_time >= gt_time && cur_time <= lte_time {
 		return true
 	}
 	return false
 }
+
 //迁移上一段数据
-func moveHistoryData(startid string,endid string) {
+func moveHistoryData(startid string, endid string) {
 	sess := data_mgo.GetMgoConn()
 	defer data_mgo.DestoryMongoConn(sess)
 	year, month, day := time.Now().Date()
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
-			"$gt": StringTOBsonId(startid),
+			"$gt":  StringTOBsonId(startid),
 			"$lte": StringTOBsonId(endid),
 		},
 	}
@@ -320,7 +316,7 @@ func moveHistoryData(startid string,endid string) {
 
 	qv := map[string]interface{}{
 		"comeintime": map[string]interface{}{
-			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour*2).Unix(),
+			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour * 2).Unix(),
 		},
 	}
 	delnum := data_mgo.Delete(extract, qv)
@@ -328,25 +324,18 @@ func moveHistoryData(startid string,endid string) {
 
 }
 
-
-
-
-
-
-
-
 // Temporarily unused. moveTimeoutData registers moveOnceTimeOut on the cron spec "0 0 0 * * ?" (presumably once a day at midnight — confirm against the cron library) and starts the scheduler.
-func moveTimeoutData()  {
+func moveTimeoutData() {
 	log.Println("部署迁移定时任务")
 	c := cron.New()
 	c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
 	c.Start()
 }
-func moveOnceTimeOut()  {
+func moveOnceTimeOut() {
 	log.Println("执行一次迁移超时数据")
 	sess := data_mgo.GetMgoConn()
 	defer data_mgo.DestoryMongoConn(sess)
-	now:=time.Now()
+	now := time.Now()
 
 	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
 	task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
@@ -362,9 +351,9 @@ func moveOnceTimeOut()  {
 		if index%10000 == 0 {
 			log.Println("index", index)
 		}
-		del_id:=BsonTOStringId(tmp["_id"])
+		del_id := BsonTOStringId(tmp["_id"])
 		data_mgo.Save("result_20200713", tmp)
-		data_mgo.DeleteById("result_20200714",del_id)
+		data_mgo.DeleteById("result_20200714", del_id)
 		tmp = map[string]interface{}{}
 	}
 	log.Println("save and delete", " ok index", index)

+ 28 - 49
src/increaseRepeat.go

@@ -37,7 +37,7 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 			tmp = make(map[string]interface{})
 			continue
 		}
-		if qu.IntAll(tmp["dataging"]) == 1 && !IsFull {
+		if qu.IntAll(tmp["dataging"]) == 1 {
 			tmp = make(map[string]interface{})
 			continue
 		}
@@ -77,7 +77,7 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 					if jyfb_data[info.spidercode] != "" { //伪判重标记
 						Update.updatePool <- []map[string]interface{}{ //原始数据打标签
 							map[string]interface{}{
-								"_id": StringTOBsonId(info.id),
+								"_id": tmp["_id"],
 							},
 							map[string]interface{}{
 								"$set": map[string]interface{}{
@@ -89,75 +89,54 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 						num++
 						//判断是否为~替换数据~模式
 						if judgeIsReplaceInfo(source.href, info.href) {
+							updatelock.Lock()
 							temp_source_id := source.id
 							temp_info_id := info.id
 							temp_source := info
 							temp_source.id = temp_source_id
-							repeat_ids := source.repeat_ids
-							repeat_ids = append(repeat_ids, temp_info_id)
-							temp_source.repeat_ids = repeat_ids
 							DM.replacePoolData(temp_source)
 							//替换抽取表数据
-							is_ext, ext_s_data, ext_i_data := confrimExtractData(temp_source_id, temp_info_id)
-							if is_ext {
-								//标记加上~
+							is_log, ext_s_data, ext_i_data := confrimExtractData(temp_source_id, temp_info_id)
+							is_bid, bid_s_data, bid_i_data := confrimBiddingData(temp_source_id, temp_info_id)
+							if is_log && is_bid {
+								data_mgo.Save(extract_log, map[string]interface{}{
+									"_id":        tmp["_id"],
+									"replace_id": temp_source_id,
+									"is_history": 0,
+								})
 								ext_s_data["repeat"] = 0
-								ext_s_data["repeat_ids"] = repeat_ids
-
 								ext_i_data["repeat"] = 1
-								ext_i_data["repeat_id"] = source.id
-								ext_i_data["dataging"] = 0
+								ext_i_data["repeat_id"] = temp_source_id
 								ext_i_data["repeat_reason"] = reason
-								ext_i_data["updatetime_repeat"] = qu.Int64All(time.Now().Unix())
-
 								data_mgo.DeleteById(extract, temp_source_id)
 								data_mgo.Save(extract, ext_s_data)
 								data_mgo.DeleteById(extract, temp_info_id)
 								data_mgo.Save(extract, ext_i_data)
-							} else {
-								log.Println("抽取表~未查询到数据~", temp_source_id, "~", temp_info_id)
-							}
-							i_bid, bid_s_data, bid_i_data := confrimBiddingData(temp_source_id, temp_info_id)
-							if i_bid {
+
 								task_mgo.DeleteById(task_bidding, temp_source_id)
 								task_mgo.Save(task_bidding, bid_s_data)
 								task_mgo.DeleteById(task_bidding, temp_info_id)
 								task_mgo.Save(task_bidding, bid_i_data)
+
+								//通道填充数据
+								msg := "id=" + temp_source_id
+								_ = MP.Publish(msg)
+
 							} else {
-								log.Println("原始表~未查询到数据~", temp_source_id, "~", temp_info_id)
+								log.Println("替换~相关表~未查询到数据~", temp_source_id, "~", temp_info_id)
 							}
-							//日志记录
-							data_mgo.Save(extract_log, map[string]interface{}{
-								"_id":        StringTOBsonId(temp_info_id),
-								"replace_id": temp_source_id,
-							})
-
+							updatelock.Unlock()
 						} else {
-							var updateID = map[string]interface{}{} //记录更新判重的
-							updateID["_id"] = StringTOBsonId(info.id)
-							repeat_ids := source.repeat_ids
-							repeat_ids = append(repeat_ids, info.id)
-							source.repeat_ids = repeat_ids
-							DM.replacePoolData(source) //替换数据池-更新
-							Update.updatePool <- []map[string]interface{}{ //原始数据打标签
+							//更新池~更新
+							Update.updatePool <- []map[string]interface{}{ //重复数据打标签
 								map[string]interface{}{
-									"_id": StringTOBsonId(source.id),
+									"_id": tmp["_id"],
 								},
 								map[string]interface{}{
 									"$set": map[string]interface{}{
-										"repeat_ids": repeat_ids,
-									},
-								},
-							}
-							Update.updatePool <- []map[string]interface{}{ //重复数据打标签
-								updateID,
-								map[string]interface{}{
-									"$set": map[string]interface{}{
-										"repeat":            1,
-										"repeat_reason":     reason,
-										"repeat_id":         source.id,
-										"dataging":          0,
-										"updatetime_repeat": qu.Int64All(time.Now().Unix()),
+										"repeat":        1,
+										"repeat_reason": reason,
+										"repeat_id":     source.id,
 									},
 								},
 							}
@@ -210,8 +189,8 @@ func updateOcrFileData(cur_lteid string) {
 	updateOcrFile := [][]map[string]interface{}{}
 	for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
 		cur_id := BsonTOStringId(tmp["_id"])
-		lteid := qu.ObjToString(tmp["lteid"])
-		if lteid == cur_lteid { //需要更新
+		lte_id := qu.ObjToString(tmp["lteid"])
+		if lte_id == cur_lteid { //需要更新
 			log.Println("找到该lteid数据", cur_lteid, cur_id)
 			isUpdateOcr = true
 			updateOcrFile = append(updateOcrFile, []map[string]interface{}{ //重复数据打标签

+ 17 - 6
src/main.go

@@ -11,6 +11,7 @@ import (
 	"log"
 	mu "mfw/util"
 	"net"
+	"nsqdata"
 	qu "qfw/util"
 	"regexp"
 	"sync"
@@ -51,6 +52,7 @@ var (
 	jyfb_data                         map[string]string                 //任务池
 	taskList                          []map[string]interface{}          //任务池
 	isUpdateSite                      bool
+	MP                                *nsqdata.Producer
 )
 
 //初始化加载
@@ -114,6 +116,11 @@ func init() {
 	timingSpanDay = qu.Int64All(Sysconfig["timingSpanDay"])
 	timingPubScope = qu.Int64All(Sysconfig["timingPubScope"])
 
+	var err error
+	MP, err = nsqdata.NewProducer("192.168.3.166:4150", "testnsq", true)
+	if err != nil {
+		log.Fatal("通道配置异常~", err)
+	}
 	c := cron.New()
 	c.AddFunc("0 0 1 ? * WED", func() {
 		isUpdateSite = true
@@ -194,23 +201,27 @@ func getRepeatTask() {
 	}
 }
 
-func main() {
+func mainT() { // mainT is the manual test entry point (renamed from main in this hunk): it sets IsFull and then only sleeps, with the test calls below commented out
 	IsFull = true
 
 	//AddGroupPool = newAddGroupPool()
 	//go AddGroupPool.addGroupData()
 	//fullDataRepeat() // full-data duplicate check
 
-	increaseRepeat(map[string]interface{}{
-		"gtid":  "62ec61170ae152a3c2310f02",
-		"lteid": "62ec61170ae152a3c2310f02",
-	})
+	//increaseRepeat(map[string]interface{}{
+	//	"gtid":  "12ec61170ae152a3c2310f02",
+	//	"lteid": "92ec61170ae152a3c2310f02",
+	//})
+
+	//gtid = "62ec2dd00ae152a3c230c1a1"
+	//lteid = "62ec2dd00ae152a3c230c1e1"
+	//historyRepeat()
 
 	time.Sleep(99999 * time.Hour) // keep the process alive
 }
 
 //主函数
-func mainT() {
+func main() {
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}

+ 91 - 0
src/nsqdata/consumer.go

@@ -0,0 +1,91 @@
+package nsqdata
+
+import (
+	"encoding/json"
+	"github.com/go-nsq"
+	"strings"
+	"time"
+)
+
+type Consumer struct { // Consumer wraps an nsq.Consumer and hands received message payloads to Ch
+	Ch           chan interface{} // receives the payloads sent by HandleMessage; NewConsumer buffers it to the configured concurrency
+	C            *nsq.Consumer // underlying go-nsq consumer
+	Topic        string // NSQ topic subscribed to
+	Channel      string // NSQ channel subscribed to
+	IsJsonEncode bool // when true, HandleMessage JSON-decodes the body before sending it to Ch
+	Conf         *Cconfig // configuration this consumer was built from; read by Close
+}
+
+type Cconfig struct { // Cconfig is the configuration accepted by NewConsumer
+	IsJsonEncode         bool   // whether payloads are JSON-serialized (decoding then uses JSON as well); defaults to no JSON serialization
+	ConnectType          int    // connection type: 0 connects to nsqd, 1 connects to nsqlookupd
+	Interval             int    // polling interval in seconds for service discovery (e.g. a new nsqd appearing); defaults to 10
+	Addr, Topic, Channel string // connection address (several allowed, comma-separated), topic, channel
+	Concurrent           int    // number of concurrent handlers; defaults to 1
+}
+
+// HandleMessage is the message handler registered by NewConsumer: it forwards msg.Body to c.Ch, JSON-decoding it first when c.IsJsonEncode is set, and returns the json.Unmarshal error if decoding fails.
+func (c *Consumer) HandleMessage(msg *nsq.Message) error {
+	if c.IsJsonEncode {
+		if len(msg.Body) > 1 { // bodies of 0 or 1 bytes are dropped and nil is returned
+			var err error
+			switch msg.Body[0] {
+			case 0x00: // leading 0x00 byte: the remainder is JSON for an arbitrary value
+				var obj interface{}
+				err = json.Unmarshal(msg.Body[1:], &obj)
+				if err == nil && obj != nil {
+					c.Ch <- obj
+				}
+			case 0x01: // leading 0x01 byte: the remainder is JSON for a []byte
+				var obj []byte
+				err = json.Unmarshal(msg.Body[1:], &obj)
+				if err == nil && obj != nil {
+					c.Ch <- obj
+				}
+			default: // no type-prefix byte: the whole body is decoded as JSON
+				var obj interface{}
+				err = json.Unmarshal(msg.Body, &obj)
+				if err == nil && obj != nil {
+					c.Ch <- obj
+				}
+			}
+			return err
+		}
+	} else {
+		c.Ch <- msg.Body // raw mode: forward the body bytes unchanged
+	}
+	return nil
+}
+
+func NewConsumer(cc *Cconfig) (*Consumer, error) { // NewConsumer creates an nsq consumer for cc.Topic/cc.Channel, registers the returned Consumer as its handler, and connects to the addresses in cc.Addr
+	cfg := nsq.NewConfig()
+	if cc.Interval == 0 { // apply the 10-second default; note this writes back into the caller's cc
+		cc.Interval = 10
+	}
+	cfg.LookupdPollInterval = time.Duration(cc.Interval) * time.Second // polling interval for service discovery, e.g. to notice a newly started nsqd
+	c, err := nsq.NewConsumer(cc.Topic, cc.Channel, cfg)               // create the underlying consumer
+	if err != nil {
+		return nil, err
+	}
+	if cc.Concurrent == 0 { // default to a single concurrent handler
+		cc.Concurrent = 1
+	}
+	consumer := &Consumer{make(chan interface{}, cc.Concurrent), c, cc.Topic, cc.Channel, cc.IsJsonEncode, cc}
+	c.AddConcurrentHandlers(consumer, cc.Concurrent) // register consumer as the message handler
+	addrs := strings.Split(cc.Addr, ",") // cc.Addr may hold several comma-separated addresses
+	var err1 error
+	if cc.ConnectType == 0 {
+		err1 = c.ConnectToNSQDs(addrs)
+	} else if cc.ConnectType == 1 { // NOTE: any other ConnectType connects to nothing and err1 stays nil
+		err1 = c.ConnectToNSQLookupds(addrs)
+	}
+	return consumer, err1 // NOTE: on a connection error the consumer is still returned non-nil together with err1
+}
+
+// Close disconnects the consumer from c.Conf.Addr: from nsqlookupd when ConnectType is 1, otherwise from nsqd. The msg parameter is not used.
+func (c *Consumer) Close(msg *nsq.Message) error {
+	if c.Conf.ConnectType == 1 {
+		return c.C.DisconnectFromNSQLookupd(c.Conf.Addr)
+	}
+	return c.C.DisconnectFromNSQD(c.Conf.Addr) // NOTE(review): Addr is passed unsplit here although NewConsumer splits it on ","; a multi-address Addr presumably matches no single connection — confirm
+}

+ 50 - 0
src/nsqdata/producer.go

@@ -0,0 +1,50 @@
+package nsqdata
+
+import (
+	"encoding/json"
+	"errors"
+	"github.com/go-nsq"
+)
+
+type Producer struct { // Producer wraps an nsq.Producer bound to a single topic
+	//Ch    chan interface{}
+	P            *nsq.Producer // underlying go-nsq producer
+	Topic        string // topic every Publish call sends to
+	IsJsonEncode bool // whether to JSON-serialize messages: if false the message must be passed as []byte; if true it must be handled by a matching consumer that also has serialization enabled
+}
+
+func NewProducer(addr, toppic string, IsJsonEncode bool) (*Producer, error) { // NewProducer creates an nsq producer for addr and wraps it with the topic and JSON flag that Publish uses
+	config := nsq.NewConfig()
+	producer, err := nsq.NewProducer(addr, config)
+	if err != nil {
+		return nil, err
+	} else {
+		return &Producer{producer, toppic, IsJsonEncode}, nil
+	}
+}
+
+func (p *Producer) Publish(msg interface{}) error { // Publish sends msg to p.Topic: JSON-encoded when p.IsJsonEncode is set, otherwise msg must already be a []byte
+	if p.IsJsonEncode {
+		//var infoType byte
+		//switch msg.(type) {
+		//case []byte: // already a byte slice
+		//	infoType = 0x01
+		//default:
+		//	infoType = 0x00
+		//}
+		data, err := json.Marshal(msg)
+		if err != nil {
+			return err
+		} else if len(data) > 0 { // the type-byte prefix meant for decoding []byte is commented out below, so the bare JSON is published
+			//data = append([]byte{infoType}, data...)
+			return p.P.Publish(p.Topic, data)
+		} else {
+			return errors.New("producer msg err")
+		}
+	} else { // raw mode: msg must be a []byte
+		if bs, ok := msg.([]byte); ok {
+			return p.P.Publish(p.Topic, bs)
+		}
+		return errors.New("producer msg err: no []byte")
+	}
+}