Browse Source

Merge branch 'dev3.4.1' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4.1

* 'dev3.4.1' of http://192.168.3.207:10080/qmx/jy-data-extract:
  判重修改,监听站点修改-
  站点-新需求-简易搭建
  es mgo日常统计
  判重修改-
Jianghan 4 năm trước cách đây
mục cha
commit
fa0ef1ddce
30 tập tin đã thay đổi với 1124 bổ sung6 xóa
  1. 31 0
      data_monitoring/listen_data/src/config.json
  2. 377 0
      data_monitoring/listen_data/src/main.go
  3. 31 0
      data_monitoring/listen_data/src/mark
  4. 0 0
      data_monitoring/listen_data/src/mgo.go
  5. 61 0
      data_monitoring/listen_data/src/sendmail.go
  6. 120 0
      data_monitoring/listen_data/src/weekday.go
  7. 0 0
      data_monitoring/zk_backup/listen_lan/src/config.json
  8. 0 0
      data_monitoring/zk_backup/listen_lan/src/errmail.go
  9. 0 0
      data_monitoring/zk_backup/listen_lan/src/main.go
  10. 0 0
      data_monitoring/zk_backup/listen_lan/src/mgo.go
  11. 0 0
      data_monitoring/zk_backup/listen_lan/src/sitelisten.go
  12. 0 0
      data_monitoring/zk_backup/listen_online/src/config.json
  13. 0 0
      data_monitoring/zk_backup/listen_online/src/errmail.go
  14. 0 0
      data_monitoring/zk_backup/listen_online/src/main.go
  15. 0 0
      data_monitoring/zk_backup/listen_online/src/mgo.go
  16. 0 0
      data_monitoring/zk_backup/listen_online/src/priselisten.go
  17. 0 0
      data_monitoring/zk_backup/listen_online/src/sitelisten.go
  18. 0 0
      data_monitoring/zk_backup/listen_task/src/config.json
  19. 0 0
      data_monitoring/zk_backup/listen_task/src/dataTaskJP.go
  20. 0 0
      data_monitoring/zk_backup/listen_task/src/dataTaskQY.go
  21. 0 0
      data_monitoring/zk_backup/listen_task/src/dataTaskST.go
  22. 0 0
      data_monitoring/zk_backup/listen_task/src/main.go
  23. 0 0
      data_monitoring/zk_backup/listen_task/src/mark
  24. 329 0
      data_monitoring/zk_backup/listen_task/src/mgo.go
  25. 0 0
      data_monitoring/zk_backup/listen_task/src/sendmail.go
  26. 41 0
      esmgocount/src/config.json
  27. 124 0
      esmgocount/src/main.go
  28. 7 0
      udpfilterdup/src/dataMethod.go
  29. 2 5
      udpfilterdup/src/main.go
  30. 1 1
      udpfilterdup/src/updateMethod.go

+ 31 - 0
data_monitoring/listen_data/src/config.json

@@ -0,0 +1,31 @@
+{
+  "save_mgodb": {
+    "addr": "172.17.4.87:27080",
+    "db": "editor",
+    "coll": "monitor_site",
+    "pool": 5
+  },
+  "site_mgodb": {
+    "addr": "172.17.4.87:27080",
+    "db": "spider",
+    "coll": "data_bak",
+    "pool": 5
+  },
+  "python_mgodb": {
+    "addr": "172.17.4.187:27082,172.17.145.163:27083",
+    "db": "qfw",
+    "coll": "bidding",
+    "pool": 5
+  },
+  "site_unique_name":"site_unique_new",
+  "smtpMail": {
+    "from": "zhengkun@topnet.net.cn",
+    "to": "zhengkun@topnet.net.cn,chenjiakang@topnet.net.cn",
+    "cc": "zhengkun@topnet.net.cn",
+    "smtpHost": "smtp.qq.com",
+    "smtpPort": "465",
+    "user":     "920032221@qq.com",
+    "pwd":      "lktqxssmdkebbcbj"
+  },
+  "xlsx_name" : "site_data.xlsx"
+}

+ 377 - 0
data_monitoring/listen_data/src/main.go

