Эх сурвалжийг харах

监听vps的下载器进程 - 数据检测项目

apple 4 жил өмнө
parent
commit
b0b0d4a5a6

+ 14 - 0
data_monitoring/listen_task/src/config.json

@@ -0,0 +1,14 @@
+{
+  "mongodb": {
+    "addrName": "192.168.3.207:27092",
+    "dbName": "zhengkun",
+    "collName": "baidu_enterprise",
+    "pool": 10
+  },
+  "jp_collname": "baidu_enterprise",
+  "qy_collname": "baidu_enterprise",
+  "jkmail": {
+    "to": "zhengkun@topnet.net.cn",
+    "api": "http://172.17.145.179:19281/_send/_mail"
+  }
+}

+ 58 - 0
data_monitoring/listen_task/src/dataTaskJP.go

@@ -0,0 +1,58 @@
+package main
+
+import (
+	"log"
+	qu "qfw/util"
+	"time"
+)
+
+
+func dealWithJPData()  {
+
+	log.Println("开始竞品数据统计...")
+	defer qu.Catch()
+
+	now:=time.Now() //当前天
+	gte_time:= time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local).Unix()
+	lt_time := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	q := map[string]interface{}{
+		"down_time": map[string]interface{}{
+			"$gte":  gte_time,
+			"$lt": lt_time,
+		},
+	}
+	log.Println("查询条件",q)
+
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+
+	//细节才需要遍历
+	it := sess.DB(mgo.DbName).C(jp_collname).Find(&q).Iter()
+	total,start :=0, int(time.Now().Unix())
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total%10000==0 {
+			log.Println("current index",total)
+		}
+		//删选条件
+
+		tmp = make(map[string]interface{})
+	}
+	log.Println("jp is over:",total,"总用时:",int(time.Now().Unix())-start,"秒")
+
+
+	//是否告警条件
+	if total<1 {
+		sendErrMailApi("竞品数据异常","数量无")
+	}
+	comeintime:=qu.Int64All(time.Now().Unix())
+	date := qu.FormatDateByInt64(&comeintime, qu.Date_yyyyMMdd)
+	mgo.Save("monitor_site", map[string]interface{}{
+		"name":"竞品",
+		"type":"竞品",
+		"num":qu.IntAll(total),
+		"comeintime":comeintime,
+		"updatedate":date,
+		"data":"",
+	})
+
+}

+ 58 - 0
data_monitoring/listen_task/src/dataTaskQY.go

@@ -0,0 +1,58 @@
+package main
+
+import (
+	"log"
+	qu "qfw/util"
+	"time"
+)
+
+
+func dealWithQYData()  {
+
+	log.Println("开始企业数据统计...")
+	defer qu.Catch()
+
+	now:=time.Now() //当前天
+	gte_time:= time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local).Unix()
+	lt_time := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	q := map[string]interface{}{
+		"down_time": map[string]interface{}{
+			"$gte":  gte_time,
+			"$lt": lt_time,
+		},
+	}
+	log.Println("查询条件",q)
+
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+
+	//细节才需要遍历
+	it := sess.DB(mgo.DbName).C(qy_collname).Find(&q).Iter()
+	total,start :=0, int(time.Now().Unix())
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total%10000==0 {
+			log.Println("current index",total)
+		}
+		//删选条件
+
+		tmp = make(map[string]interface{})
+	}
+	log.Println("qy is over:",total,"总用时:",int(time.Now().Unix())-start,"秒")
+
+
+	//是否告警条件
+	if total<1 {
+		sendErrMailApi("企业数据异常","数量无")
+	}
+	comeintime:=qu.Int64All(time.Now().Unix())
+	date := qu.FormatDateByInt64(&comeintime, qu.DATEFORMAT)
+	mgo.Save("monitor_site", map[string]interface{}{
+		"name":"企业变更",
+		"type":"企业变更",
+		"num":qu.IntAll(total),
+		"comeintime":comeintime,
+		"updatedate":date,
+		"data":"",
+	})
+
+}

+ 62 - 0
data_monitoring/listen_task/src/main.go

