Эх сурвалжийг харах

竞品-企业-站点-检测

apple 4 жил өмнө
parent
commit
930946780c

+ 18 - 13
data_monitoring/listen_task/src/config.json

@@ -1,21 +1,26 @@
 {
-  "mongodb": {
-    "addrName": "192.168.3.207:27092",
-    "dbName": "zhengkun",
-    "collName": "baidu_enterprise",
+  "data_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "baidu_enterprise",
     "pool": 5
   },
-  "stmongodb": {
-    "addrName": "192.168.3.207:27092",
-    "dbName": "zhengkun",
-    "collName": "result_20210109",
+  "jp_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "data_bak",
     "pool": 5
   },
-  "jp_collname": "baidu_enterprise",
-  "qy_collname": "baidu_enterprise",
-  "st_collname": "result_20210109",
-  "st_es_index": "bidding",
-  "st_es_type": "bidding",
+  "st_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "result_20210109",
+    "pool": 5
+  },
+  "es_index_st": "bidding",
+  "es_type_st": "bidding",
+  "save_other_name": "",
+  "save_site_name" : "",
   "jkmail": {
     "to": "zhengkun@topnet.net.cn",
     "api": "http://172.17.145.179:19281/_send/_mail"

+ 21 - 15
data_monitoring/listen_task/src/dataTaskJP.go

@@ -12,46 +12,52 @@ func dealWithJPData()  {
 	log.Println("开始统计竞品数据...")
 	defer qu.Catch()
 
+	start := int(time.Now().Unix())
 	now:=time.Now() //当前天
 	gte_time:= time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local).Unix()
 	lt_time := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+
+	//临时测试-7号
+	gte_time = 1617724800
+	lt_time = 1617811200
+
 	q := map[string]interface{}{
-		"down_time": map[string]interface{}{
+		"comeintime": map[string]interface{}{
 			"$gte":  gte_time,
 			"$lt": lt_time,
 		},
+		"site":"中国招标与采购网",
 	}
-	log.Println("查询条件",q)
 
-	sess := datamgo.GetMgoConn()
-	defer datamgo.DestoryMongoConn(sess)
+
+	sess := jp_mgo.GetMgoConn()
+	defer jp_mgo.DestoryMongoConn(sess)
 
 	//细节才需要遍历
-	it := sess.DB(datamgo.DbName).C(jp_collname).Find(&q).Iter()
-	total,start :=0, int(time.Now().Unix())
+	it := sess.DB(jp_mgo.DbName).C(jp_c_name).Find(&q).Select(map[string]interface{}{
+		"site":1,
+	}).Iter()
+	total,_ :=0, int(time.Now().Unix())
 	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
 		if total%10000==0 {
-			log.Println("current index",total)
+			//log.Println("current index",total)
 		}
-		//删选条件
-
 		tmp = make(map[string]interface{})
 	}
-	log.Println("jp is over:",total,"总用时:",int(time.Now().Unix())-start,"秒")
-
 
 	//是否告警条件
 	if total<1 {
-		sendErrMailApi("竞品数据异常","数量无")
+		//sendErrMailApi("竞品数据异常","数量无")
 	}
+
 	comeintime:=qu.Int64All(time.Now().Unix())
-	date := qu.FormatDateByInt64(&comeintime, qu.Date_yyyyMMdd)
-	datamgo.Save("monitor_other", map[string]interface{}{
+	date := qu.FormatDateByInt64(&comeintime, qu.DATEFORMAT)
+	data_mgo.Save("monitor_other", map[string]interface{}{
 		"name":"竞品",
 		"num":qu.IntAll(total),
 		"comeintime":comeintime,
 		"updatedate":date,
-		"data":map[string]interface{}{},
 	})
 
+	log.Println("竞品-定时处理完毕...",int(time.Now().Unix())-start,"秒")
 }

+ 13 - 10
data_monitoring/listen_task/src/dataTaskQY.go

@@ -12,11 +12,12 @@ func dealWithQYData()  {
 	log.Println("开始统计企业数据...")
 	defer qu.Catch()
 
+	start := int(time.Now().Unix())
 	now:=time.Now() //当前天
 	gte_time:= time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local).Unix()
 	lt_time := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
 
-	//临时测试
+	//临时测试-8号
 	gte_time = 1617811200
 	lt_time = 1617897600
 
@@ -26,37 +27,39 @@ func dealWithQYData()  {
 			"$lt": lt_time,
 		},
 	}
-	log.Println("查询条件",q)
 
-	sess := datamgo.GetMgoConn()
-	defer datamgo.DestoryMongoConn(sess)
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
 
 	//细节才需要遍历
-	it := sess.DB(datamgo.DbName).C(qy_collname).Find(&q).Iter()
-	total,start :=0, int(time.Now().Unix())
+	it := sess.DB(data_mgo.DbName).C(qy_c_name).Find(&q).Select(map[string]interface{}{
+		"site":1,
+	}).Iter()
+	total,_ :=0, int(time.Now().Unix())
 	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
 		if total%10000==0 {
-			log.Println("current index",total)
+			//log.Println("current index",total)
 		}
 		//删选条件
 
 		tmp = make(map[string]interface{})
 	}