@@ -0,0 +1,377 @@
+package main
+
+import (
+	"fmt"
+	"github.com/cron"
+	"github.com/tealeg/xlsx"
+	"github.com/xuri/excelize"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"log"
+	"os"
+	qu "qfw/util"
+	"time"
+)
+
+var (
+	sysconfig    							map[string]interface{} //配置文件
+	save_mgo,site_mgo,python_mgo        	*MongodbSim            //mongodb操作对象
+	save_c_name,site_c_name,python_c_name,xlsx_name,site_unique_name	string
+	prepareData								[]map[string]interface{}
+)
+
+func initMgo()  {
+
+	saveconf := sysconfig["save_mgodb"].(map[string]interface{})
+	save_c_name = qu.ObjToString(saveconf["coll"])
+	save_mgo = &MongodbSim{
+		MongodbAddr: saveconf["addr"].(string),
+		DbName:      saveconf["db"].(string),
+		Size:        qu.IntAllDef(saveconf["pool"], 5),
+	}
+	save_mgo.InitPool()
+
+	siteconf := sysconfig["site_mgodb"].(map[string]interface{})
+	site_c_name = qu.ObjToString(siteconf["coll"])
+	site_mgo = &MongodbSim{
+		MongodbAddr: siteconf["addr"].(string),
+		DbName:      siteconf["db"].(string),
+		Size:        qu.IntAllDef(siteconf["pool"], 5),
+	}
+	site_mgo.InitPool()
+
+
+	pconf := sysconfig["python_mgodb"].(map[string]interface{})
+	python_c_name = qu.ObjToString(pconf["coll"])
+	python_mgo = &MongodbSim{
+		MongodbAddr: pconf["addr"].(string),
+		DbName:      pconf["db"].(string),
+		Size:        qu.IntAllDef(pconf["pool"], 5),
+		//Password:	 "zk@123123",
+		//UserName:	 "zhengkun",
+		Password:	 "datazy@read",
+		UserName:	 "dataZY",
+	}
+	python_mgo.InitPool()
+
+
+	site_unique_name = qu.ObjToString(sysconfig["site_unique_name"])
+	xlsx_name = qu.ObjToString(sysconfig["xlsx_name"])
+
+
+	log.Println(site_unique_name,xlsx_name)
+
+}
+
+func init() {
+	//加载配置文件
+	qu.ReadConfig(&sysconfig)
+	initMgo()
+}
+
+
+func main()  {
+
+	c := cron.New()
+	c.AddFunc("0 50 8 ? * *", func() { dealWithSiteData() })
+	c.Start()
+	log.Println("测试立即执行")
+	dealWithSiteData()
+}
+
+func dealWithSiteData()  {
+
+	prepareXlsxSiteData()//准备数据
+
+	if prepareData==nil || len(prepareData)==0{
+		log.Println("异常:无数据")
+		return
+	}
+
+	now:=time.Now()
+	durdays:=7 //周期7天 假设今天5月15日   周六 now.Day()
+	start:= time.Date(now.Year(), now.Month(), now.Day()-durdays, 0, 0, 0, 0, time.Local).Unix()
+	end := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+
+	log.Println(start,end)
+	q := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte":  start,
+			"$lt": end,
+		},
+	}
+	//构建时间模型模型数据
+	sitedata ,pythondata:= make(map[string]map[string]int,0),make(map[string]map[string]map[string]interface{},0)
+	siteTimeArr := []string{}
+	for i:=0;i<durdays ; i++ {
+		t := int64(i*86400)+start
+		time_key :=TimeStampToString(t)
+		time_day := GetOneWeekDay(time_key)
+		if time_day==7||time_day==6 {}else {
+			sitedata[time_key] = map[string]int{}
+			pythondata[time_key] = map[string]map[string]interface{}{}
+			siteTimeArr = append(siteTimeArr,time_key)
+		}
+	}
+	sess_site := site_mgo.GetMgoConn()
+	defer site_mgo.DestoryMongoConn(sess_site)
+	log.Println("data_bak 查询条件:",q)
+	it_site := sess_site.DB(site_mgo.DbName).C(site_c_name).Find(&q).Sort("comeintime").Select(map[string]interface{}{
+		"comeintime":        1,
+		"site":1,
+	}).Iter()
+	total:= 0
+	for tmp := make(map[string]interface{}); it_site.Next(&tmp); total++ {
+		if total%10000 == 0 {
+			log.Println("current:", total)
+		}
+
+		site:=qu.ObjToString(tmp["site"])
+		comeintime:=qu.Int64All(tmp["comeintime"])
+		time_key :=TimeStampToString(comeintime)
+		time_day := GetOneWeekDay(time_key)
+		if time_day==6 ||time_day==7 {}else {
+			timedata := sitedata[time_key]
+			if qu.Int64All(timedata[site])==0 {
+				timedata[site] =1
+			}else {
+				num :=timedata[site]
+				timedata[site] = num+1
+			}
+		}
+		tmp = make(map[string]interface{})
+	}
+
+	log.Println("is site over :",total)
+
+
+
+	sess_python := python_mgo.GetMgoConn()
+	defer python_mgo.DestoryMongoConn(sess_python)
+	log.Println("bidding 查询条件:",q)
+	it_python := sess_python.DB(python_mgo.DbName).C(python_c_name).Find(&q).Sort("comeintime").Select(map[string]interface{}{
+		"comeintime":        1,
+		"site":1,
+		"spidercode":1,
+	}).Iter()
+	total= 0
+	for tmp := make(map[string]interface{}); it_python.Next(&tmp); total++ {
+		if total%10000 == 0 {
+			log.Println("current:", total)
+		}
+		site:=qu.ObjToString(tmp["site"])
+		spidercode:=qu.ObjToString(tmp["spidercode"])
+		comeintime:=qu.Int64All(tmp["comeintime"])
+
+		time_key :=TimeStampToString(comeintime)
+		time_day := GetOneWeekDay(time_key)
+		if time_day==6 ||time_day==7 {}else {
+			timedata := pythondata[time_key]
+			if timedata[site]==nil {
+				timedata[site] = map[string]interface{}{
+					spidercode:1,
+				}
+			}else {
+				dict :=timedata[site]
+				num := qu.IntAll(dict[spidercode])
+				timedata[site][spidercode] = num+1
+			}
+		}
+
+		tmp = make(map[string]interface{})
+	}
+
+	log.Println("is python over :",total)
+
+	os.Remove(xlsx_name)
+	f :=xlsx.NewFile()
+
+	for _,tmp:=range prepareData {
+		type_name := qu.ObjToString(tmp["type"])
+		data, _ := tmp["data"].(primitive.A)
+		dataArr := qu.ObjArrToMapArr(data)
+		sheet, _ := f.AddSheet(type_name)
+		row := sheet.AddRow()
+		row.AddCell().Value = "国家/省份"
+		row.AddCell().Value = "站点名称"
+		if type_name == "python" {
+			row.AddCell().Value = "爬虫名称"
+		}
+		row.AddCell().Value = "负责人"
+		for _, timekey := range siteTimeArr {
+			row.AddCell().SetString(fmt.Sprintf("%s入库量", timekey))
+		}
+		row.AddCell().Value = "前五天总量"
+
+		for _, dict := range dataArr {
+			row = sheet.AddRow()
+			row.AddCell().SetString(qu.ObjToString(dict["area"]))
+			row.AddCell().SetString(qu.ObjToString(dict["site"]))
+			if type_name == "python" {
+				row.AddCell().SetString(qu.ObjToString(dict["spidercode"]))
+			}
+			row.AddCell().SetString(qu.ObjToString(dict["person"]))
+			total_num := 0
+			if type_name == "python" {
+				for _,timekey:=range siteTimeArr{
+					site_key := qu.ObjToString(dict["site"])
+					spider_key := qu.ObjToString(dict["spidercode"])
+					num := qu.IntAll(pythondata[timekey][site_key][spider_key])
+					total_num = total_num+num
+					row.AddCell().SetString(fmt.Sprintf("%d",num))
+				}
+				row.AddCell().SetString(fmt.Sprintf("%d",total_num))
+			}else {
+				for _,timekey:=range siteTimeArr{
+					key := qu.ObjToString(dict["site"])
+					num := qu.IntAll(sitedata[timekey][key])
+					total_num = total_num+num
+					row.AddCell().SetString(fmt.Sprintf("%d",num))
+				}
+				row.AddCell().SetString(fmt.Sprintf("%d",total_num))
+			}
+
+
+		}
+
+	}
+
+	err := f.Save(xlsx_name)
+	if err != nil {
+		log.Println("保存xlsx失败:", err)
+		return
+	}else {
+		log.Println("保存xlsx成功:", err)
+	}
+
+
+	//是否存日志- 待定
+
+
+	time.Sleep(5*time.Second)
+
+	//发送邮件
+	sendErrMailSmtp("主要站点最近五个工作日相关统计","附件")
+
+	log.Println("结束......")
+
+	//
+
+}
+
+//准备模板站点数据
+func prepareXlsxSiteData()  {
+
+	prepareData = make([]map[string]interface{},0)
+	sess := save_mgo.GetMgoConn()
+	defer save_mgo.DestoryMongoConn(sess)
+	q:=map[string]interface{}{}
+	it := sess.DB(save_mgo.DbName).C(site_unique_name).Find(&q).Iter()
+	for tmp := make(map[string]interface{}); it.Next(&tmp);{
+		dict := tmp
+		delete(dict,"_id")
+		prepareData = append(prepareData,dict)
+		tmp = make(map[string]interface{})
+	}
+	log.Println("准备完毕... ...")
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+//准备站点模型数据
+func prepareTableData()  {
+
+	log.Println("准备数据...")
+
+	f, err := excelize.OpenFile("站点跟踪维护新需求.xlsx")
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	sheetArr := []string{"Sheet1","Sheet2","Sheet3","Sheet4"}
+	sheetNumArr := []int{38,36,87,16}
+	dataArr := make([][]map[string]interface{},0)
+	for index,sheet:=range sheetArr{
+		log.Println("*************")
+		log.Println("*************")
+		rows := f.GetRows(sheet)
+		arr:=make([]map[string]interface{},0)
+		max :=sheetNumArr[index]
+		for k, row := range rows {
+			if k>0 && k<max {
+				dict := map[string]interface{}{}
+				for k1, value := range row {
+					if index==3 {
+						if k1==0 {
+							dict["area"] = qu.ObjToString(value)
+						}else if k1==1 {
+							dict["site"] = qu.ObjToString(value)
+						}else if k1==2 {
+							dict["spidercode"] = qu.ObjToString(value)
+						}else if k1==3 {
+							dict["person"] = qu.ObjToString(value)
+						}else {
+							break
+						}
+					}else {
+						if k1==0 {
+							dict["area"] = qu.ObjToString(value)
+						}else if k1==1 {
+							dict["site"] = qu.ObjToString(value)
+						}else if k1==2 {
+							dict["person"] = qu.ObjToString(value)
+						}else {
+
+							break
+						}
+					}
+				}
+				arr = append(arr,dict)
+			}
+			if k>max {
+				break
+			}
+		}
+
+		dataArr = append(dataArr,arr)
+	}
+
+	typeArr :=[]string{"政府采购","公共资源","其他站点","python"}
+	for index,arr := range dataArr {
+		type_name := typeArr[index]
+		save_mgo.Save(save_c_name, map[string]interface{}{
+			"type":type_name,
+			"data":arr,
+		})
+	}
+
+	log.Println("is save over")
+
+
+}

+ 31 - 0
data_monitoring/listen_data/src/mark

@@ -0,0 +1,31 @@
+{
+  "save_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "site_unique_new",
+    "pool": 5
+  },
+  "site_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "data_bak_copy",
+    "pool": 5
+  },
+  "python_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "bidding_copy",
+    "pool": 5
+  },
+  "site_unique_name":"site_unique_new",
+  "smtpMail": {
+    "from": "zhengkun@topnet.net.cn",
+    "to": "zhaoyujian@topnet.net.cn,fengweiqiang@topnet.net.cn",
+    "cc": "zhengkun@topnet.net.cn",
+    "smtpHost": "smtp.qq.com",
+    "smtpPort": "465",
+    "user":     "920032221@qq.com",
+    "pwd":      "lktqxssmdkebbcbj"
+  },
+  "xlsx_name" : "site_data.xlsx"
+}