@@ -0,0 +1,62 @@
+package main
+
+import (
+	"github.com/cron"
+	"log"
+	qu "qfw/util"
+	"time"
+)
+
+
+var (
+	sysconfig    	map[string]interface{} //配置文件
+	mgo          	*MongodbSim            //mongodb操作对象
+	jp_collname		string
+	qy_collname		string
+)
+
+func initMgo()  {
+	mconf := sysconfig["mongodb"].(map[string]interface{})
+	log.Println(mconf)
+	mgo = &MongodbSim{
+		MongodbAddr: mconf["addrName"].(string),
+		DbName:      mconf["dbName"].(string),
+		Size:        qu.IntAllDef(mconf["pool"], 10),
+	}
+	mgo.InitPool()
+
+
+	//属性赋值
+	jp_collname = qu.ObjToString(sysconfig["jp_collname"])
+	qy_collname = qu.ObjToString(sysconfig["qy_collname"])
+}
+
+
+func init() {
+	//加载配置文件
+	qu.ReadConfig(&sysconfig)
+	initMgo()
+}
+
+func main() {
+	go taskTime()
+	time.Sleep(99999 * time.Hour)
+}
+
+func taskTime()  {
+
+	log.Println("部署定时任务")
+	c := cron.New()
+	//竞品
+	c.AddFunc("0 30 8 ? * *", func() { dealWithJPData() })
+	//企业变更
+	c.AddFunc("0 30 8 ? * *", func() { dealWithQYData() })
+
+	c.Start()
+
+
+	dealWithJPData()
+
+}
+
+

+ 329 - 0
data_monitoring/listen_task/src/mgo.go

@@ -0,0 +1,329 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type MgoSess struct {
+	Db     string
+	Coll   string
+	Query  interface{}
+	Sorts  []string
+	fields interface{}
+	limit  int64
+	skip   int64
+	M      *MongodbSim
+}
+
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor != nil {
+		if mt.Cursor.Next(nil) {
+			err := mt.Cursor.Decode(result)
+			if err != nil {
+				log.Println("mgo cur err", err.Error())
+				mt.Cursor.Close(nil)
+				return false
+			}
+			return true
+		} else {
+			mt.Cursor.Close(nil)
+			return false
+		}
+	} else {
+		return false
+	}
+
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+type MongodbSim struct {
+	MongodbAddr string
+	Size        int
+	//	MinSize     int
+	DbName   string
+	C        *mongo.Client
+	Ctx      context.Context
+	ShortCtx context.Context
+	pool     chan bool
+	UserName string
+	Password string
+}
+
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	//m.Open()
+	ms := &MgoSess{}
+	ms.M = m
+	return ms
+}
+
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	//m.Close()
+	ms.M = nil
+	ms = nil
+}
+
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint64(m.Size))
+	m.pool = make(chan bool, m.Size)
+
+	if m.UserName !="" && m.Password !="" {
+		cre := options.Credential{
+			Username:m.UserName,
+			Password:m.Password,
+		}
+		opts.SetAuth(cre)
+	}
+
+
+
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+//批量插入
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewInsertOneModel()
+		write.SetDocument(d)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo savebulk error:", e.Error())
+		return false
+	}
+	return true
+}
+
+//保存
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		return nil
+	}
+	return r.InsertedID
+}
+
+//更新by Id
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+//删除by id
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//通过条件删除
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//findbyid
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//findone
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//find
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find()
+	r, err := coll.Find(m.Ctx, query, op.SetSort(sort), op.SetProjection(fields))
+	if err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	return results, nil
+}
+
+//创建_id
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, _ := primitive.ObjectIDFromHex(id)
+	return objectId
+}
+
+func BsonTOStringId(id interface{}) string {
+	return id.(primitive.ObjectID).Hex()
+}

+ 30 - 0
data_monitoring/listen_task/src/sendmail.go

@@ -0,0 +1,30 @@
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+)
+
+
+var tomail string
+var api string
+
+//api模式
+func sendErrMailApi(title,body string)  {
+	jkmail, _ := sysconfig["jkmail"].(map[string]interface{})
+	if jkmail != nil {
+		tomail, _ = jkmail["to"].(string)
+		api, _ = jkmail["api"].(string)
+	}
+	log.Println(tomail,api)
+	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, title, body))
+	if err == nil {
+		defer res.Body.Close()
+		read, err := ioutil.ReadAll(res.Body)
+		log.Println("邮件发送成功:", string(read), err)
+	}else {
+		log.Println("邮件发送失败:", err)
+	}
+}

+ 0 - 42
data_monitoring/vpn_client/src/main.go

