Sfoglia il codice sorgente

monitor mgo/es监控

Jianghan 4 anni fa
parent
commit
26f1aca69f

+ 6 - 1
fullproject/src_v1/config.json

@@ -4,7 +4,7 @@
     "statusdays": 15,
 	"mongodbServers": "192.168.3.207:27092",
     "mongodbPoolSize": 10,
-    "mongodbName": "wjh",
+    "mongodbName": "qfw_data",
 	"hints":"publishtime_1",
     "extractColl": "bidding",
     "extractColl1": "bidding",
@@ -23,6 +23,11 @@
         "uname": "dataAnyWrite",
         "upwd": "data@dataAnyWrite"
     },
+    "spider": {
+        "addr": "192.168.3.207:27092",
+        "dbname": "editor",
+        "dbsize": 2
+    },
     "es": {
         "addr": "http://192.168.3.11:9800",
         "index": "projectset",

+ 10 - 3
fullproject/src_v1/init.go

@@ -16,7 +16,7 @@ import (
 
 var (
 	Sysconfig                                      map[string]interface{} //读取配置文件
-	MongoTool, MgoBidding                          *MongodbSim            //mongodb连接
+	MongoTool, MgoBidding, MgoSpider               *MongodbSim            //mongodb连接
 	ExtractColl, ProjectColl, BackupColl, SiteColl string                 //抽取表、项目表、项目快照表、站点表
 	ExtractColl1                                   string
 	Thread                                         int                    //配置项线程数
@@ -64,10 +64,17 @@ func init() {
 		MongodbAddr: bidding["addr"].(string),
 		Size:        util.IntAll(bidding["dbsize"]),
 		DbName:      bidding["dbname"].(string),
-		UserName:    bidding["uname"].(string),
-		Password:    bidding["upwd"].(string),
+		//UserName:    bidding["uname"].(string),
+		//Password:    bidding["upwd"].(string),
 	}
 	MgoBidding.InitPool()
+	spider, _ := Sysconfig["spider"].(map[string]interface{})
+	MgoSpider = &MongodbSim{
+		MongodbAddr: spider["addr"].(string),
+		Size:        util.IntAll(spider["dbsize"]),
+		DbName:      spider["dbname"].(string),
+	}
+	MgoSpider.InitPool()
 
 	ExtractColl = Sysconfig["extractColl"].(string)
 	ExtractColl1 = Sysconfig["extractColl1"].(string)

+ 45 - 2
fullproject/src_v1/load_data.go

@@ -7,8 +7,7 @@ import (
 	"time"
 )
 
-//初始加载数据,默认加载最近6个月的数据
-
+//  初始加载数据,默认加载最近6个月的数据
 func (p *ProjectTask) loadData(starttime int64) {
 	log.Println("load project start..", starttime)
 	p.findLock.Lock()
@@ -161,3 +160,47 @@ func saveFiled(p *ProjectTask, res map[string]interface{}, tmp *ProjectInfo) {
 	}
 	tmp.InfoFiled = tmpMap
 }
+
+// loadSpiderCode (re)loads the spider code -> isflow lookup table into
+// p.mapSpider from the "luaconfig_back" collection of the spider database.
+//
+// Only the "code" and "isflow" fields are fetched. Entries already in the map
+// are overwritten; codes that no longer exist in the collection are not
+// removed. p.findLock and p.mapSpiderLock are held for the whole load, so
+// startProjectMerge never observes a half-populated table.
+//
+// Records are consumed synchronously on the calling goroutine. The previous
+// implementation spawned one goroutine per record and slept two seconds
+// before signalling completion, which could drop records that had not been
+// delivered yet and leaked the goroutines still blocked on the channel.
+func (p *ProjectTask) loadSpiderCode() {
+	log.Println("load spider code start..")
+	p.findLock.Lock()
+	defer p.findLock.Unlock()
+	p.mapSpiderLock.Lock()
+	defer p.mapSpiderLock.Unlock()
+	sess := MgoSpider.GetMgoConn()
+	defer MgoSpider.DestoryMongoConn(sess)
+	q := map[string]interface{}{}
+	field := map[string]interface{}{"code": 1, "isflow": 1}
+	it := sess.DB(MgoSpider.DbName).C("luaconfig_back").Find(&q).Select(field).Iter()
+	n := 0
+	for {
+		// Decode into a fresh map each time so keys from the previous
+		// document can never leak into the next one.
+		result := make(map[string]interface{})
+		if !it.Next(&result) {
+			break
+		}
+		util.Debug(result)
+		code := util.ObjToString(result["code"])
+		p.mapSpider[code] = util.IntAll(result["isflow"])
+		n++
+	}
+	log.Println("load spider over..", n)
+}

+ 8 - 0
fullproject/src_v1/main.go

@@ -83,6 +83,7 @@ func main() {
 			P_QL.loadData(loadStart)
 		}
 	}
+	P_QL.loadSpiderCode()
 	P_QL.loadSite()
 	go checkMapJob()
 	time.Sleep(99999 * time.Hour)
@@ -184,6 +185,13 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					P_QL.pici = time.Now().Unix()
 					P_QL.delInfoPro(mapInfo)
 				}()
+			case "spider": // 爬虫代码code、isflow
+				go func() {
+					defer func() {
+						<-SingleThread
+					}()
+					go P_QL.loadSpiderCode()
+				}()
 			case "history": //历史数据合并,暂时不写
 				go func() {
 					defer func() {

+ 13 - 3
fullproject/src_v1/project.go

@@ -19,8 +19,6 @@ import (
 
 //从对应map中获取对比的项目id
 func (p *ProjectTask) getCompareIds(pn, pc, ptc, pb string) (bpn, bpc, bptc, bpb int, res []*Key, idArr []string, IDArr []*ID) {
-	p.findLock.Lock()
-	defer p.findLock.Unlock()
 	//	p.ConCurrentLock(n1, n2, n3, n4)
 	//	defer p.ConCurrentUnLock(n1, n2, n3, n4)
 	p.wg.Add(1)
@@ -94,6 +92,19 @@ func (p *ProjectTask) getCompareIds(pn, pc, ptc, pb string) (bpn, bpc, bptc, bpb
 }
 
 func (p *ProjectTask) startProjectMerge(info *Info, tmp map[string]interface{}) {
+	p.findLock.Lock()
+	defer p.findLock.Unlock()
+	// 3.18 isfow=0数据不参与项目合并
+	code := qu.ObjToString(tmp["spidercode"])
+	p.mapSpiderLock.Lock()
+	isflow := p.mapSpider[code]
+	p.mapSpiderLock.Unlock()
+	if isflow == 0 {
+		id, _ := p.NewProject(tmp, info)
+		qu.Debug("直接新建项目,", "project id", id)
+		return
+	}
+
 	//只有或没有采购单位的无法合并
 	//bpn, bpc, bptc, bpb 是否查找到,并标识位置。-1代表未查找到。
 	//pids 是项目id数组集合
@@ -655,7 +666,6 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 			"$set": set,
 		},
 	}
-	//log.Println(set)
 	return pId.Hex(), &p1
 }
 

+ 4 - 6
fullproject/src_v1/task.go

@@ -71,6 +71,9 @@ type ProjectTask struct {
 	//站点
 	mapSite     map[string]*Site
 	mapSiteLock sync.Mutex
+	//spider isflow
+	mapSpider     map[string]int
+	mapSpiderLock sync.Mutex
 	//bidtype、bidstatus 锁
 	mapBidLock sync.Mutex
 	//更新或新增通道
@@ -113,6 +116,7 @@ func NewPT() *ProjectTask {
 		mapPc:     make(map[string]*Key, 5000000),
 		mapHref:   make(map[string]string, 1500000),
 		mapSite:   make(map[string]*Site, 1000000),
+		mapSpider: make(map[string]int, 1000000),
 		saveSize:  100,
 
 		//saveSign:   make(chan bool, 1),
@@ -316,12 +320,6 @@ func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
 			"$lte": StringTOBsonId(lteid),
 		},
 	}
-	if q == nil {
-
-	}
-	if q != nil {
-		//生成查询语句执行
-	}
 	p.enter(db, coll, q)
 	if udpInfo["stop"] == nil {
 		for i := 0; i < 5; i++ {

+ 27 - 0
monitor/README.MD

@@ -0,0 +1,27 @@
+- 1.检索库健康状态监控
+- 2.mgo库数据异常监控
+
+
+---
+>
+> mgo监控  
+> 存量统计:对比内容中存储的上次统计的id和数量  
+> 增量统计:当前时间点前一个小时的数量
+> 
+
+---
+> curl -s http://localhost:9800/_cat/health  
+前两个是时间戳,不过多介绍。其余如下:  
+cluster ,集群名称  
+status,集群状态 green代表健康;yellow代表分配了所有主分片,但至少缺少一个副本,此时集群数据仍旧完整;red代表部分主分片不可用,可能已经丢失数据。  
+node.total,代表在线的节点总数量  
+node.data,代表在线的数据节点的数量  
+shards, active_shards 存活的分片数量  
+pri,active_primary_shards 存活的主分片数量 正常情况下(每个主分片配置 1 个副本时)shards的数量是pri的两倍。  
+relo, relocating_shards 迁移中的分片数量,正常情况为 0  
+init, initializing_shards 初始化中的分片数量 正常情况为 0  
+unassign, unassigned_shards 未分配的分片 正常情况为 0  
+pending_tasks,准备中的任务,任务指迁移分片等 正常情况为 0  
+max_task_wait_time,任务最长等待时间(版本差异)  
+active_shards_percent,正常分片百分比 正常情况为(版本差异) 100%  
+> 

+ 25 - 0
monitor/config.json

@@ -0,0 +1,25 @@
+{
+  "tasktime": 1,
+  "count": {
+    "bidding_back_size": 10000,
+    "last_stock_id": "600000000000000000000000",
+    "last_stock_size": 100000,
+    "last_add_size": 0
+  },
+  "bidding": {
+    "mgodb": "192.168.3.207:27092",
+    "dbsize": 12,
+    "dbname": "qfw_data",
+    "uname": "",
+    "upwd": ""
+  },
+  "elastic": {
+    "addr": "http://192.168.3.128:9800",
+    "index": "",
+    "itype": ""
+  },
+  "jkmail": {
+    "to": "wangjianghan@topnet.net.cn",
+    "api": "http://10.171.112.160:19281/_send/_mail"
+  }
+}

+ 56 - 0
monitor/main.go

@@ -0,0 +1,56 @@
+package main
+
+import (
+	"mongodb"
+	"qfw/util"
+	es "qfw/util/elastic"
+)
+
+// Package-level state of the monitor, populated once by init from the
+// configuration file.
+var (
+	Sysconfig                    map[string]interface{} // parsed configuration file
+	Mgo                          *mongodb.MongodbSim // MongoDB connection pool
+	Es                           *es.Elastic // Elasticsearch connection pool
+	TaskTime					 int // "tasktime" configuration value
+	Index, Itype				 string // Elasticsearch index and type names
+	BiddingBackSize				 int64		// expected document count of bidding_back
+	LastStockId					 string		// id used as upper bound by the previous stock (existing data) count
+	LastStockSize				 int64		// result of the previous stock count
+	LastAddSize				 	 int64		// result of the previous incremental count
+	api, to						 string // alert mail API endpoint and recipient
+)
+
+// init loads the configuration file and builds the package-level state:
+// the MongoDB pool (Mgo), the Elasticsearch pool (Es), the baseline counters
+// used by MgoCount and the alert mail settings.
+//
+// The "bidding", "elastic", "count" and "jkmail" sections and their string
+// members are read with unchecked type assertions, so a missing or mistyped
+// entry panics at start-up.
+func init() {
+	util.ReadConfig(&Sysconfig)
+	bidding := Sysconfig["bidding"].(map[string]interface{})
+	Mgo = &mongodb.MongodbSim{
+		MongodbAddr: bidding["mgodb"].(string),
+		Size:        util.IntAllDef(bidding["dbsize"], 2),
+		DbName:      bidding["dbname"].(string),
+		UserName:    bidding["uname"].(string),
+		Password:    bidding["upwd"].(string),
+	}
+	Mgo.InitPool()
+	econf := Sysconfig["elastic"].(map[string]interface{})
+	Es = &es.Elastic{
+		S_esurl: econf["addr"].(string),
+		I_size:  util.IntAllDef(econf["pool"], 12),
+	}
+	Es.InitElasticSize()
+	TaskTime = util.IntAll(Sysconfig["tasktime"])
+	// "index" and "itype" live inside the "elastic" section of config.json.
+	// The top-level keys are still honoured as a fallback for older configs.
+	Index = util.ObjToString(econf["index"])
+	if Index == "" {
+		Index = util.ObjToString(Sysconfig["index"])
+	}
+	Itype = util.ObjToString(econf["itype"])
+	if Itype == "" {
+		Itype = util.ObjToString(Sysconfig["itype"])
+	}
+	count := Sysconfig["count"].(map[string]interface{})
+	BiddingBackSize = util.Int64All(count["bidding_back_size"])
+	LastStockId = util.ObjToString(count["last_stock_id"])
+	LastStockSize = util.Int64All(count["last_stock_size"])
+	LastAddSize = util.Int64All(count["last_add_size"])
+	jkmail := Sysconfig["jkmail"].(map[string]interface{})
+	to = util.ObjToString(jkmail["to"])
+	api = util.ObjToString(jkmail["api"])
+}
+
+// main starts the scheduler in the background and then parks the main
+// goroutine forever so the process stays alive.
+func main() {
+	go TimeTask()
+	select {}
+}

+ 105 - 0
monitor/task.go

@@ -0,0 +1,105 @@
+package main
+
+import (
+	"fmt"
+	"net/http"
+	"net/url"
+	"strconv"
+	"time"
+
+	"github.com/cron"
+	"go.mongodb.org/mongo-driver/bson"
+	"mongodb"
+	"qfw/util"
+)
+
+var taskA = 0		// number of zero-increment runs skipped so far before an incremental alert is raised
+
+// TimeTask registers StartTask with a cron scheduler and starts the
+// scheduler. If the job cannot be registered the failure is logged and the
+// scheduler is not started, instead of silently running with no job.
+func TimeTask() {
+	//StartTask()
+	c := cron.New()
+	cronstr := "0 0 * * * ?" // run once every hour
+	if err := c.AddFunc(cronstr, func() { StartTask() }); err != nil {
+		util.Debug("add cron task failed:", cronstr, err)
+		return
+	}
+	c.Start()
+}
+
+// StartTask performs one monitoring round: it runs the MongoDB checks, feeds
+// their report into the Elasticsearch health check, and mails the combined
+// report when it is not empty.
+func StartTask() {
+	util.Debug("Start Task...")
+	report := EsCheck(MgoCount())
+	if len(report) > 0 {
+		util.Debug(report)
+		SendMail(report)
+	}
+	util.Debug("Task Over...")
+}
+
+// MgoCount runs the MongoDB checks and returns an HTML fragment describing
+// every anomaly found, or "" when nothing is wrong.
+//
+// Three checks are made:
+//  1. bidding_back: the total document count must equal BiddingBackSize.
+//  2. bidding stock: the number of documents with _id <= LastStockId must
+//     still equal LastStockSize. When it does, the baseline is moved forward
+//     to "one hour ago"; when it does not, the baseline is left untouched, so
+//     the same anomaly is reported again on the next run.
+//  3. bidding increment: the number of documents created during the last
+//     hour. A zero count is tolerated while taskA <= 1 and reported after that.
+//
+// The id bounds are built from a Unix timestamp in hex followed by sixteen
+// zeros, i.e. a 24 character id whose leading part is the timestamp.
+//
+// A panic inside the function is handled by the deferred util.Catch.
+func MgoCount() string {
+	defer util.Catch()
+	result := ""
+	// bidding_back: compare against the configured size.
+	count := Mgo.Count("bidding_back", nil)
+	util.Debug("bidding_back---", BiddingBackSize, count)
+	if int64(count) != BiddingBackSize {
+		result += fmt.Sprintf("bindding_back数据异常,统计结果%d,往常数据量%d", count, BiddingBackSize)
+		result += "<br>"
+	}
+	// bidding
+	st := time.Now().Unix() - 3600
+	currentLastId := fmt.Sprintf("%x0000000000000000", st)
+	// Stock check: recount using the id remembered in memory from the last run.
+	q1 := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId(LastStockId)}}
+	count1 := Mgo.Count("bidding", q1)
+	util.Debug("bidding---", LastStockSize, count1, q1)
+	if int64(count1) != LastStockSize {
+		// Append rather than assign, so a bidding_back anomaly recorded
+		// above is not lost from the report.
+		result += fmt.Sprintf("bindding数据存量统计异常,统计条件%v,统计结果%d,上次统计数据量%d", q1, count1, LastStockSize)
+		result += "<br>"
+	} else {
+		// No anomaly: move the baseline to the id for one hour before now
+		// and remember the stock count at that point.
+		LastStockId = currentLastId
+		q2 := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId(LastStockId)}}
+		count2 := Mgo.Count("bidding", q2)
+		LastStockSize = int64(count2)
+	}
+	// Increment check: documents created within the last hour.
+	et := time.Now().Unix()
+	currentId := fmt.Sprintf("%x0000000000000000", et)
+	q3 := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(currentLastId), "$lte": mongodb.StringTOBsonId(currentId)}}
+	count3 := Mgo.Count("bidding", q3)
+	util.Debug("bidding---", count3, q3)
+	if int64(count3) == 0 {
+		// NOTE(review): taskA is not reset when a later run does see new
+		// documents, so the tolerated empty runs need not be consecutive.
+		// Confirm this is intended.
+		if taskA > 1 {
+			result += fmt.Sprintf("bindding数据增量统计异常,统计条件%v,统计结果%d", q3, count3)
+			result += "<br><br>"
+			taskA = 0
+		} else {
+			taskA++
+		}
+	}
+	return result
+}
+
+// EsCheck queries the Elasticsearch cluster health and appends an HTML
+// description of the problem to result when the cluster is not "green".
+//
+// A failed health request (for example when the cluster is unreachable) is
+// itself reported as an anomaly, rather than dereferencing a nil response.
+// The possibly extended report is returned; result is returned unchanged when
+// the cluster is healthy.
+func EsCheck(result string) string {
+	client := Es.GetEsConn()
+	defer Es.DestoryEsConn(client)
+	resp, err := client.ClusterHealth().Do()
+	if err != nil || resp == nil {
+		reason := "no response"
+		if err != nil {
+			reason = err.Error()
+		}
+		util.Debug("es cluster health check failed:", reason)
+		return result + "<br>" + "检索库异常,异常内容:" + "<br>" +
+			"&nbsp;&nbsp;&nbsp;&nbsp;" + reason + "<br>"
+	}
+	util.Debug(*resp)
+	if resp.Status != "green" {
+		result += "<br>" + "检索库异常,异常内容:" + "<br>" +
+			"&nbsp;&nbsp;&nbsp;&nbsp;" + "cluster_name:" + resp.ClusterName + "<br>" +
+			"&nbsp;&nbsp;&nbsp;&nbsp;" + "status:" + resp.Status + "<br>" +
+			"&nbsp;&nbsp;&nbsp;&nbsp;" + "number_of_nodes:" + strconv.Itoa(resp.NumberOfNodes) + "<br>" +
+			"&nbsp;&nbsp;&nbsp;&nbsp;" + "number_of_data_nodes:" + strconv.Itoa(resp.NumberOfDataNodes) + "<br>" +
+			// This line used to be labelled number_of_data_nodes as well.
+			"&nbsp;&nbsp;&nbsp;&nbsp;" + "active_primary_shards:" + strconv.Itoa(resp.ActivePrimaryShards) + "<br>" +
+			"&nbsp;&nbsp;&nbsp;&nbsp;" + "active_shards:" + strconv.Itoa(resp.ActiveShards) + "<br>" +
+			"&nbsp;&nbsp;&nbsp;&nbsp;" + "relocating_shards:" + strconv.Itoa(resp.RelocatingShards) + "<br>" +
+			"&nbsp;&nbsp;&nbsp;&nbsp;" + "initialized_shards:" + strconv.Itoa(resp.InitializedShards) + "<br>" +
+			"&nbsp;&nbsp;&nbsp;&nbsp;" + "unassigned_shards:" + strconv.Itoa(resp.UnassignedShards) + "<br>"
+	}
+	return result
+}
+
+// SendMail delivers report to the configured recipient through the mail API,
+// as a GET request carrying "to", "title" and "body" query parameters.
+//
+// Every parameter is URL-escaped: the report contains "&nbsp;", "<br>",
+// spaces and non-ASCII text, which would otherwise truncate the body at the
+// first "&". Failures are logged and not returned, so delivery stays
+// best-effort as before.
+func SendMail(report string) {
+	query := fmt.Sprintf("to=%s&title=%s&body=%s",
+		url.QueryEscape(to), url.QueryEscape("异常报告!"), url.QueryEscape(report))
+	// Bound the request so a hung mail service cannot block the cron job.
+	client := &http.Client{Timeout: 30 * time.Second}
+	resp, err := client.Get(api + "?" + query)
+	if err != nil {
+		util.Debug("send mail failed:", err)
+		return
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		util.Debug("send mail failed, status:", resp.Status)
+	}
+}