+ 0 - 0
data_monitoring/listen_lan/src/mgo.go → data_monitoring/listen_data/src/mgo.go


+ 61 - 0
data_monitoring/listen_data/src/sendmail.go

@@ -0,0 +1,61 @@
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"qfw/util/mail"
+	"os"
+	qu "qfw/util"
+)
+var tomail string
+var api string
+var from,to,cc, smtpHost,user,pwd string
+var smtpPort int
+//api模式 二选一皆可
+func sendErrMailApi(title,body string)  {
+	jkmail, _ := sysconfig["jkmail"].(map[string]interface{})
+	if jkmail != nil {
+		tomail, _ = jkmail["to"].(string)
+		api, _ = jkmail["api"].(string)
+	}
+	log.Println(tomail,api)
+	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, title, body))
+	if err == nil {
+		defer res.Body.Close()
+		read, err := ioutil.ReadAll(res.Body)
+		log.Println("邮件发送成功:", string(read), err)
+	}else {
+		log.Println("邮件发送失败:", err)
+	}
+}
+
+func sendErrMailSmtp(title,body string) {
+
+	smtpMail, _ := sysconfig["smtpMail"].(map[string]interface{})
+	if smtpMail != nil {
+		from, _ = smtpMail["from"].(string)
+		to, _ = smtpMail["to"].(string)
+		cc, _ = smtpMail["cc"].(string)
+		smtpHost, _ = smtpMail["smtpHost"].(string)
+		smtpPort= qu.IntAll(smtpMail["smtpPort"])
+		user, _ = smtpMail["user"].(string)
+		pwd, _ = smtpMail["pwd"].(string)
+	}
+	f, _ := os.Open(xlsx_name)
+	b, err := ioutil.ReadAll(f)
+	if err != nil {
+		fmt.Println("err:",err)
+		return
+	}
+
+
+	ok := mail.GSendMail_Bq(from, to, cc, cc, title, body, f.Name(), b, &mail.GmailAuth{
+		SmtpHost: smtpHost,
+		SmtpPort: smtpPort,
+		User:     user,
+		Pwd:      pwd,
+	})
+	fmt.Println(ok)
+}