@@ -1,42 +0,0 @@
-package main
-
-import (
-	"flag"
-	"github.com/cron"
-	"log"
-	"net/url"
-	"os"
-	"time"
-)
-
-var (
-	id	string
-)
-
-func init()  {
-	flag.StringVar(&id, "id", "", "唯一标识")
-}
-
-
-func main() {
-	//临时测试-
-	id = "测试标识1"
-
-	if id=="" {
-		log.Println("传参不能为空......")
-		os.Exit(1)
-	}
-	c := cron.New()
-	//备注:打包windows GOOS = windows  GOARCH = 386
-	c.AddFunc("0 */5 * * * ?", func() { task() })
-	c.Start()
-	time.Sleep(99999 * time.Hour)
-}
-
-func task()  {
-	u, _ := url.Parse("http://127.0.0.1:7811")
-	//u, _ := url.Parse("http://monitor.spdata.jianyu360.com")
-	q := u.Query()
-	q.Set("id", id)
-	u.RawQuery = q.Encode()
-}

+ 0 - 119
data_monitoring/vpn_server/src/main.go

@@ -1,119 +0,0 @@
-package main
-
-import (
-	"fmt"
-	"github.com/cron"
-	"log"
-	"net/http"
-	qu "qfw/util"
-	"strings"
-	"time"
-)
-var (
-	sysconfig			map[string]interface{} //配置文件
-	port				string
-	idsArr				[]string
-	dataTmp				map[string]map[string]interface{}
-	during,isErr		int64
-)
-func init()  {
-	//加载配置文件
-	log.Println("加载...")
-	qu.ReadConfig(&sysconfig)
-	port = sysconfig["port"].(string)
-	arr := sysconfig["ids"].([]interface{})
-	idsArr = qu.ObjArrToStringArr(arr)
-	dataTmp = make(map[string]map[string]interface{},0)
-	for _,v := range idsArr{
-		id := qu.ObjToString(v)
-		dataTmp[id] = map[string]interface{}{
-			"isHeart":0, 		//是否接收过
-			"isErrNum":0,		//错误次数
-			"isSendMail" : 0,   //是否发送过
-		}
-	}
-	during = qu.Int64All(sysconfig["during"])
-	isErr = qu.Int64All(sysconfig["isErr"])
-
-	log.Println("准备完毕...")
-}
-
-func main() {
-
-	//http://monitor.spdata.jianyu360.com/,程序端口7811
-	addr := ":"+port
-	http.HandleFunc("/", handler)
-	go http.ListenAndServe(addr, nil)
-
-	//每隔1分钟执行一次:0 */1 * * * ?
-	spec :=fmt.Sprintf("0 */%d * * * ?",during)
-	c := cron.New()
-	c.AddFunc(spec, func() { taskFinishing()})
-	c.Start()
-	time.Sleep(99999 * time.Hour)
-}
-
-func handler(w http.ResponseWriter, r *http.Request) {
-	r.ParseForm() //解析参数,默认是不会解析的
-	if r.Method == "GET" {
-		id := ""
-		for _, v := range r.Form {
-			id = strings.Join(v, "")
-		}
-		if id!="" { //改变-就状态
-			dataTmp[id] = map[string]interface{}{
-				"isHeart":1,
-				"isErrNum":0,
-				"isSendMail":0,
-			}
-		}
-	} else if r.Method == "POST" {
-
-	} else {
-
-	}
-}
-
-//不断监听处理
-func taskFinishing()  {
-	log.Println("执行...处理一次...")
-	isMailContent := ""
-	for _ , id := range idsArr {
-		//此标识-是否正常
-		isHeart ,isErrNum , isSendMail := qu.Int64All(dataTmp[id]["isHeart"]),int64(0),qu.Int64All(dataTmp[id]["isSendMail"])
-		if isSendMail == 1 { //送过邮件了
-			continue
-		}
-		if isHeart == 0 {
-			isErrNum = qu.Int64All(dataTmp[id]["isErrNum"])
-			isErrNum ++
-		}
-		//超过一定次数
-		if isErrNum > isErr { //发邮件
-			isErrNum = 0
-			if isMailContent == ""{
-				isMailContent = id
-			}else {
-				isMailContent = isMailContent+","+id
-			}
-			isSendMail = 1
-		}
-
-		dataTmp[id] = map[string]interface{}{
-			"isHeart":0,
-			"isErrNum":isErrNum,
-			"isSendMail":isSendMail,
-		}
-		//log.Println("处理:",dataTmp[id])
-	}
-
-	if isMailContent!=""{
-		log.Println("发邮件... ...",isMailContent)
-
-		//任选其一皆可
-		//sendErrMailSmtp("vpn异常",isMailContent)
-		sendErrMailApi("vpn异常",isMailContent)
-		//os.Exit(1)
-	}
-
-}

