apple 4 年之前
父節點
當前提交
e67aac6a8a

+ 10 - 1
data_monitoring/listen_task/src/config.json

@@ -3,10 +3,19 @@
     "addrName": "192.168.3.207:27092",
     "dbName": "zhengkun",
     "collName": "baidu_enterprise",
-    "pool": 10
+    "pool": 5
+  },
+  "stmongodb": {
+    "addrName": "192.168.3.207:27092",
+    "dbName": "zhengkun",
+    "collName": "result_20210109",
+    "pool": 5
   },
   "jp_collname": "baidu_enterprise",
   "qy_collname": "baidu_enterprise",
+  "st_collname": "result_20210109",
+  "st_es_index": "bidding",
+  "st_es_type": "bidding",
   "jkmail": {
     "to": "zhengkun@topnet.net.cn",
     "api": "http://172.17.145.179:19281/_send/_mail"

+ 6 - 7
data_monitoring/listen_task/src/dataTaskJP.go

@@ -9,7 +9,7 @@ import (
 
 func dealWithJPData()  {
 
-	log.Println("开始竞品数据统计...")
+	log.Println("开始统计竞品数据...")
 	defer qu.Catch()
 
 	now:=time.Now() //当前天
@@ -23,11 +23,11 @@ func dealWithJPData()  {
 	}
 	log.Println("查询条件",q)
 
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
+	sess := datamgo.GetMgoConn()
+	defer datamgo.DestoryMongoConn(sess)
 
 	//细节才需要遍历
-	it := sess.DB(mgo.DbName).C(jp_collname).Find(&q).Iter()
+	it := sess.DB(datamgo.DbName).C(jp_collname).Find(&q).Iter()
 	total,start :=0, int(time.Now().Unix())
 	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
 		if total%10000==0 {
@@ -46,13 +46,12 @@ func dealWithJPData()  {
 	}
 	comeintime:=qu.Int64All(time.Now().Unix())
 	date := qu.FormatDateByInt64(&comeintime, qu.Date_yyyyMMdd)
-	mgo.Save("monitor_site", map[string]interface{}{
+	datamgo.Save("monitor_other", map[string]interface{}{
 		"name":"竞品",
-		"type":"竞品",
 		"num":qu.IntAll(total),
 		"comeintime":comeintime,
 		"updatedate":date,
-		"data":"",
+		"data":map[string]interface{}{},
 	})
 
 }

+ 11 - 7
data_monitoring/listen_task/src/dataTaskQY.go

@@ -9,12 +9,17 @@ import (
 
 func dealWithQYData()  {
 
-	log.Println("开始企业数据统计...")
+	log.Println("开始统计企业数据...")
 	defer qu.Catch()
 
 	now:=time.Now() //当前天
 	gte_time:= time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local).Unix()
 	lt_time := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+
+	//临时测试
+	gte_time = 1617811200
+	lt_time = 1617897600
+
 	q := map[string]interface{}{
 		"down_time": map[string]interface{}{
 			"$gte":  gte_time,
@@ -23,11 +28,11 @@ func dealWithQYData()  {
 	}
 	log.Println("查询条件",q)
 
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
+	sess := datamgo.GetMgoConn()
+	defer datamgo.DestoryMongoConn(sess)
 
 	//细节才需要遍历
-	it := sess.DB(mgo.DbName).C(qy_collname).Find(&q).Iter()
+	it := sess.DB(datamgo.DbName).C(qy_collname).Find(&q).Iter()
 	total,start :=0, int(time.Now().Unix())
 	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
 		if total%10000==0 {
@@ -46,13 +51,12 @@ func dealWithQYData()  {
 	}
 	comeintime:=qu.Int64All(time.Now().Unix())
 	date := qu.FormatDateByInt64(&comeintime, qu.DATEFORMAT)
-	mgo.Save("monitor_site", map[string]interface{}{
+	datamgo.Save("monitor_other", map[string]interface{}{
 		"name":"企业变更",
-		"type":"企业变更",
 		"num":qu.IntAll(total),
 		"comeintime":comeintime,
 		"updatedate":date,
-		"data":"",
+		"data":map[string]interface{}{},
 	})
 
 }

+ 167 - 0
data_monitoring/listen_task/src/dataTaskST.go

@@ -0,0 +1,167 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	qu "qfw/util"
+	"qfw/util/elastic"
+	"time"
+	es_elastic "qfw/common/src/gopkg.in/olivere/elastic.v1"
+)
+
+
+func dealWithSTData()  {
+
+	log.Println("开始统计站点数据...")
+	defer qu.Catch()
+
+	now:=time.Now() //当前天
+	gte_time:= time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local).Unix()
+	lt_time := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	//临时测试
+	gte_time = 1618070400
+	lt_time = 1618156800
+	mgodata := toCalculateMgoData(gte_time,lt_time)
+	esdata := toCalculateEsData(fmt.Sprintf("%d",gte_time),fmt.Sprintf("%d",lt_time))
+
+	mgonum,esnum:= qu.IntAll(mgodata["totalnum"]),qu.IntAll(esdata["totalnum"])
+
+	//是否发警告-待定
+
+
+
+	comeintime:=qu.Int64All(time.Now().Unix())
+	date := qu.FormatDateByInt64(&comeintime, qu.DATEFORMAT)
+	datamgo.Save("monitor_site", map[string]interface{}{
+		"comeintime":comeintime,
+		"updatedate":date,
+		"mgonum":mgonum,
+		"esnum":esnum,
+		"mgodata":mgodata["detail"],
+		"esdata":esdata["detail"],
+	})
+}
+
+
+func toCalculateEsData(gte_time string,lt_time string) map[string]interface{} {
+
+	if gte_time == "" || lt_time == "" {
+		return map[string]interface{}{
+			"totalnum" : 0,
+			"detail": map[string]interface{}{},
+		}
+	}
+	log.Println("es 查询区间:",gte_time,lt_time)
+
+	//elastic.InitElasticSize("http://172.17.145.170:9800", 10,)
+	elastic.InitElasticSize("http://127.0.0.1:12003", 10,)
+	esclient := elastic.GetEsConn()
+	defer elastic.DestoryEsConn(esclient)
+	if esclient == nil {
+		log.Println("连接池异常")
+	}
+	query :=es_elastic.NewRangeQuery("comeintime").Gte(gte_time).Lt(lt_time)
+	cursor, err := esclient.Scan(st_es_index).Query(es_elastic.NewBoolQuery().Must(query)).
+		Size(200).Do()
+	if err != nil {
+		log.Println("cursor",err)
+	}
+	if cursor.Results == nil {
+		log.Println("results != nil; got nil")
+	}
+	if cursor.Results.Hits == nil {
+		log.Println("expected results.Hits != nil; got nil")
+	}
+	log.Println("es total :",cursor.TotalHits(),"处理分析中......")
+
+	dict:= make(map[string]interface{},0)
+	total,start,isOK :=0, int(time.Now().Unix()),0
+	for {
+		searchResult, err := cursor.Next()
+		if err != nil {
+			if err.Error() == "EOS" {
+				break
+			}else {
+				log.Println("cursor searchResult",err)
+			}
+		}
+		for _, hit := range searchResult.Hits.Hits {
+			tmp := make(map[string]interface{})
+			err := json.Unmarshal(*hit.Source, &tmp)
+			if err != nil {
+				log.Println("json Unmarshal error")
+				continue
+			}
+			total++
+			site := qu.ObjToString(tmp["site"])	//统计站点
+			if dict[site] == nil {
+				dict[site] = 1
+				isOK++
+			}else {
+				num := qu.IntAll(dict[site])+1
+				dict[site] = num
+			}
+		}
+	}
+
+	log.Println("st is es over:",total,isOK,"有效:",len(dict),"用时:",int(time.Now().Unix())-start,"秒")
+
+
+	return map[string]interface{}{
+		"totalnum" : total,
+		"detail": dict,
+	}
+}
+
+func toCalculateMgoData(gte_time int64,lt_time int64) map[string]interface{} {
+
+	if gte_time == 0 || lt_time == 0 {
+		return map[string]interface{}{
+			"totalnum" : 0,
+			"detail": map[string]interface{}{},
+		}
+	}
+
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte":  gte_time,
+			"$lt": lt_time,
+		},
+	}
+	log.Println("mgo 查询区间",query)
+
+	sess := sitemgo.GetMgoConn()
+	defer sitemgo.DestoryMongoConn(sess)
+
+	dict:= make(map[string]interface{},0)
+	//细节才需要遍历
+	it := sess.DB(sitemgo.DbName).C(st_collname).Find(&query).Select(map[string]interface{}{
+		"site":1,
+	}).Iter()
+	total,start,isOK :=0, int(time.Now().Unix()),0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total%10000==0 {
+			//log.Println("current index",total,isOK)
+		}
+		//统计站点
+		site := qu.ObjToString(tmp["site"])
+		if dict[site] == nil {
+			dict[site] = 1
+			isOK++
+		}else {
+			num := qu.IntAll(dict[site])+1
+			dict[site] = num
+		}
+
+		tmp = make(map[string]interface{})
+	}
+
+	log.Println("st is mgo over:",total,isOK,"有效:",len(dict),"用时:",int(time.Now().Unix())-start,"秒")
+
+
+	return map[string]interface{}{
+		"totalnum" : total,
+		"detail": dict,
+	}
+}

+ 31 - 7
data_monitoring/listen_task/src/main.go

@@ -5,30 +5,49 @@ import (
 	"log"
 	qu "qfw/util"
 	"time"
+
 )
 
 
 var (
 	sysconfig    	map[string]interface{} //配置文件
-	mgo          	*MongodbSim            //mongodb操作对象
+	datamgo         *MongodbSim            //mongodb操作对象
+	sitemgo         *MongodbSim            //mongodb操作对象
 	jp_collname		string
 	qy_collname		string
+	st_collname		string
+	st_es_index		string
+	st_es_type		string
 )
 
 func initMgo()  {
 	mconf := sysconfig["mongodb"].(map[string]interface{})
-	log.Println(mconf)
-	mgo = &MongodbSim{
+	datamgo = &MongodbSim{
 		MongodbAddr: mconf["addrName"].(string),
 		DbName:      mconf["dbName"].(string),
 		Size:        qu.IntAllDef(mconf["pool"], 10),
 	}
-	mgo.InitPool()
+	datamgo.InitPool()
+
+	sconf := sysconfig["mongodb"].(map[string]interface{})
+	sitemgo = &MongodbSim{
+		MongodbAddr: sconf["addrName"].(string),
+		DbName:      sconf["dbName"].(string),
+		Size:        qu.IntAllDef(sconf["sconf"], 10),
+	}
+	sitemgo.InitPool()
 
 
 	//属性赋值
 	jp_collname = qu.ObjToString(sysconfig["jp_collname"])
 	qy_collname = qu.ObjToString(sysconfig["qy_collname"])
+
+
+	st_collname = qu.ObjToString(sysconfig["st_collname"])
+	st_es_index = qu.ObjToString(sysconfig["st_es_index"])
+	st_es_type = qu.ObjToString(sysconfig["st_es_type"])
+
+
 }
 
 
@@ -44,18 +63,23 @@ func main() {
 }
 
 func taskTime()  {
+	dealWithQYData()
+	return
 
 	log.Println("部署定时任务")
 	c := cron.New()
-	//竞品
+	//竞品-mongo
 	c.AddFunc("0 30 8 ? * *", func() { dealWithJPData() })
-	//企业变更
+
+	//企业变更-mongo
 	c.AddFunc("0 30 8 ? * *", func() { dealWithQYData() })
 
+	//站点相关-es-mongo
+	c.AddFunc("0 30 8 ? * *", func() { dealWithSTData() })
+
 	c.Start()
 
 
-	dealWithJPData()
 
 }
 

+ 2 - 1
data_monitoring/vps_client/src/config.json

@@ -3,5 +3,6 @@
   "processArr": [
     "d1.exe",
     "d2.exe"
-  ]
+  ],
+  "during":5
 }

+ 43 - 42
data_monitoring/vps_client/src/main.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"fmt"
 	"github.com/cron"
 	"log"
 	"net/http"
@@ -16,8 +17,8 @@ import (
 
 var (
 	sysconfig			map[string]interface{} //配置文件
-	vpsID	string			//机器唯一标识
-	processArr []string		//机器相关下载器
+	vpsID,during		string			//机器唯一标识
+	processArr 			[]string		//机器相关下载器
 
 )
 
@@ -25,13 +26,14 @@ func init()  {
 	log.Println("加载...")
 	qu.ReadConfig(&sysconfig)
 	vpsID = qu.ObjToString(sysconfig["vpsID"])
+	during = qu.ObjToString(sysconfig["duringv"])
 	processArr = qu.ObjArrToStringArr(sysconfig["processArr"].([]interface{}))
 }
 
 
 func main() {
-	//临时测试-
-	if vpsID=="" || len(processArr)< 1 {
+
+	if vpsID == "" || len(processArr) < 1 {
 		log.Println("配置文件异常,请检查......")
 		os.Exit(1)
 	}
@@ -39,8 +41,11 @@ func main() {
 
 	//定时器
 	c := cron.New()
-	//c.AddFunc("0 */5 * * * ?", func() { task() })
-	c.AddFunc("*/10 * * * * ?", func() { task() })
+	spec :=fmt.Sprintf("0 */%d * * * ?",during)	//分
+	//spec :=fmt.Sprintf("*/%d * * * * ?",during)		//秒
+
+	c.AddFunc(spec, func() { task() })
+	//c.AddFunc("*/10 * * * * ?", func() { task() })
 	c.Start()
 	time.Sleep(99999 * time.Hour)
 }
@@ -56,7 +61,7 @@ func task()  {
 				process = "1"
 				break
 			}
-		}else {
+		}else { //linux测试使用
 			b,_:=checkProRunning(v)
 			if !b {
 				process = "1"
@@ -65,8 +70,8 @@ func task()  {
 		}
 	}
 	log.Println("当前下载器:",process)
-	u, _ := url.Parse("http://127.0.0.1:7811") //本地测试
-	//u, _ := url.Parse("http://monitor.spdata.jianyu360.com") //线上
+	//u, _ := url.Parse("http://127.0.0.1:7811") //本地测试
+	u, _ := url.Parse("http://monitor.spdata.jianyu360.com") //线上
 	q := u.Query()
 	q.Set("id", vpsID)
 	q.Set("process", process)
@@ -74,7 +79,7 @@ func task()  {
 
 	_, err := http.Get(u.String());
 	if err != nil {
-		log.Println("异常",err)
+		//log.Println("异常",err)
 	}
 }
 
@@ -100,6 +105,32 @@ func task()  {
 
 
 
+
+
+//根据进程名判断进程是否运行
+func checkProRunning(serverName string) (bool, error) {
+	cmd := `ps ux | awk '/` + serverName + `/ && !/awk/ {print $2}'`
+	pid, err := runInLinux(cmd)
+	if err != nil {
+		return false, err
+	}
+	return pid != "", nil
+}
+//根据进程名称获取进程ID
+func getPid(serverName string) (string, error) {
+	cmd := `ps ux | awk '/` + serverName + `/ && !/awk/ {print $2}'`
+	pid, err := runInLinux(cmd)
+	return pid , err
+}
+//
+func runInLinux(cmd string) (string, error) {
+	result, err := exec.Command("/bin/sh", "-c", cmd).Output()
+	if err != nil {
+		return "", err
+	}
+	return strings.TrimSpace(string(result)), err
+}
+
 func isProcessExist(appName string) (bool, string, int) {
 	appary := make(map[string]int)
 	cmd := exec.Command("cmd", "/C", "tasklist")
@@ -107,7 +138,7 @@ func isProcessExist(appName string) (bool, string, int) {
 	//fmt.Printf("fields: %v\n", output)
 	n := strings.Index(string(output), "System")
 	if n == -1 {
-		//fmt.Println("no find")
+		//log.Println("no find")
 		//os.Exit(1)
 	}
 	data := string(output)[n:]
@@ -119,35 +150,5 @@ func isProcessExist(appName string) (bool, string, int) {
 			return true, appName, appary[appName]
 		}
 	}
-
 	return false, appName, -1
-}
-
-
-
-
-
-
-//根据进程名判断进程是否运行
-func checkProRunning(serverName string) (bool, error) {
-	cmd := `ps ux | awk '/` + serverName + `/ && !/awk/ {print $2}'`
-	pid, err := runInLinux(cmd)
-	if err != nil {
-		return false, err
-	}
-	return pid != "", nil
-}
-//执行linux进程信息
-func runInLinux(cmd string) (string, error) {
-	result, err := exec.Command("/bin/sh", "-c", cmd).Output()
-	if err != nil {
-		return "", err
-	}
-	return strings.TrimSpace(string(result)), err
-}
-//根据进程名称获取进程ID
-func getPid(serverName string) (string, error) {
-	cmd := `ps ux | awk '/` + serverName + `/ && !/awk/ {print $2}'`
-	pid, err := runInLinux(cmd)
-	return pid , err
-}
+}

+ 4 - 4
data_monitoring/vps_server/src/main.go

@@ -48,8 +48,8 @@ func main() {
 
 	//每隔1分钟执行一次:0 */1 * * * ?   每隔5秒执行一次:*/5 * * * * ?
 
-	//spec :=fmt.Sprintf("0 */%d * * * ?",during)
-	spec :=fmt.Sprintf("*/%d * * * * ?",during)
+	spec :=fmt.Sprintf("0 */%d * * * ?",during)
+	//spec :=fmt.Sprintf("*/%d * * * * ?",during)
 	c := cron.New()
 	c.AddFunc(spec, func() { taskFinishing()})
 	c.Start()
@@ -76,12 +76,12 @@ func handler(w http.ResponseWriter, r *http.Request) {
 			dataTmp[vpsid] = map[string]interface{}{
 				"isHeart":1,
 				"isErrNum":0,
-				"isVpsMail":0,   //收到心跳-vps邮件置为0,可以发
+				"isVpsMail":0,   //收到心跳-vps邮件置为0,可以发
 				"isProcess":process,
 				"isProMail":isProMail,
 			}
 		}
-		log.Println("接收Get请求:",dataTmp[vpsid])
+		//log.Println("接收Get请求:",dataTmp[vpsid])
 
 	} else if r.Method == "POST" {