+ 120 - 0
data_monitoring/listen_data/src/weekday.go

@@ -0,0 +1,120 @@
+package main
+
+import (
+	"log"
+	"time"
+)
+
+var WeekDayMap = map[string]int64{
+	"Monday":    1,
+	"Tuesday":   2,
+	"Wednesday": 3,
+	"Thursday":  4,
+	"Friday":    5,
+	"Saturday":  6,
+	"Sunday":    7,
+}
+
+// 获取输入日期分别是星期几
+func GetOneWeekDay(startime string) (int64) {
+	startday, _ := time.Parse("2006-01-02", startime)
+	staweek_int := startday.Weekday().String()
+	return WeekDayMap[staweek_int]
+}
+
+
+// 获取输入的两个日期分别是星期几
+func GetWeekDay(startime, endtim string) (int64, int64) {
+	startday, _ := time.Parse("2006-01-02", startime)
+	endday, _ := time.Parse("2006-01-02", endtim)
+	staweek_int := startday.Weekday().String()
+	endweek_int := endday.Weekday().String()
+	return WeekDayMap[staweek_int], WeekDayMap[endweek_int]
+}
+
+
+// 字符串转时间戳
+func StringToTimeStamp(strTime string) int64 {
+	timeLayout := "2006-01-02"
+	//timeLayout := "2006-01-02 15:04:05"
+	loc, _ := time.LoadLocation("Local")
+	the_time, err := time.ParseInLocation(timeLayout, strTime, loc)
+	if err != nil {
+		log.Println("StringToTimeStamp出现异常:", err)
+	}
+	unix_time := the_time.Unix()
+	return unix_time
+}
+
+// 时间戳转 字符串
+func TimeStampToString(timeStp int64) string {
+	//转化所需模板
+	//timeLayout := "2006-01-02 15:04:05"
+	timeLayout := "2006-01-02"
+	//进行格式化
+	datetime := time.Unix(timeStp, 0).Format(timeLayout)
+	return datetime
+}
+
+
+// 时间转化为周日期列表
+func ChangeToWeek(startime, endtim string) []map[string]string {
+
+	staweek_int, endweek_int := GetWeekDay(startime, endtim)
+	// 获取时间戳
+	start_stamp := StringToTimeStamp(startime)
+	end_stamp := StringToTimeStamp(endtim)
+	log.Println("start_stamp==",start_stamp,"end_stamp==", end_stamp)
+
+	var week_list = make([]map[string]string, 0)
+	if (end_stamp-start_stamp)/604800 <= 1 && endweek_int-staweek_int >= 0 {
+		if end_stamp-start_stamp < 604800 && endweek_int-staweek_int > 0 {
+			one_map := map[string]string{}
+			mon_one := TimeStampToString(start_stamp - (staweek_int-1)*86400)
+			sun_one := TimeStampToString(start_stamp + (7-staweek_int)*86400)
+			one_map["mon"] = mon_one
+			one_map["sun"] = sun_one
+			week_list = append(week_list, one_map)
+			return week_list
+		}
+		one_map := map[string]string{}
+		mon_one := TimeStampToString(start_stamp - (staweek_int-1)*86400)
+		sun_one := TimeStampToString(start_stamp + (7-staweek_int)*86400)
+		one_map["mon"] = mon_one
+		one_map["sun"] = sun_one
+		week_list = append(week_list, one_map)
+		tow_map := map[string]string{}
+		mon_tow := TimeStampToString(end_stamp - (endweek_int-1)*86400)
+		sun_tow := TimeStampToString(end_stamp + (7-endweek_int)*86400)
+		tow_map["mon"] = mon_tow
+		tow_map["sun"] = sun_tow
+		week_list = append(week_list, tow_map)
+		return week_list
+	}
+	week_n := (end_stamp - start_stamp) / 604800
+	one_map := map[string]string{}
+	mon_one := TimeStampToString(start_stamp - (staweek_int-1)*86400)
+	sun_one := TimeStampToString(start_stamp + (7-staweek_int)*86400)
+	one_map["mon"] = mon_one
+	one_map["sun"] = sun_one
+	week_list = append(week_list, one_map)
+	for i := 1; i <= int(week_n); i++ {
+		week_map := map[string]string{}
+		mon_day := TimeStampToString(start_stamp - (staweek_int-1)*86400 + int64(i)*604800)
+		sun_day := TimeStampToString(start_stamp + (7-staweek_int)*86400 + int64(i)*604800)
+		week_map["mon"] = mon_day
+		week_map["sun"] = sun_day
+		week_list = append(week_list, week_map)
+	}
+	if endweek_int-staweek_int >= 0 {
+		return week_list
+	}
+	tow_map := map[string]string{}
+	mon_tow := TimeStampToString(end_stamp - (endweek_int-1)*86400)
+	sun_tow := TimeStampToString(end_stamp + (7-endweek_int)*86400)
+	tow_map["mon"] = mon_tow
+	tow_map["sun"] = sun_tow
+	week_list = append(week_list, tow_map)
+	return week_list
+
+}