+ 7 - 0
data_monitoring/vps_client/src/config.json

@@ -0,0 +1,7 @@
+{
+  "vpsID": "id标识1111",
+  "processArr": [
+    "d1.exe",
+    "d2.exe"
+  ]
+}

+ 153 - 0
data_monitoring/vps_client/src/main.go

@@ -0,0 +1,153 @@
+package main
+
+import (
+	"github.com/cron"
+	"log"
+	"net/http"
+	"net/url"
+	"os"
+	"os/exec"
+	qu "qfw/util"
+	"runtime"
+	"strconv"
+	"strings"
+	"time"
+)
+
+var (
+	sysconfig			map[string]interface{} //配置文件
+	vpsID	string			//机器唯一标识
+	processArr []string		//机器相关下载器
+
+)
+
+func init()  {
+	log.Println("加载...")
+	qu.ReadConfig(&sysconfig)
+	vpsID = qu.ObjToString(sysconfig["vpsID"])
+	processArr = qu.ObjArrToStringArr(sysconfig["processArr"].([]interface{}))
+}
+
+
+func main() {
+	//临时测试-
+	if vpsID=="" || len(processArr)< 1 {
+		log.Println("配置文件异常,请检查......")
+		os.Exit(1)
+	}
+
+
+	//定时器
+	c := cron.New()
+	//c.AddFunc("0 */5 * * * ?", func() { task() })
+	c.AddFunc("*/10 * * * * ?", func() { task() })
+	c.Start()
+	time.Sleep(99999 * time.Hour)
+}
+
+func task()  {
+
+	//先检测下载器
+	process := "0" //正常 - windows模式
+	for _,v:=range processArr {
+		if runtime.GOOS == "windows" {
+			b,_,_ := isProcessExist(v)
+			if !b {
+				process = "1"
+				break
+			}
+		}else {
+			b,_:=checkProRunning(v)
+			if !b {
+				process = "1"
+				break
+			}
+		}
+	}
+	log.Println("当前下载器:",process)
+	u, _ := url.Parse("http://127.0.0.1:7811") //本地测试
+	//u, _ := url.Parse("http://monitor.spdata.jianyu360.com") //线上
+	q := u.Query()
+	q.Set("id", vpsID)
+	q.Set("process", process)
+	u.RawQuery = q.Encode()
+
+	_, err := http.Get(u.String());
+	if err != nil {
+		log.Println("异常",err)
+	}
+}
+
+
+
+
+
+
+/****************分割线******************/
+/****************分割线******************/
+/****************分割线******************/
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+func isProcessExist(appName string) (bool, string, int) {
+	appary := make(map[string]int)
+	cmd := exec.Command("cmd", "/C", "tasklist")
+	output, _ := cmd.Output()
+	//fmt.Printf("fields: %v\n", output)
+	n := strings.Index(string(output), "System")
+	if n == -1 {
+		//fmt.Println("no find")
+		//os.Exit(1)
+	}
+	data := string(output)[n:]
+	fields := strings.Fields(data)
+	for k, v := range fields {
+		if v == appName {
+			appary[appName], _ = strconv.Atoi(fields[k+1])
+
+			return true, appName, appary[appName]
+		}
+	}
+
+	return false, appName, -1
+}
+
+
+
+
+
+
+//根据进程名判断进程是否运行
+func checkProRunning(serverName string) (bool, error) {
+	cmd := `ps ux | awk '/` + serverName + `/ && !/awk/ {print $2}'`
+	pid, err := runInLinux(cmd)
+	if err != nil {
+		return false, err
+	}
+	return pid != "", nil
+}
+//执行linux进程信息
+func runInLinux(cmd string) (string, error) {
+	result, err := exec.Command("/bin/sh", "-c", cmd).Output()
+	if err != nil {
+		return "", err
+	}
+	return strings.TrimSpace(string(result)), err
+}
+//根据进程名称获取进程ID
+func getPid(serverName string) (string, error) {
+	cmd := `ps ux | awk '/` + serverName + `/ && !/awk/ {print $2}'`
+	pid, err := runInLinux(cmd)
+	return pid , err
+}

