zhengkun hace 1 año
padre
commit
da3f78d0fb
Se han modificado 7 ficheros con 164 adiciones y 129 borrados
  1. 13 20
      src/config.json
  2. 1 2
      src/dataMethod.go
  3. 87 72
      src/historyRepeat.go
  4. 2 6
      src/increaseRepeat.go
  5. 3 3
      src/initData.go
  6. 34 19
      src/main.go
  7. 24 7
      src/udptaskmap.go

+ 13 - 20
src/config.json

@@ -2,27 +2,27 @@
     "udpport": ":1785",
     "dupdays": 7,
     "mongodb": {
-        "addr": "192.168.3.206:27002",
-        "db": "qfw_data",
-        "username": "root",
-        "password": "root",
-        "extract": "result_20230510",
-        "extract_back": "result_20230510",
+        "addr": "127.0.0.1:27017",
+        "db": "zhengkun",
+        "username": "",
+        "password": "",
+        "extract": "repeat_test",
+        "extract_back": "repeat_test",
         "extract_log": "result_replace_log",
         "pool": 5
     },
     "task_mongodb": {
-        "task_addr": "192.168.3.206:27002",
-        "task_db": "qfw_data",
-        "username": "root",
-        "password": "root",
+        "task_addr": "127.0.0.1:27017",
+        "task_db": "zhengkun",
+        "username": "",
+        "password": "",
         "task_coll": "bidding_processing_ids",
         "task_bidding": "bidding",
         "task_pool": 5
     },
     "spider_mongodb": {
-        "spider_addr": "192.168.3.166:27082",
-        "spider_db": "zk_extract_service",
+        "spider_addr": "127.0.0.1:27017",
+        "spider_db": "zhengkun",
         "username": "",
         "password": "",
         "spider_coll": "site",
@@ -32,14 +32,7 @@
         "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
         "api": "http://172.17.145.179:19281/_send/_mail"
     },
-    "nextNode": [
-        {
-            "addr": "127.0.0.1",
-            "port": 1783,
-            "stype": "bidding",
-            "memo": "创建招标数据索引new"
-        }
-    ],
+    "nextNode": [],
     "jyfb_data": [
         "a_jyxxfbpt_gg"
     ],

+ 1 - 2
src/dataMethod.go