+ 0 - 0
data_monitoring/listen_lan/src/config.json → data_monitoring/zk_backup/listen_lan/src/config.json


+ 0 - 0
data_monitoring/listen_lan/src/errmail.go → data_monitoring/zk_backup/listen_lan/src/errmail.go


+ 0 - 0
data_monitoring/listen_lan/src/main.go → data_monitoring/zk_backup/listen_lan/src/main.go


+ 0 - 0
data_monitoring/listen_online/src/mgo.go → data_monitoring/zk_backup/listen_lan/src/mgo.go


+ 0 - 0
data_monitoring/listen_lan/src/sitelisten.go → data_monitoring/zk_backup/listen_lan/src/sitelisten.go


+ 0 - 0
data_monitoring/listen_online/src/config.json → data_monitoring/zk_backup/listen_online/src/config.json


+ 0 - 0
data_monitoring/listen_online/src/errmail.go → data_monitoring/zk_backup/listen_online/src/errmail.go


+ 0 - 0
data_monitoring/listen_online/src/main.go → data_monitoring/zk_backup/listen_online/src/main.go


+ 0 - 0
data_monitoring/listen_task/src/mgo.go → data_monitoring/zk_backup/listen_online/src/mgo.go


+ 0 - 0
data_monitoring/listen_online/src/priselisten.go → data_monitoring/zk_backup/listen_online/src/priselisten.go


