Эх сурвалжийг харах

保存服务增加数据监控

renzheng 7 жил өмнө
parent
commit
fddd614211

+ 55 - 2
src/saveServer/main.go

@@ -14,6 +14,8 @@ import (
 	"saveServer/tempinfo"
 	"saveServer/tempinfo"
 	"saveServer/tools"
 	"saveServer/tools"
 	"strings"
 	"strings"
+	"sync"
+	"time"
 
 
 	"gopkg.in/mgo.v2/bson"
 	"gopkg.in/mgo.v2/bson"
 )
 )
@@ -109,11 +111,13 @@ func inits() {
 	}()
 	}()
 }
 }
 
 
+var IS = &InfoStatus{time.Now().Unix(), 0, map[int]int{}, &sync.Mutex{}}
+
 func main() {
 func main() {
 	inits()
 	inits()
+	go IS.Save()
 	go HttpServer(port)
 	go HttpServer(port)
 	runNew()
 	runNew()
-	//进入服务等待
 	w := make(chan bool, 1)
 	w := make(chan bool, 1)
 	<-w
 	<-w
 }
 }
@@ -155,7 +159,9 @@ func handler(p *util.Packet) {
 			if p.Event == EVENT_SAVETODB {
 			if p.Event == EVENT_SAVETODB {
 				b = SaveToDB(para)
 				b = SaveToDB(para)
 			} else if p.Event == EVENT_SAVETODB_BIDDING {
 			} else if p.Event == EVENT_SAVETODB_BIDDING {
-				b = SaveBidding((para[1].([]interface{}))[0].(map[string]interface{}))
+				res := 0
+				b, res = SaveBidding((para[1].([]interface{}))[0].(map[string]interface{}))
+				go IS.Add(res)
 			}
 			}
 		}
 		}
 		client.WriteObj(p.From, p.Msgid, util.EVENT_RECIVE_CALLBACK, util.SENDTO_TYPE_P2P, b)
 		client.WriteObj(p.From, p.Msgid, util.EVENT_RECIVE_CALLBACK, util.SENDTO_TYPE_P2P, b)
@@ -165,3 +171,50 @@ func handler(p *util.Packet) {
 		}
 		}
 	})
 	})
 }
 }
+
+type InfoStatus struct {
+	Starttime int64
+	Endtime   int64
+	Val       map[int]int
+	Lock      *sync.Mutex
+}
+
+func (is *InfoStatus) Add(v int) {
+	is.Lock.Lock()
+	is.Val[v]++
+	is.Lock.Unlock()
+}
+
+func (is *InfoStatus) Save() {
+	is.Lock.Lock()
+	is.Endtime = time.Now().Unix()
+	save := map[string]interface{}{}
+	all := 0
+	for i := 0; i < 6; i++ {
+		v := is.Val[i]
+		all += v
+		switch i {
+		case 1: //正常处理
+			save["normal"] = v
+		case 2: //少字段丢弃
+			save["missfield"] = v
+		case 3: //信息错误,标题为空
+			save["infoerr"] = v
+		case 4: //重复
+			save["repeat"] = v
+		case 5: //延时入库信息量
+			save["delay"] = v
+		}
+	}
+	if all > 0 {
+		save["receive"] = all
+		save["starttime"] = is.Starttime
+		save["endtime"] = is.Endtime
+		save["flag"] = "saveser"
+		is.Val = map[int]int{}
+		go mongodb.Save("datamonitor", save)
+	}
+	is.Starttime = is.Endtime
+	is.Lock.Unlock()
+	time.AfterFunc(15*time.Minute, is.Save)
+}

+ 8 - 1
src/saveServer/savedata.go

@@ -60,11 +60,12 @@ func TimerSave() {
 //实时处理招标信息
 //实时处理招标信息
 var LogLEVEL = false
 var LogLEVEL = false
 
 
-func SaveBidding(tmp map[string]interface{}) (b bool) {
+func SaveBidding(tmp map[string]interface{}) (b bool, res int) {
 	T := qutil.ObjToString(tmp["T"])
 	T := qutil.ObjToString(tmp["T"])
 	if T != SaveColl {
 	if T != SaveColl {
 		b = true
 		b = true
 		go log.Println("abandon:", T, tmp["spidercode"])
 		go log.Println("abandon:", T, tmp["spidercode"])
+		res = 2
 		return
 		return
 	}
 	}
 	if LogLEVEL {
 	if LogLEVEL {
@@ -92,6 +93,7 @@ func SaveBidding(tmp map[string]interface{}) (b bool) {
 		if infoformat == 1 {
 		if infoformat == 1 {
 			btemp, binsert := tempinfo.CheckTempInfo(tmp)
 			btemp, binsert := tempinfo.CheckTempInfo(tmp)
 			if btemp {
 			if btemp {
+				res = 5
 				b = binsert
 				b = binsert
 				return
 				return
 			}
 			}
@@ -105,15 +107,20 @@ func SaveBidding(tmp map[string]interface{}) (b bool) {
 		tmp["s_sha"] = uuid
 		tmp["s_sha"] = uuid
 		lock.Lock()
 		lock.Lock()
 		if !bidcheckrepeat.BidCheck(href, uuid) {
 		if !bidcheckrepeat.BidCheck(href, uuid) {
+			res = 1
 			SaveCache = append(SaveCache, tmp)
 			SaveCache = append(SaveCache, tmp)
 			bidcheckrepeat.BidAdd(href, uuid)
 			bidcheckrepeat.BidAdd(href, uuid)
 			if len(SaveCache) > 200 {
 			if len(SaveCache) > 200 {
 				saveMust()
 				saveMust()
 			}
 			}
 		} else {
 		} else {
+			res = 4
 			go log.Println("repeat:", tmp["href"], uuid)
 			go log.Println("repeat:", tmp["href"], uuid)
 		}
 		}
 		lock.Unlock()
 		lock.Unlock()
+	} else {
+		res = 3
+		return
 	}
 	}
 	b = true
 	b = true
 	return
 	return