-	log.Println("qy is over:",total,"总用时:",int(time.Now().Unix())-start,"秒")
 
 
 	//是否告警条件
 	if total<1 {
 		sendErrMailApi("企业数据异常","数量无")
 	}
+
+
 	comeintime:=qu.Int64All(time.Now().Unix())
 	date := qu.FormatDateByInt64(&comeintime, qu.DATEFORMAT)
-	datamgo.Save("monitor_other", map[string]interface{}{
+	data_mgo.Save("monitor_other", map[string]interface{}{
 		"name":"企业变更",
 		"num":qu.IntAll(total),
 		"comeintime":comeintime,
 		"updatedate":date,
-		"data":map[string]interface{}{},
 	})
 
+	log.Println("企业-定时处理完毕...",int(time.Now().Unix())-start,"秒")
 }

+ 16 - 18
data_monitoring/listen_task/src/dataTaskST.go

@@ -16,10 +16,12 @@ func dealWithSTData()  {
 	log.Println("开始统计站点数据...")
 	defer qu.Catch()
 
+	start := int(time.Now().Unix())
 	now:=time.Now() //当前天
 	gte_time:= time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local).Unix()
 	lt_time := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
-	//临时测试
+
+	//临时测试-8号
 	gte_time = 1618070400
 	lt_time = 1618156800
 	mgodata := toCalculateMgoData(gte_time,lt_time)
@@ -33,7 +35,7 @@ func dealWithSTData()  {
 
 	comeintime:=qu.Int64All(time.Now().Unix())
 	date := qu.FormatDateByInt64(&comeintime, qu.DATEFORMAT)
-	datamgo.Save("monitor_site", map[string]interface{}{
+	data_mgo.Save("monitor_site", map[string]interface{}{
 		"comeintime":comeintime,
 		"updatedate":date,
 		"mgonum":mgonum,
@@ -41,6 +43,8 @@ func dealWithSTData()  {
 		"mgodata":mgodata["detail"],
 		"esdata":esdata["detail"],
 	})
+
+	log.Println("站点-定时处理完毕...","用时:",int(time.Now().Unix())-start,"秒")
 }
 
 
@@ -52,7 +56,9 @@ func toCalculateEsData(gte_time string,lt_time string) map[string]interface{} {
 			"detail": map[string]interface{}{},
 		}
 	}
-	log.Println("es 查询区间:",gte_time,lt_time)
+
+	dict:= make(map[string]interface{},0)
+	total,isOK :=0,0
 
 	//elastic.InitElasticSize("http://172.17.145.170:9800", 10,)
 	elastic.InitElasticSize("http://127.0.0.1:12003", 10,)
@@ -62,7 +68,7 @@ func toCalculateEsData(gte_time string,lt_time string) map[string]interface{} {
 		log.Println("连接池异常")
 	}
 	query :=es_elastic.NewRangeQuery("comeintime").Gte(gte_time).Lt(lt_time)
-	cursor, err := esclient.Scan(st_es_index).Query(es_elastic.NewBoolQuery().Must(query)).
+	cursor, err := esclient.Scan(es_index_st).Query(es_elastic.NewBoolQuery().Must(query)).
 		Size(200).Do()
 	if err != nil {
 		log.Println("cursor",err)
@@ -73,10 +79,7 @@ func toCalculateEsData(gte_time string,lt_time string) map[string]interface{} {
 	if cursor.Results.Hits == nil {
 		log.Println("expected results.Hits != nil; got nil")
 	}
-	log.Println("es total :",cursor.TotalHits(),"处理分析中......")
-
-	dict:= make(map[string]interface{},0)
-	total,start,isOK :=0, int(time.Now().Unix()),0
+	//log.Println("es total :",cursor.TotalHits(),"处理分析中......")
 	for {
 		searchResult, err := cursor.Next()
 		if err != nil {
@@ -105,7 +108,7 @@ func toCalculateEsData(gte_time string,lt_time string) map[string]interface{} {
 		}
 	}
 
-	log.Println("st is es over:",total,isOK,"有效:",len(dict),"用时:",int(time.Now().Unix())-start,"秒")
+	//log.Println("st is es over:",total,isOK,"有效:",len(dict),"用时:",int(time.Now().Unix())-start,"秒")
 
 
 	return map[string]interface{}{
@@ -129,17 +132,16 @@ func toCalculateMgoData(gte_time int64,lt_time int64) map[string]interface{} {
 			"$lt": lt_time,
 		},
 	}
-	log.Println("mgo 查询区间",query)
 
-	sess := sitemgo.GetMgoConn()
-	defer sitemgo.DestoryMongoConn(sess)
+	sess := st_mgo.GetMgoConn()
+	defer st_mgo.DestoryMongoConn(sess)
 
 	dict:= make(map[string]interface{},0)
 	//细节才需要遍历
-	it := sess.DB(sitemgo.DbName).C(st_collname).Find(&query).Select(map[string]interface{}{
+	it := sess.DB(st_mgo.DbName).C(st_c_name).Find(&query).Select(map[string]interface{}{
 		"site":1,
 	}).Iter()
-	total,start,isOK :=0, int(time.Now().Unix()),0
+	total,isOK :=0,0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
 		if total%10000==0 {
 			//log.Println("current index",total,isOK)
@@ -156,10 +158,6 @@ func toCalculateMgoData(gte_time int64,lt_time int64) map[string]interface{} {
 
 		tmp = make(map[string]interface{})
 	}
-
-	log.Println("st is mgo over:",total,isOK,"有效:",len(dict),"用时:",int(time.Now().Unix())-start,"秒")
-
-
 	return map[string]interface{}{
 		"totalnum" : total,
 		"detail": dict,

+ 52 - 34
data_monitoring/listen_task/src/main.go

@@ -10,44 +10,55 @@ import (
 
 
 var (
-	sysconfig    	map[string]interface{} //配置文件
-	datamgo         *MongodbSim            //mongodb操作对象
-	sitemgo         *MongodbSim            //mongodb操作对象
-	jp_collname		string
-	qy_collname		string
-	st_collname		string
-	st_es_index		string
-	st_es_type		string
+	sysconfig    							map[string]interface{} //配置文件
+	data_mgo,jp_mgo,st_mgo        			*MongodbSim            //mongodb操作对象
+	qy_c_name,jp_c_name,st_c_name			string
+	es_index_st,es_type_st					string
+	save_other_name,save_site_name			string
 )
 
 func initMgo()  {
-	mconf := sysconfig["mongodb"].(map[string]interface{})
-	datamgo = &MongodbSim{
-		MongodbAddr: mconf["addrName"].(string),
-		DbName:      mconf["dbName"].(string),
-		Size:        qu.IntAllDef(mconf["pool"], 10),
-	}
-	datamgo.InitPool()
 
-	sconf := sysconfig["mongodb"].(map[string]interface{})
-	sitemgo = &MongodbSim{
-		MongodbAddr: sconf["addrName"].(string),
-		DbName:      sconf["dbName"].(string),
-		Size:        qu.IntAllDef(sconf["sconf"], 10),
+	dconf := sysconfig["data_mgodb"].(map[string]interface{})
+	qy_c_name = qu.ObjToString(dconf["coll"])
+	data_mgo = &MongodbSim{
+		MongodbAddr: dconf["addr"].(string),
+		DbName:      dconf["db"].(string),
+		Size:        qu.IntAllDef(dconf["pool"], 10),
+	}
+	data_mgo.InitPool()
+
+	jconf := sysconfig["jp_mgodb"].(map[string]interface{})
+	jp_c_name = qu.ObjToString(jconf["coll"])
+	jp_mgo = &MongodbSim{
+		MongodbAddr: jconf["addr"].(string),
+		DbName:      jconf["db"].(string),
+		Size:        qu.IntAllDef(jconf["pool"], 10),
+	}
+	jp_mgo.InitPool()
+
+	sconf := sysconfig["st_mgodb"].(map[string]interface{})
+	st_c_name = qu.ObjToString(sconf["coll"])
+	st_mgo = &MongodbSim{
+		MongodbAddr: sconf["addr"].(string),
+		DbName:      sconf["db"].(string),
+		Size:        qu.IntAllDef(sconf["pool"], 10),
 	}
-	sitemgo.InitPool()
+	st_mgo.InitPool()
 
 
-	//属性赋值
-	jp_collname = qu.ObjToString(sysconfig["jp_collname"])
-	qy_collname = qu.ObjToString(sysconfig["qy_collname"])
 
 
-	st_collname = qu.ObjToString(sysconfig["st_collname"])
-	st_es_index = qu.ObjToString(sysconfig["st_es_index"])
-	st_es_type = qu.ObjToString(sysconfig["st_es_type"])
+	//属性赋值
+	es_index_st = qu.ObjToString(sysconfig["es_index_st"])
+	es_type_st = qu.ObjToString(sysconfig["es_type_st"])
+	save_other_name = qu.ObjToString(sysconfig["save_other_name"])
+	save_site_name = qu.ObjToString(sysconfig["save_site_name"])
 
 
+	//log.Println(dconf,jconf,sconf)
+	//log.Println(qy_c_name,jp_c_name,st_c_name,es_index_st,es_type_st,save_other_name,save_site_name)
+
 }
 
 
@@ -57,30 +68,37 @@ func init() {
 	initMgo()
 }
 
-func main() {
+func mainT() {
+
 	go taskTime()
 	time.Sleep(99999 * time.Hour)
+
 }
 
-func taskTime()  {
+//测试使用
+func main() {
+
+	dealWithJPData()
 	dealWithQYData()
-	return
+	dealWithSTData()
+
+}
+
+func taskTime()  {
 
 	log.Println("部署定时任务")
 	c := cron.New()
 	//竞品-mongo
-	c.AddFunc("0 30 8 ? * *", func() { dealWithJPData() })
+	c.AddFunc("0 30 9 ? * *", func() { dealWithJPData() })
 
 	//企业变更-mongo
-	c.AddFunc("0 30 8 ? * *", func() { dealWithQYData() })
+	c.AddFunc("0 0 9 ? * *", func() { dealWithQYData() })
 
 	//站点相关-es-mongo
 	c.AddFunc("0 30 8 ? * *", func() { dealWithSTData() })
 
 	c.Start()
 
-
-
 }