+ 0 - 0
data_monitoring/listen_online/src/sitelisten.go → data_monitoring/zk_backup/listen_online/src/sitelisten.go


+ 0 - 0
data_monitoring/listen_task/src/config.json → data_monitoring/zk_backup/listen_task/src/config.json


+ 0 - 0
data_monitoring/listen_task/src/dataTaskJP.go → data_monitoring/zk_backup/listen_task/src/dataTaskJP.go


+ 0 - 0
data_monitoring/listen_task/src/dataTaskQY.go → data_monitoring/zk_backup/listen_task/src/dataTaskQY.go


+ 0 - 0
data_monitoring/listen_task/src/dataTaskST.go → data_monitoring/zk_backup/listen_task/src/dataTaskST.go


+ 0 - 0
data_monitoring/listen_task/src/main.go → data_monitoring/zk_backup/listen_task/src/main.go


+ 0 - 0
data_monitoring/listen_task/src/mark → data_monitoring/zk_backup/listen_task/src/mark


+ 329 - 0
data_monitoring/zk_backup/listen_task/src/mgo.go

@@ -0,0 +1,329 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type MgoSess struct {
+	Db     string
+	Coll   string
+	Query  interface{}
+	Sorts  []string
+	fields interface{}
+	limit  int64
+	skip   int64
+	M      *MongodbSim
+}
+
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor != nil {
+		if mt.Cursor.Next(nil) {
+			err := mt.Cursor.Decode(result)
+			if err != nil {
+				log.Println("mgo cur err", err.Error())
+				mt.Cursor.Close(nil)
+				return false
+			}
+			return true
+		} else {
+			mt.Cursor.Close(nil)
+			return false
+		}
+	} else {
+		return false
+	}
+
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+type MongodbSim struct {
+	MongodbAddr string
+	Size        int
+	//	MinSize     int
+	DbName   string
+	C        *mongo.Client
+	Ctx      context.Context
+	ShortCtx context.Context
+	pool     chan bool
+	UserName string
+	Password string
+}
+
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	//m.Open()
+	ms := &MgoSess{}
+	ms.M = m
+	return ms
+}
+
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	//m.Close()
+	ms.M = nil
+	ms = nil
+}
+
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint64(m.Size))
+	m.pool = make(chan bool, m.Size)
+
+	if m.UserName !="" && m.Password !="" {
+		cre := options.Credential{
+			Username:m.UserName,
+			Password:m.Password,
+		}
+		opts.SetAuth(cre)
+	}
+
+
+
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+//批量插入
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewInsertOneModel()
+		write.SetDocument(d)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo savebulk error:", e.Error())
+		return false
+	}
+	return true
+}
+
+//保存
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		return nil
+	}
+	return r.InsertedID
+}
+
+//更新by Id
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+//删除by id
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//通过条件删除
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//findbyid
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//findone
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//find
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find()
+	r, err := coll.Find(m.Ctx, query, op.SetSort(sort), op.SetProjection(fields))
+	if err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	return results, nil
+}
+
+//创建_id
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, _ := primitive.ObjectIDFromHex(id)
+	return objectId
+}
+
+func BsonTOStringId(id interface{}) string {
+	return id.(primitive.ObjectID).Hex()
+}