+ 4 - 5
data_monitoring/vpn_server/src/config.json → data_monitoring/vps_server/src/config.json

@@ -1,11 +1,10 @@
 {
   "port": "7811",
-  "ids" : [
-    "id标识1111",
-    "id标识2222"
+  "vpsIDs" : [
+    "id标识1111"
   ],
-  "during": 1,
-  "isErr" : 0,
+  "during": 10,
+  "isErr" : 5,
   "smtpMail": {
     "from": "zhengkun@topnet.net.cn",
     "to": "zhengkun@topnet.net.cn",

+ 154 - 0
data_monitoring/vps_server/src/main.go

@@ -0,0 +1,154 @@
+package main
+
+import (
+	"fmt"
+	"github.com/cron"
+	"log"
+	"net/http"
+	qu "qfw/util"
+	"strings"
+	"time"
+)
+var (
+	sysconfig			map[string]interface{} //配置文件
+	port				string
+	idsArr				[]string
+	dataTmp				map[string]map[string]interface{}
+	during,isErr		int64
+)
+func init()  {
+	//加载配置文件
+	log.Println("加载...")
+	qu.ReadConfig(&sysconfig)
+	port = sysconfig["port"].(string)
+	arr := sysconfig["vpsIDs"].([]interface{})
+	idsArr = qu.ObjArrToStringArr(arr)
+	dataTmp = make(map[string]map[string]interface{},0)
+	for _,v := range idsArr{
+		id := qu.ObjToString(v)
+		dataTmp[id] = map[string]interface{}{
+			"isHeart":0,
+			"isErrNum":0,
+			"isProcess" : 0,
+			"isVpsMail":0,
+			"isPrpMail":0,
+		}
+	}
+	during = qu.Int64All(sysconfig["during"])
+	isErr = qu.Int64All(sysconfig["isErr"])
+	log.Println("准备完毕...")
+}
+
+func main() {
+
+	//http://monitor.spdata.jianyu360.com/,程序端口7811
+	addr := ":"+port
+	http.HandleFunc("/", handler)
+	go http.ListenAndServe(addr, nil)
+
+	//每隔1分钟执行一次:0 */1 * * * ?   每隔5秒执行一次:*/5 * * * * ?
+
+	//spec :=fmt.Sprintf("0 */%d * * * ?",during)
+	spec :=fmt.Sprintf("*/%d * * * * ?",during)
+	c := cron.New()
+	c.AddFunc(spec, func() { taskFinishing()})
+	c.Start()
+	time.Sleep(99999 * time.Hour)
+}
+
+func handler(w http.ResponseWriter, r *http.Request) {
+	r.ParseForm() //解析参数,默认是不会解析的
+	if r.Method == "GET" {
+		vpsid ,process,isProMail:= "",int64(0),int64(0)
+		for k, v := range r.Form {
+			if k=="id" {
+				vpsid = strings.Join(v, "")
+			}else if k=="process" {
+				process = qu.Int64All(strings.Join(v, ""))
+				if process==0 {
+					isProMail = 0
+				}
+			}else {
+
+			}
+		}
+		if vpsid!="" { //改变-旧状态
+			dataTmp[vpsid] = map[string]interface{}{
+				"isHeart":1,
+				"isErrNum":0,
+				"isVpsMail":0,   //收到心跳-vps邮件置为0,可以发
+				"isProcess":process,
+				"isProMail":isProMail,
+			}
+		}
+		log.Println("接收Get请求:",dataTmp[vpsid])
+
+	} else if r.Method == "POST" {
+
+	} else {
+
+	}
+}
+
+//不断监听处理
+func taskFinishing()  {
+	log.Println("执行...处理一次...")
+	isVpsMailContent,isProMailContent:= "",""
+	for _ , vpsid := range idsArr {
+		//此标识-是否正常
+		log.Println("原:",dataTmp[vpsid])
+		isHeart,isProcess:= qu.Int64All(dataTmp[vpsid]["isHeart"]),qu.Int64All(dataTmp[vpsid]["isProcess"])
+		isErrNum := int64(0)
+		isVpsMail,isProMail := qu.Int64All(dataTmp[vpsid]["isVpsMail"]),qu.Int64All(dataTmp[vpsid]["isProMail"])
+		if isVpsMail == 1 { //送过邮件了
+			//log.Println("发过vps邮件","心跳:",isHeart,"次数:",isErrNum,"下载器:",isProcess,"vps邮件:",isVpsMail,"pro邮件:",isProMail)
+			continue
+		}
+		if isHeart == 0 { //未接收心跳反应,错误+1
+			isErrNum = qu.Int64All(dataTmp[vpsid]["isErrNum"])
+			isErrNum ++
+		}
+		if isErrNum > isErr { //错误超过一定次数,发邮件vps异常
+			isErrNum = 0
+			if isVpsMailContent == ""{
+				isVpsMailContent = vpsid
+			}else {
+				isVpsMailContent = isVpsMailContent+","+vpsid
+			}
+			isVpsMail = 1
+		}
+
+		if isProcess == 1  && isProMail==0 {//下载器异常-未发送过下载器异常情况
+			if isProMailContent == ""{
+				isProMailContent = vpsid
+			}else {
+				isProMailContent = isProMailContent+","+vpsid
+			}
+			isProMail = 1
+			isProcess = 0
+		}
+
+		log.Println("处理后:","心跳:",0,"次数:",isErrNum,"下载器:",isProcess,"vps邮件:",isVpsMail,"pro邮件:",isProMail)
+
+
+		dataTmp[vpsid] = map[string]interface{}{
+			"isHeart":0,
+			"isErrNum":isErrNum,
+			"isProcess":isProcess,
+			"isVpsMail":isVpsMail,
+			"isProMail" : isProMail,
+		}
+		//log.Println("处理:",dataTmp[id])
+	}
+
+
+	if isVpsMailContent!=""{
+		log.Println("发邮件vps异常...",isVpsMailContent)
+		//sendErrMailApi("vpn异常",isVpsMailContent)
+	}else {
+		if isProMailContent !="" {
+			log.Println("发邮件下载器异常...",isVpsMailContent)
+			//sendErrMailApi("下载器异常",isProMailContent)
+		}
+	}
+}

+ 13 - 0
data_monitoring/vpn_server/src/mgo.go → data_monitoring/vps_server/src/mgo.go

@@ -125,6 +125,8 @@ type MongodbSim struct {
 	Ctx      context.Context
 	ShortCtx context.Context
 	pool     chan bool
+	UserName string
+	Password string
 }
 
 func (m *MongodbSim) GetMgoConn() *MgoSess {
@@ -146,6 +148,17 @@ func (m *MongodbSim) InitPool() {
 	opts.ApplyURI("mongodb://" + m.MongodbAddr)
 	opts.SetMaxPoolSize(uint64(m.Size))
 	m.pool = make(chan bool, m.Size)
+
+	if m.UserName !="" && m.Password !="" {
+		cre := options.Credential{
+			Username:m.UserName,
+			Password:m.Password,
+		}
+		opts.SetAuth(cre)
+	}
+
+
+
 	opts.SetMaxConnIdleTime(2 * time.Hour)
 	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
 	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)

+ 1 - 1
data_monitoring/vpn_server/src/sendmail.go → data_monitoring/vps_server/src/sendmail.go

@@ -51,7 +51,7 @@ func sendErrMailApi(title,body string)  {
 		api, _ = jkmail["api"].(string)
 	}
 	log.Println(tomail,api)
-	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "title", "body"))
+	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, title, body))
 	if err == nil {
 		defer res.Body.Close()
 		read, err := ioutil.ReadAll(res.Body)

+ 13 - 0
data_quality/src/mgo.go

@@ -125,6 +125,8 @@ type MongodbSim struct {
 	Ctx      context.Context
 	ShortCtx context.Context
 	pool     chan bool
+	UserName string
+	Password string
 }
 
 func (m *MongodbSim) GetMgoConn() *MgoSess {
@@ -146,6 +148,17 @@ func (m *MongodbSim) InitPool() {
 	opts.ApplyURI("mongodb://" + m.MongodbAddr)
 	opts.SetMaxPoolSize(uint64(m.Size))
 	m.pool = make(chan bool, m.Size)
+
+	if m.UserName !="" && m.Password !="" {
+		cre := options.Credential{
+			Username:m.UserName,
+			Password:m.Password,
+		}
+		opts.SetAuth(cre)
+	}
+
+
+
 	opts.SetMaxConnIdleTime(2 * time.Hour)
 	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
 	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)