@@ -403,8 +403,7 @@ func IsJpHref(href string) bool {
 
 //验证竞品是否重复
 func confirmJingPinIsRepeatData(v *Info, info *Info) bool {
-
-	//标题验证~是否有关联~是否需要清洗数据
+	//标题验证~是否有关联~是否需要清洗数据-长度需要考虑
 	if v.c_title != "" && info.c_title != "" { //标题相似判断
 		if !(strings.Contains(v.c_title, info.c_title) || strings.Contains(info.c_title, v.c_title)) {
 			if !jingPinElementSame(v, info) {

+ 87 - 72
src/historyRepeat.go

@@ -49,21 +49,21 @@ func historyRepeat() {
 			}
 		}
 		if !isRepeatStatus {
-			log.Println("查询不到有标记的lteid数据")
-			log.Println("睡眠2分钟 gtid:", gtid, "lteid:", lteid)
-			time.Sleep(2 * time.Minute)
+			log.Println("查询不到有标记的lteid数据......睡眠......")
+			time.Sleep(30 * time.Second)
 			continue
 		}
 
-		log.Println("查询完毕-找到有标记的lteid-先睡眠2分钟", gtid, lteid)
+		log.Println("查询找到有标记的lteid......睡眠......", gtid, lteid)
 		if isUpdateSite {
 			initSite()
 		}
-		time.Sleep(2 * time.Minute)
+		time.Sleep(30 * time.Second)
 
 		sess := data_mgo.GetMgoConn() //连接器
 		defer data_mgo.DestoryMongoConn(sess)
 		between_time := time.Now().Unix() - (86400 * timingPubScope) //两年周期
+
 		//开始判重
 		q = map[string]interface{}{
 			"_id": map[string]interface{}{
@@ -163,75 +163,90 @@ func historyRepeat() {
 					b, source, reason := curTM.check(info)
 					if b { //有重复,更新
 						repeateN++
-						if judgeIsReplaceInfo(source.href, info.href) && !IsFull {
-							datalock.Lock()
-							temp_source_id := source.id
-							temp_info_id := info.id
-							temp_source := info
-							temp_source.id = temp_source_id
-							curTM.replacePoolData(temp_source)
-							//替换抽取表数据
-							is_log, is_exists, ext_s_data, ext_i_data := confrimHistoryExtractData(temp_source_id, temp_info_id)
-							is_bid, bid_s_data, bid_i_data := confrimBiddingData(temp_source_id, temp_info_id)
-
-							if is_log && is_bid {
-								data_mgo.Save(extract_log, map[string]interface{}{
-									"_id":        StringTOBsonId(temp_info_id),
-									"replace_id": temp_source_id,
-									"is_history": 1,
-								})
-								ext_s_data["repeat"] = 0
-								ext_s_data["dataging"] = 0
-								ext_i_data["repeat"] = 1
-								ext_i_data["repeat_id"] = temp_source_id
-								ext_i_data["repeat_reason"] = reason
-								ext_i_data["dataging"] = 0
-								ext_i_data["history_updatetime"] = qu.Int64All(time.Now().Unix())
-								if is_exists {
-									data_mgo.DeleteById(extract, temp_source_id)
-									data_mgo.Save(extract, ext_s_data)
-								} else {
-									data_mgo.DeleteById(extract_back, temp_source_id)
-									data_mgo.Save(extract_back, ext_s_data)
-									is_del := data_mgo.DeleteById(extract, temp_source_id)
-									if is_del > 0 {
-										data_mgo.Save(extract, ext_s_data)
-									}
-								}
-								data_mgo.DeleteById(extract, temp_info_id)
-								data_mgo.Save(extract, ext_i_data)
-
-								task_mgo.DeleteById(task_bidding, temp_source_id)
-								task_mgo.Save(task_bidding, bid_s_data)
-								task_mgo.DeleteById(task_bidding, temp_info_id)
-								task_mgo.Save(task_bidding, bid_i_data)
-
-								//通道填充数据
-								msg := "id=" + temp_source_id
-								_ = nspdata_1.Publish(msg)
-								_ = nspdata_2.Publish(msg)
-
-							} else {
-								log.Println("替换~相关表~未查询到数据~", temp_source_id, "~", temp_info_id)
-							}
-
-							datalock.Unlock()
-						} else {
-							Update.updatePool <- []map[string]interface{}{ //重复数据打标签
-								map[string]interface{}{
-									"_id": tmp["_id"],
-								},
-								map[string]interface{}{
-									"$set": map[string]interface{}{
-										"repeat":             1,
-										"repeat_reason":      reason,
-										"repeat_id":          source.id,
-										"dataging":           0,
-										"history_updatetime": util.Int64All(time.Now().Unix()),
-									},
+						Update.updatePool <- []map[string]interface{}{ //重复数据打标签
+							map[string]interface{}{
+								"_id": tmp["_id"],
+							},
+							map[string]interface{}{
+								"$set": map[string]interface{}{
+									"repeat":             1,
+									"repeat_reason":      reason,
+									"repeat_id":          source.id,
+									"dataging":           0,
+									"history_updatetime": util.Int64All(time.Now().Unix()),
 								},
-							}
+							},
 						}
+						//关闭数据替换功能
+						//if judgeIsReplaceInfo(source.href, info.href) && !IsFull {
+						//	datalock.Lock()
+						//	temp_source_id := source.id
+						//	temp_info_id := info.id
+						//	temp_source := info
+						//	temp_source.id = temp_source_id
+						//	curTM.replacePoolData(temp_source)
+						//	//替换抽取表数据
+						//	is_log, is_exists, ext_s_data, ext_i_data := confrimHistoryExtractData(temp_source_id, temp_info_id)
+						//	is_bid, bid_s_data, bid_i_data := confrimBiddingData(temp_source_id, temp_info_id)
+						//
+						//	if is_log && is_bid {
+						//		data_mgo.Save(extract_log, map[string]interface{}{
+						//			"_id":        StringTOBsonId(temp_info_id),
+						//			"replace_id": temp_source_id,
+						//			"is_history": 1,
+						//		})
+						//		ext_s_data["repeat"] = 0
+						//		ext_s_data["dataging"] = 0
+						//		ext_i_data["repeat"] = 1
+						//		ext_i_data["repeat_id"] = temp_source_id
+						//		ext_i_data["repeat_reason"] = reason
+						//		ext_i_data["dataging"] = 0
+						//		ext_i_data["history_updatetime"] = qu.Int64All(time.Now().Unix())
+						//		if is_exists {
+						//			data_mgo.DeleteById(extract, temp_source_id)
+						//			data_mgo.Save(extract, ext_s_data)
+						//		} else {
+						//			data_mgo.DeleteById(extract_back, temp_source_id)
+						//			data_mgo.Save(extract_back, ext_s_data)
+						//			is_del := data_mgo.DeleteById(extract, temp_source_id)
+						//			if is_del > 0 {
+						//				data_mgo.Save(extract, ext_s_data)
+						//			}
+						//		}
+						//		data_mgo.DeleteById(extract, temp_info_id)
+						//		data_mgo.Save(extract, ext_i_data)
+						//
+						//		task_mgo.DeleteById(task_bidding, temp_source_id)
+						//		task_mgo.Save(task_bidding, bid_s_data)
+						//		task_mgo.DeleteById(task_bidding, temp_info_id)
+						//		task_mgo.Save(task_bidding, bid_i_data)
+						//
+						//		//通道填充数据
+						//		msg := "id=" + temp_source_id
+						//		_ = nspdata_1.Publish(msg)
+						//		_ = nspdata_2.Publish(msg)
+						//
+						//	} else {
+						//		log.Println("替换~相关表~未查询到数据~", temp_source_id, "~", temp_info_id)
+						//	}
+						//
+						//	datalock.Unlock()
+						//} else {
+						//	Update.updatePool <- []map[string]interface{}{ //重复数据打标签
+						//		map[string]interface{}{
+						//			"_id": tmp["_id"],
+						//		},
+						//		map[string]interface{}{
+						//			"$set": map[string]interface{}{
+						//				"repeat":             1,
+						//				"repeat_reason":      reason,
+						//				"repeat_id":          source.id,
+						//				"dataging":           0,
+						//				"history_updatetime": util.Int64All(time.Now().Unix()),
+						//			},
+						//		},
+						//	}
+						//}
 					} else {
 						Update.updatePool <- []map[string]interface{}{ //重复数据打标签
 							map[string]interface{}{

+ 2 - 6
src/increaseRepeat.go

@@ -166,15 +166,11 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 		}(dataArr)
 	}
 	wg.Wait()
-	fmt.Println("")
 	log.Println("当前~判重~结束~", total, "重复~", repeatN)
 	//更新流程记录表
-	//updateOcrFileData(mapInfo["lteid"].(string))
 	updateProcessUdpIdsInfo(qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"]))
-
-	time.Sleep(15 * time.Second)
-	//任务完成,开始发送广播通知下面节点
-	log.Println("判重任务完成发送udp")
+	time.Sleep(10 * time.Second)
+	log.Println("判重任务完成...发送下节点udp...")
 	for _, to := range nextNode {
 		sid, _ := mapInfo["gtid"].(string)
 		eid, _ := mapInfo["lteid"].(string)

+ 3 - 3
src/initData.go

@@ -68,16 +68,16 @@ func initOther() {
 	DM = NewDatamap(dupdays, lastid)
 	Update = newUpdatePool()
 	go Update.updateData()
-
+	nsqAddr := "172.17.162.36:4150"
 	if !IsFull {
 		var err error
-		nspdata_1, err = nsqdata.NewProducer("172.17.145.179:4150", "bidding_id", true)
+		nspdata_1, err = nsqdata.NewProducer(nsqAddr, "bidding_id", true)
 		if err != nil {
 			log.Fatal("通道配置异常~", err)
 		} else {
 			log.Println("通道配置正常")
 		}
-		nspdata_2, err = nsqdata.NewProducer("172.17.145.179:4150", "project_id", true)
+		nspdata_2, err = nsqdata.NewProducer(nsqAddr, "project_id", true)
 		if err != nil {
 			log.Fatal("通道配置异常~", err)
 		} else {

+ 34 - 19
src/main.go

@@ -7,6 +7,7 @@ package main
 import (
 	"encoding/json"
 	"flag"
+	"fmt"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -42,9 +43,11 @@ var (
 	jyfb_data                                  map[string]string
 	taskList                                   []map[string]interface{}
 	nspdata_1, nspdata_2                       *nsqdata.Producer
+	responselock                               sync.Mutex
+	lastNodeResponse                           int64
 )
 
-//初始化加载
+// 初始化加载
 func init() {
 	flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
 	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")   //历史
@@ -58,40 +61,47 @@ func init() {
 }
 
 func mainT() {
-	IsFull = true
-	//AddGroupPool = newAddGroupPool()
-	//go AddGroupPool.addGroupData()
-	//fullDataRepeat() //全量判重
 	increaseRepeat(map[string]interface{}{
 		"gtid":  "12ec61170ae152a3c2310f02",
 		"lteid": "92ec61170ae152a3c2310f02",
 	})
-	//gtid = "62ec2dd00ae152a3c230c1a1"
-	//lteid = "62ec2dd00ae152a3c230c1e1"
-	//historyRepeat()
 	time.Sleep(99999 * time.Hour)
 }
 
-//主函数
+func lastUdpJob() {
+	for {
+		responselock.Lock()
+		if time.Now().Unix()-lastNodeResponse >= 1800 {
+			lastNodeResponse = time.Now().Unix() //重置时间
+			sendErrMailApi("判重增量~发现处理流程超时~给予告警", fmt.Sprintf("半小时左右~无新段落数据进入判重增量流程...相关人员检查..."))
+		}
+		responselock.Unlock()
+		time.Sleep(300 * time.Second)
+	}
+}
+
+// 主函数
 func main() {
-	go checkMapJob()
+	go checkMailJob()
+	lastNodeResponse = time.Now().Unix()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
 	if TimingTask {
-		log.Println("正常历史部署")
+		log.Println("正常历史部署...")
 		go historyRepeat()
 	} else {
 		if !IsFull {
-			log.Println("正常增量部署,监听任务")
+			log.Println("正常增量部署与监控机制...")
+			go lastUdpJob()
 			go getRepeatTask()
 		}
 	}
 	time.Sleep(99999 * time.Hour)
 }
 
-//udp接收
+// udp接收
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	switch act {
 	case mu.OP_TYPE_DATA:
@@ -101,9 +111,17 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
 		} else if mapInfo != nil {
 			sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
+			stype := qu.ObjToString(mapInfo["stype"])
+			if stype == "monitor" {
+				log.Println("收到监测......")
+				key := qu.ObjToString(mapInfo["key"])
+				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+				return
+			}
 			if sid == "" || eid == "" {
 				log.Println("接收id段异常-err ", "sid=", sid, ",eid=", eid)
 			} else {
+				lastNodeResponse = time.Now().Unix()
 				key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
 				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
 				//计算是否需要加载站点~每天加载一次
@@ -118,15 +136,12 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			}
 		}
 	case mu.OP_NOOP: //下个节点回应
-		ok := string(data)
-		if ok != "" {
-			log.Println("ok:", ok)
-			udptaskmap.Delete(ok)
-		}
+		log.Println("下节点回应:", string(data))
+		udptaskmap.Delete(string(data))
 	}
 }
 
-//监听-获取-分发判重任务
+// 监听-获取-分发判重任务
 func getRepeatTask() {
 	for {
 		if len(taskList) > 0 {

+ 24 - 7
src/udptaskmap.go

@@ -22,14 +22,14 @@ type udpNode struct {
 	retry     int
 }
 
-func checkMapJob() {
+func checkMailJob() {
 	//阿里云内网无法发送邮件
 	jkmail, _ := Sysconfig["jkmail"].(map[string]interface{})
 	if jkmail != nil {
 		tomail, _ = jkmail["to"].(string)
 		api, _ = jkmail["api"].(string)
 	}
-	log.Println("start checkMapJob", tomail, Sysconfig["jkmail"])
+	log.Println("start checkMailJob", tomail, Sysconfig["jkmail"])
 	for {
 		udptaskmap.Range(func(k, v interface{}) bool {
 			now := time.Now().Unix()
@@ -37,12 +37,12 @@ func checkMapJob() {
 			if now-node.timestamp > 120 {
 				udptaskmap.Delete(k)
 				info_str := ""
-				if strings.Contains(k.(string),"project") {
-					info_str = fmt.Sprintf("下节点~项目合并~未响应~相关人员检查~%s",k.(string))
-				}else {
-					info_str = fmt.Sprintf("下节点~索引~未响应~相关人员检查~%s",k.(string))
+				if strings.Contains(k.(string), "project") {
+					info_str = fmt.Sprintf("下节点~项目合并~未响应~相关人员检查~%s", k.(string))
+				} else {
+					info_str = fmt.Sprintf("下节点~同步数据~未响应~相关人员检查~%s", k.(string))
 				}
-				res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "增量判重程序~严重警告",info_str))
+				res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "增量判重程序~严重警告", info_str))
 				if err == nil {
 					defer res.Body.Close()
 					read, err := ioutil.ReadAll(res.Body)
@@ -54,3 +54,20 @@ func checkMapJob() {
 		time.Sleep(60 * time.Second)
 	}
 }
+
+func sendErrMailApi(title, body string) {
+	jkmail, _ := Sysconfig["jkmail"].(map[string]interface{})
+	if jkmail != nil {
+		tomail, _ = jkmail["to"].(string)
+		api, _ = jkmail["api"].(string)
+	}
+	log.Println(tomail, api)
+	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, title, body))
+	if err == nil {
+		defer res.Body.Close()
+		read, err := ioutil.ReadAll(res.Body)
+		log.Println("邮件发送成功:", string(read), err)
+	} else {
+		log.Println("邮件发送失败:", err)
+	}
+}