+ 0 - 0
data_monitoring/listen_task/src/sendmail.go → data_monitoring/zk_backup/listen_task/src/sendmail.go


+ 41 - 0
esmgocount/src/config.json

@@ -0,0 +1,41 @@
+{
+    "task": [
+        {
+            "cron": "10 27 8,10,12,14,16,18,20 ? * 2-6",
+            "tjscope": "-2,h",
+            "min": 100,
+            "max": 20000,
+            "type": "alert",
+            "name": "es工作日预警2小时内范围100~2万"
+        },
+        {
+            "cron": "0 18 12,18 ? * 7,1",
+            "tjscope": "-6,h",
+            "min": 50,
+            "max": 100000,
+            "type": "alert",
+            "name": "es周末预警6小时内范围50~10万"
+        },
+        {
+            "cron": "0 56 8 * * ?",
+            "tjscope": "-1,d",
+            "type": "report",
+            "name": "es日报",
+			"mgo":"127.0.0.1:27082|dataAnyWrite|data@dataAnyWrite|jyqyfw_test|usermail"
+        },
+        {
+            "cron": "0 10 9 ? * MON",
+            "tjscope": "-7,d",
+            "type": "report",
+            "name": "es周报"
+        }
+    ],
+	"esAddr":"http://172.17.145.170:9800",
+	"esIndex":"bidding",
+    "esDataAddr":"http://172.17.4.189:9800",
+	"esDataIndex":"bidding",
+	"jkmail": {
+        "to": "wangjianghan@topnet.net.cn",
+        "api": "http://10.171.112.160:19281/_send/_mail"
+    }
+}

+ 124 - 0
esmgocount/src/main.go

