Selaa lähdekoodia

添加半小时内 数据监控

wcc 2 vuotta sitten
vanhempi
commit
8fd026fe53
3 muutettua tiedostoa jossa 58 lisäystä ja 23 poistoa
  1. 4 0
      src/main.go
  2. 48 7
      src/udptask/udptask.go
  3. 6 16
      src/udptask/udptaskmap.go

+ 4 - 0
src/main.go

@@ -5,6 +5,7 @@ import (
 	"log"
 	"net/http"
 	"task"
+	"time"
 	"udptask"
 	"web"
 
@@ -14,6 +15,7 @@ import (
 var port string
 
 func main() {
+	udptask.LastNodeResponse = time.Now().Unix()
 	flag.StringVar(&port, "p", "8003", "端口") //87测试分类用8004;8005projectinfo定时任务
 	flag.Parse()
 	logger.SetRollingDaily("./logs", "updatetime.log") //日志记录
@@ -21,6 +23,8 @@ func main() {
 	go Server()
 	log.Println("start web at ", port)
 	udptask.InitUdp()
+	//UDP 监听程序
+	go udptask.LastUdpJob()
 	//go task.RunTask() //定时任务o_projectinfo分类
 	go udptask.RunningHangyeClass() //根据内存中的行业分类udps执行分类任务
 	b := make(chan bool)

+ 48 - 7
src/udptask/udptask.go

@@ -3,10 +3,13 @@ package udptask
 import (
 	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"log"
 	mu "mfw/util"
 	"net"
+	"net/http"
 	qutil "qfw/util"
+	"sync"
 	u "util"
 
 	//	"sync"
@@ -15,6 +18,10 @@ import (
 	. "tools"
 )
 
+var responselock sync.Mutex
+
+var LastNodeResponse int64
+
 func InitUdp() {
 	go func() {
 		updport, _ := Config["udpport"].(string)
@@ -65,9 +72,10 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 				if key == "" {
 					key = "udpok"
 				}
-				Udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+				go Udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
 				//行业分类开始,更新bidding_processing_ids表dataprocess=5
-				if stype == "hangye" { //行业分类控制udp传输的id段顺序
+				if stype == "hangye" {
+					LastNodeResponse = time.Now().Unix()
 					task.HangyeUdps <- mapInfo
 					gtid := mapInfo["gtid"].(string)
 					lteid := mapInfo["lteid"].(string)
@@ -82,6 +90,8 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 						},
 					}
 					MgoClass.Update("bidding_processing_ids", query, set, false, false)
+				} else if stype == "monitor" { //程序监听类型
+					fmt.Println("stype :monitor")
 				} else if stype != "" {
 					go UdpTask(stype, mapInfo) //执行分类
 				} else {
@@ -90,11 +100,8 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			}
 		}
 	case mu.OP_NOOP: //下个节点回应
-		ok := string(data)
-		if ok != "" {
-			log.Println("ok:", ok)
-			udptaskmap.Delete(ok)
-		}
+		udptaskmap.Delete(string(data))
+		log.Println("下节点回应:", string(data))
 	}
 }
 
@@ -260,3 +267,37 @@ func ExtractByUdp(ra *net.UDPAddr, instanceId ...string) {
 		log.Println("分类完成")
 	}
 }
+
+//LastUdpJob 处理UDP 没有接受数据
+func LastUdpJob() {
+	for {
+		responselock.Lock()
+		if time.Now().Unix()-LastNodeResponse >= 1800 {
+			LastNodeResponse = time.Now().Unix() //重置时间
+			sendErrMailApi("分类异常", fmt.Sprintf("半小时左右~无新段落数据进入 分类流程...相关人员检查..."))
+		}
+
+		responselock.Unlock()
+		time.Sleep(300 * time.Second)
+	}
+}
+
+//sendErrMailApi 发送邮件
+func sendErrMailApi(title, body string) {
+	jkmail, _ := Config["jkmail"].(map[string]interface{})
+	if jkmail != nil {
+		tomail, _ = jkmail["to"].(string)
+		api, _ = jkmail["api"].(string)
+	}
+
+	log.Println(tomail, api)
+	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, title, body))
+
+	if err == nil {
+		defer res.Body.Close()
+		read, err := ioutil.ReadAll(res.Body)
+		log.Println("邮件发送成功:", string(read), err)
+	} else {
+		log.Println("邮件发送失败:", err)
+	}
+}

+ 6 - 16
src/udptask/udptaskmap.go

@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"io/ioutil"
 	"log"
-	mu "mfw/util"
 	"net/http"
 	"sync"
 	"time"
@@ -28,22 +27,13 @@ func checkMapJob() {
 			now := time.Now().Unix()
 			node, _ := v.(*UdpNode)
 			if now-node.Timestamp > 120 {
-				node.Retry++
-				if node.Retry > 5 {
-					log.Println("udp重试失败", k)
-					udptaskmap.Delete(k)
-					res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "classification-send-fail", k.(string)))
-					if err == nil {
-						defer res.Body.Close()
-						read, err := ioutil.ReadAll(res.Body)
-						log.Println("邮件发发送:", string(read), err)
-					}
-				} else {
-					log.Println("udp重发", k)
-					Udpclient.WriteUdp(node.Data, mu.OP_TYPE_DATA, node.Addr)
+				udptaskmap.Delete(k)
+				res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "分类-下节点-未响应,请相关人员检查", k.(string)))
+				if err == nil {
+					defer res.Body.Close()
+					read, err := ioutil.ReadAll(res.Body)
+					log.Println("邮件发发送:", string(read), err)
 				}
-			} else if now-node.Timestamp > 10 {
-				log.Println("udp任务超时中..", k)
 			}
 			return true
 		})