@@ -0,0 +1,124 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	"mongodb"
+	"net/http"
+	"qfw/util"
+	"qfw/util/elastic"
+	"strings"
+	"time"
+
+	"github.com/robfig/cron"
+	"go.mongodb.org/mongo-driver/bson"
+)
+
+//定时任务,去统计bidding索引,排查问题、预警
+
+type T struct {
+	Cron    string
+	Name    string
+	Min     int
+	Max     int
+	Type    string
+	Tjscope string
+	Mgo     string
+}
+
+//es数据最报警
+var (
+	config               map[string]interface{}
+	to                   string
+	api                  string
+	esAddr, esDataAddr   string
+	esIndex, esDataIndex string
+	Ts                   = []*T{}
+	esQ                  = `{"query": {"range": {"id": {"gte": "%s","lt": "%s"}}}}`
+)
+
+func init() {
+	util.ReadConfig(&config)
+	jkmail := config["jkmail"].(map[string]interface{})
+	to, _ = jkmail["to"].(string)
+	api, _ = jkmail["api"].(string)
+	esAddr, _ = config["esAddr"].(string)
+	esIndex, _ = config["esIndex"].(string)
+	esDataAddr, _ = config["esDataAddr"].(string)
+	esDataIndex, _ = config["esDataIndex"].(string)
+	tasks, _ := config["task"].([]interface{})
+	for _, t := range tasks {
+		bs, _ := json.Marshal(t)
+		var v *T
+		json.Unmarshal(bs, &v)
+		if v != nil {
+			Ts = append(Ts, v)
+		}
+	}
+}
+
+func main() {
+	log.Println("start..")
+	if len(Ts) > 0 {
+		c := cron.New()
+		for _, v := range Ts {
+			c.AddFunc(v.Cron, v.task)
+		}
+		c.Start()
+		defer c.Stop()
+		select {}
+	}
+	log.Println("end..")
+
+}
+
+func (t *T) task() {
+	//初始化语句
+	qt := strings.Split(t.Tjscope, ",")
+	if len(qt) != 2 {
+		return
+	}
+	eq := ""
+	st, et := int64(0), int64(0)
+	now := time.Now()
+	switch qt[1] {
+	case "h":
+		et = now.Unix()
+		st = et + util.Int64All(qt[0])*3600
+	case "d":
+		st = util.GetDayStartSecond(util.IntAll(qt[0]))
+		et = util.GetDayStartSecond(0)
+	}
+	st1 := fmt.Sprintf("%x0000000000000000", st)
+	et1 := fmt.Sprintf("%x0000000000000000", et)
+	eq = fmt.Sprintf(esQ, st1, et1)
+	es := elastic.Elastic{S_esurl: esAddr, I_size: 1}
+	es.InitElasticSize()
+	count := int(es.Count(esIndex, esIndex, eq))
+	switch t.Type {
+	case "alert":
+		if count < t.Min || count > t.Max {
+			//report := fmt.Sprintf("告警%s,最小%d,最大%d,统计结果:%d", t.Name, t.Min, t.Max, count)
+			//t.SendMail(report)
+		}
+	case "report":
+		report := fmt.Sprintf("报告%s,统计结果%d", t.Name, count)
+		if len(t.Mgo) > 5 {
+			fs := strings.Split(t.Mgo, "|")
+			fmgo := mongodb.NewMgoWithUser(fs[0], fs[3], fs[1], fs[2], 1)
+			id1 := mongodb.StringTOBsonId(st1)
+			id2 := mongodb.StringTOBsonId(et1)
+			mq := bson.M{"extracttype": bson.M{"$ne": -1}, "sensitive": bson.M{"$ne": "测试"}, "dataging": bson.M{"$ne": 1}, "_id": bson.M{"$gte": id1, "$lt": id2}}
+			count2 := fmgo.Count(fs[4], mq)
+			count3 := fmgo.Count(fs[4], bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}}) //mgo总入库量
+			report += ",mgo统计" + fmt.Sprint(count2) + ",差值:" + fmt.Sprint(count2-count) + ",mgo总入库量" + fmt.Sprint(count3)
+		}
+		t.SendMail(report)
+	}
+	log.Println("task over:", t.Name, eq, count)
+}
+
+func (t *T) SendMail(report string) {
+	http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, to, t.Name, report))
+}

+ 7 - 0
udpfilterdup/src/dataMethod.go

@@ -71,6 +71,13 @@ func againRepeat(v *Info, info *Info) bool {
 	if v.projectcode != "" && info.projectcode != "" && v.projectcode != info.projectcode {
 		return true
 	}
+	if v.projectname != info.projectname && v.projectname != "" && info.projectname != ""{
+		return true
+	}
+	if v.title != info.title && v.title != "" && info.title != ""{
+		return true
+	}
+
 
 	return false
 }

+ 2 - 5
udpfilterdup/src/main.go

@@ -138,10 +138,7 @@ func init() {
 }
 
 
-func mainT() {
-	//exportFenLeiData()
-	//return
-
+func main() {
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
@@ -175,7 +172,7 @@ func mainT() {
 }
 
 //测试组人员使用
-func main() {
+func mainT() {
 	if TimingTask {
 		go historyTaskDay()
 		time.Sleep(99999 * time.Hour)

+ 1 - 1
udpfilterdup/src/updateMethod.go

@@ -45,7 +45,7 @@ func (update *updateInfo) updateData() {
 				tmpArr = make([][]map[string]interface{}, update.saveSize)
 				tmpIndex = 0
 			}
-		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
+		case <-time.After(5 * time.Second)://无反应时每x秒检测一次
 			if tmpIndex > 0 {
 				sp <- true
 				go func(dataArr [][]map[string]interface{}) {