Browse Source

Merge branch 'dev3.4.1' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4.1

* 'dev3.4.1' of http://192.168.3.207:10080/qmx/jy-data-extract:
  监听-发送邮件-多人修改
  监听-修改-主要
  小金额不打标记
  备份-04-20
  111
  匹配金额标签
  备份-通用-分词es规则等
  监听项目拆分-公网-局域网
  备份
  邮件格式修改-vps报警-日志
  备份-vps-client部署测试
  企业-站点数据 统计xlsx发送
Jianghan 4 năm trước cách đây
mục cha
commit
95ea505db7
31 tập tin đã thay đổi với 1913 bổ sung296 xóa
  1. 20 0
      data_monitoring/listen_lan/src/config.json
  2. 43 0
      data_monitoring/listen_lan/src/errmail.go
  3. 142 0
      data_monitoring/listen_lan/src/main.go
  4. 329 0
      data_monitoring/listen_lan/src/mgo.go
  5. 48 0
      data_monitoring/listen_lan/src/sitelisten.go
  6. 39 0
      data_monitoring/listen_online/src/config.json
  7. 63 0
      data_monitoring/listen_online/src/errmail.go
  8. 145 0
      data_monitoring/listen_online/src/main.go
  9. 329 0
      data_monitoring/listen_online/src/mgo.go
  10. 81 0
      data_monitoring/listen_online/src/priselisten.go
  11. 198 0
      data_monitoring/listen_online/src/sitelisten.go
  12. 17 5
      data_monitoring/listen_task/src/config.json
  13. 15 71
      data_monitoring/listen_task/src/dataTaskJP.go
  14. 8 52
      data_monitoring/listen_task/src/dataTaskQY.go
  15. 10 15
      data_monitoring/listen_task/src/dataTaskST.go
  16. 145 30
      data_monitoring/listen_task/src/main.go
  17. 40 0
      data_monitoring/listen_task/src/mark
  18. 31 18
      data_monitoring/listen_task/src/sendmail.go
  19. 3 2
      data_monitoring/vps_client/src/config.json
  20. 9 23
      data_monitoring/vps_client/src/main.go
  21. 8 2
      data_monitoring/vps_server/src/config.json
  22. 54 5
      data_monitoring/vps_server/src/main.go
  23. 1 1
      src/config.json
  24. 1 1
      src/jy/clear/tonumber.go
  25. 96 33
      src/jy/extract/extract.go
  26. 12 12
      src/jy/extract/extractInit.go
  27. 2 2
      src/jy/pretreated/analytable.go
  28. 14 14
      standardata/src/config.json
  29. 3 3
      udpfilterdup/src/config.json
  30. 5 5
      udpfilterdup/src/main.go
  31. 2 2
      udps/main.go

+ 20 - 0
data_monitoring/listen_lan/src/config.json

@@ -0,0 +1,20 @@
+{
+  "data_mgodb": {
+    "addr": "baibai.ink:28082",
+    "db": "spider_data",
+    "coll": "data_bak",
+    "pool": 5
+  },
+  "save_other_name": "monitor_other",
+  "dynamic_coll_name": "z_site_spider",
+  "smtpMail": {
+    "from": "zhengkun@topnet.net.cn",
+    "to": "zhengkun@topnet.net.cn",
+    "cc": "zhengkun@topnet.net.cn",
+    "smtpHost": "smtp.qq.com",
+    "smtpPort": "465",
+    "user":     "920032221@qq.com",
+    "pwd":      "xomkphsjsamybdbj"
+  },
+  "xlsx_name" : "统计.xlsx"
+}

+ 43 - 0
data_monitoring/listen_lan/src/errmail.go

@@ -0,0 +1,43 @@
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	qu "qfw/util"
+	"qfw/util/mail"
+)
+
+var from,to,cc, smtpHost,user,pwd string
+var smtpPort int
+
+
+func sendErrMailSmtp(title,body string) {
+
+
+	smtpMail, _ := sysconfig["smtpMail"].(map[string]interface{})
+
+	if smtpMail != nil {
+		from, _ = smtpMail["from"].(string)
+		to, _ = smtpMail["to"].(string)
+		cc, _ = smtpMail["cc"].(string)
+		smtpHost, _ = smtpMail["smtpHost"].(string)
+		smtpPort= qu.IntAll(smtpMail["smtpPort"])
+		user, _ = smtpMail["user"].(string)
+		pwd, _ = smtpMail["pwd"].(string)
+	}
+	f, _ := os.Open(xlsx_name)
+	b, err := ioutil.ReadAll(f)
+	if err != nil {
+		fmt.Println("err:",err)
+		return
+	}
+
+	ok := mail.GSendMail_B(user, from, cc, from, title, body, f.Name(), b, &mail.GmailAuth{
+		SmtpHost: smtpHost,
+		SmtpPort: smtpPort,
+		User:     user,
+		Pwd:      pwd,
+	})
+	fmt.Println(ok)
+}

+ 142 - 0
data_monitoring/listen_lan/src/main.go

@@ -0,0 +1,142 @@
+package main
+
+import (
+	"fmt"
+	"github.com/cron"
+	"github.com/tealeg/xlsx"
+	"log"
+	"os"
+	qu "qfw/util"
+	"time"
+)
+
+
+var (
+	sysconfig    							map[string]interface{} //配置文件
+	data_mgo						      	*MongodbSim     //mongodb操作对象
+	save_other_name,data_coll_name			string
+	xlsx_name,dynamic_coll_name				string
+	infoSiteArr								[]string
+)
+
+func initMgo()  {
+
+	dconf := sysconfig["data_mgodb"].(map[string]interface{})
+	data_coll_name = qu.ObjToString(dconf["coll"])
+	data_mgo = &MongodbSim{
+		MongodbAddr: dconf["addr"].(string),
+		DbName:      dconf["db"].(string),
+		Size:        qu.IntAllDef(dconf["pool"], 5),
+	}
+	data_mgo.InitPool()
+
+
+
+	//属性赋值
+	save_other_name = qu.ObjToString(sysconfig["save_other_name"])
+	dynamic_coll_name = qu.ObjToString(sysconfig["dynamic_coll_name"])
+	xlsx_name = qu.ObjToString(sysconfig["xlsx_name"])
+}
+
+func init() {
+	//加载配置文件
+	qu.ReadConfig(&sysconfig)
+	initMgo()
+}
+
+func main() {
+	go taskTime()
+	time.Sleep(99999 * time.Hour)
+
+}
+
+func taskTime()  {
+
+	log.Println("部署定时任务")
+	c := cron.New()
+	//企业变更-站点-mongo
+	c.AddFunc("0 0 9 ? * *", func() { dealWithOtherData() })
+
+	c.Start()
+
+}
+
+func dealWithOtherData()  {
+
+	log.Println("开始统计相关数据...")
+
+	//先获取-待统计-动态站点
+	tmp ,tmp_err := data_mgo.Find(dynamic_coll_name, map[string]interface{}{},nil,nil)
+	if tmp_err==nil && tmp!=nil && len(tmp)>0 {
+		infoSiteArr = make([]string,0)
+		for _,v :=range tmp {
+			infoSiteArr = append(infoSiteArr,qu.ObjToString(v["site"]))
+		}
+		log.Println("目标站点:",len(infoSiteArr))
+	}
+	start := int(time.Now().Unix())
+	spider_data :=dealWithSiteSpiderData()//站点数据
+	comeintime :=qu.Int64All(time.Now().Unix())
+
+	data_mgo.Save(save_other_name, map[string]interface{}{
+		"name":"站点",
+		"num":qu.IntAll(spider_data["num"]),
+		"comeintime":comeintime,
+		"date":qu.ObjToString(spider_data["date"]),
+		"data":spider_data["data"],
+	})
+
+	//获取前两天数据
+	now:=time.Now() //当前天
+	cur_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	before_time := cur_time-86400*2
+	before_date :=qu.FormatDateByInt64(&before_time, qu.DATEFORMAT)
+
+
+	//站点数据
+	before_st_data :=data_mgo.FindOne(save_other_name, map[string]interface{}{
+		"name":"站点",
+		"date":before_date,
+	})
+
+
+	os.Remove("统计.xlsx")	//写excle
+	f :=xlsx.NewFile()
+	sheet, _ := f.AddSheet("统计")
+
+	//企业数据
+	row := sheet.AddRow()
+	row.AddCell().Value = "站点分类/日期"
+	row.AddCell().SetString(qu.ObjToString(spider_data["date"]))
+	if len(before_st_data)>0 {
+		row.AddCell().SetString(qu.ObjToString(before_st_data["date"]))
+	}
+
+	data_1 := *qu.ObjToMap(spider_data["data"])
+	data_2 := *qu.ObjToMap(before_st_data["data"])
+
+	for _,key := range infoSiteArr {
+		row = sheet.AddRow()
+		row.AddCell().SetString(key)
+		row.AddCell().SetString(fmt.Sprintf("%d",qu.Int64All(data_1[key])))
+		if len(before_st_data)>0 {
+			row.AddCell().SetString(fmt.Sprintf("%d",qu.Int64All(data_2[key])))
+		}
+	}
+	row = sheet.AddRow()
+	row.AddCell().Value = "总计"
+	row.AddCell().SetString(fmt.Sprintf("%d",qu.Int64All(spider_data["num"])))
+	if len(before_st_data)>0 {
+		row.AddCell().SetString(fmt.Sprintf("%d", qu.Int64All(before_st_data["num"])))
+	}
+	err := f.Save(xlsx_name)
+	if err != nil {
+		log.Println("保存xlsx失败:", err)
+	}else {
+		log.Println("保存xlsx成功:", err)
+	}
+
+	sendErrMailSmtp("统计-站点","附件")
+
+	log.Println("定时处理完毕...",int(time.Now().Unix())-start,"秒")
+}

+ 329 - 0
data_monitoring/listen_lan/src/mgo.go

@@ -0,0 +1,329 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type MgoSess struct {
+	Db     string
+	Coll   string
+	Query  interface{}
+	Sorts  []string
+	fields interface{}
+	limit  int64
+	skip   int64
+	M      *MongodbSim
+}
+
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor != nil {
+		if mt.Cursor.Next(nil) {
+			err := mt.Cursor.Decode(result)
+			if err != nil {
+				log.Println("mgo cur err", err.Error())
+				mt.Cursor.Close(nil)
+				return false
+			}
+			return true
+		} else {
+			mt.Cursor.Close(nil)
+			return false
+		}
+	} else {
+		return false
+	}
+
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+type MongodbSim struct {
+	MongodbAddr string
+	Size        int
+	//	MinSize     int
+	DbName   string
+	C        *mongo.Client
+	Ctx      context.Context
+	ShortCtx context.Context
+	pool     chan bool
+	UserName string
+	Password string
+}
+
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	//m.Open()
+	ms := &MgoSess{}
+	ms.M = m
+	return ms
+}
+
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	//m.Close()
+	ms.M = nil
+	ms = nil
+}
+
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint64(m.Size))
+	m.pool = make(chan bool, m.Size)
+
+	if m.UserName !="" && m.Password !="" {
+		cre := options.Credential{
+			Username:m.UserName,
+			Password:m.Password,
+		}
+		opts.SetAuth(cre)
+	}
+
+
+
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+//批量插入
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewInsertOneModel()
+		write.SetDocument(d)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo savebulk error:", e.Error())
+		return false
+	}
+	return true
+}
+
+//保存
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		return nil
+	}
+	return r.InsertedID
+}
+
+//更新by Id
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+//删除by id
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//通过条件删除
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//findbyid
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//findone
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//find
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find()
+	r, err := coll.Find(m.Ctx, query, op.SetSort(sort), op.SetProjection(fields))
+	if err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	return results, nil
+}
+
+//创建_id
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, _ := primitive.ObjectIDFromHex(id)
+	return objectId
+}
+
+func BsonTOStringId(id interface{}) string {
+	return id.(primitive.ObjectID).Hex()
+}

+ 48 - 0
data_monitoring/listen_lan/src/sitelisten.go

@@ -0,0 +1,48 @@
+package main
+
+import (
+	qu "qfw/util"
+	"time"
+)
+
+
+func dealWithSiteSpiderData()(map[string]interface{}) {
+
+	defer qu.Catch()
+	now:=time.Now() //当前天
+	lt_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	gte_time := lt_time-86400
+
+	q := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte":  gte_time,
+			"$lt": lt_time,
+		},
+	}
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
+	//细节才需要遍历
+	it := sess.DB(data_mgo.DbName).C(data_coll_name).Find(&q).Select(map[string]interface{}{
+		"site":1,
+	}).Iter()
+	total,dict:=0,make(map[string]interface{},0)
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		site := qu.ObjToString(tmp["site"])
+		if dict[site] == nil {
+			dict[site] = 1
+		}else {
+			num := qu.IntAll(dict[site])+1
+			dict[site] = num
+		}
+		tmp = make(map[string]interface{})
+	}
+
+
+
+	return map[string]interface{}{
+		"num":total,
+		"data":dict,
+		"date":qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT),
+	}
+}
+

+ 39 - 0
data_monitoring/listen_online/src/config.json

@@ -0,0 +1,39 @@
+{
+  "save_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "monitor_other",
+    "pool": 5
+  },
+  "qy_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "baidu_enterprise",
+    "pool": 5
+  },
+  "st_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "result_20210109",
+    "pool": 5
+  },
+  "es_index_st": "bidding",
+  "es_type_st": "bidding",
+  "save_other_name": "monitor_other",
+  "save_site_name" : "monitor_site",
+  "unique_site_name": "z_site_unique",
+  "jkmail": {
+    "to": "zhengkun@topnet.net.cn",
+    "api": "http://172.17.145.179:19281/_send/_mail"
+  },
+  "smtpMail": {
+    "from": "zhengkun@topnet.net.cn",
+    "to": "zhengkun@topnet.net.cn,zhaoyujian@topnet.net.cn",
+    "cc": "zhengkun@topnet.net.cn",
+    "smtpHost": "smtp.qq.com",
+    "smtpPort": "465",
+    "user":     "920032221@qq.com",
+    "pwd":      "xomkphsjsamybdbj"
+  },
+  "xlsx_name" : "site_qy.xlsx"
+}

+ 63 - 0
data_monitoring/listen_online/src/errmail.go

@@ -0,0 +1,63 @@
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"qfw/util/mail"
+	"os"
+	qu "qfw/util"
+)
+var tomail string
+var api string
+var from,to,cc, smtpHost,user,pwd string
+var smtpPort int
+//api模式 二选一皆可
+func sendErrMailApi(title,body string)  {
+	jkmail, _ := sysconfig["jkmail"].(map[string]interface{})
+	if jkmail != nil {
+		tomail, _ = jkmail["to"].(string)
+		api, _ = jkmail["api"].(string)
+	}
+	log.Println(tomail,api)
+	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, title, body))
+	if err == nil {
+		defer res.Body.Close()
+		read, err := ioutil.ReadAll(res.Body)
+		log.Println("邮件发送成功:", string(read), err)
+	}else {
+		log.Println("邮件发送失败:", err)
+	}
+}
+
+func sendErrMailSmtp(title,body string) {
+
+
+	smtpMail, _ := sysconfig["smtpMail"].(map[string]interface{})
+
+	if smtpMail != nil {
+		from, _ = smtpMail["from"].(string)
+		to, _ = smtpMail["to"].(string)
+		cc, _ = smtpMail["cc"].(string)
+		smtpHost, _ = smtpMail["smtpHost"].(string)
+		smtpPort= qu.IntAll(smtpMail["smtpPort"])
+		user, _ = smtpMail["user"].(string)
+		pwd, _ = smtpMail["pwd"].(string)
+	}
+	f, _ := os.Open(xlsx_name)
+	b, err := ioutil.ReadAll(f)
+	if err != nil {
+		fmt.Println("err:",err)
+		return
+	}
+
+
+	ok := mail.GSendMail_Bq(from, to, cc, cc, title, body, f.Name(), b, &mail.GmailAuth{
+		SmtpHost: smtpHost,
+		SmtpPort: smtpPort,
+		User:     user,
+		Pwd:      pwd,
+	})
+	fmt.Println(ok)
+}

+ 145 - 0
data_monitoring/listen_online/src/main.go

@@ -0,0 +1,145 @@
+package main
+
+import (
+	"fmt"
+	"github.com/cron"
+	"github.com/tealeg/xlsx"
+	"log"
+	"os"
+	qu "qfw/util"
+	"time"
+)
+
+
+var (
+	sysconfig    							map[string]interface{} //配置文件
+	save_mgo,qy_mgo,st_mgo        			*MongodbSim            //mongodb操作对象
+	qy_c_name,st_c_name						string
+	es_index_st,es_type_st					string
+	save_other_name,save_site_name			string
+	xlsx_name,unique_site_name				string
+)
+
+func initMgo()  {
+
+	saveconf := sysconfig["save_mgodb"].(map[string]interface{})
+	qy_c_name = qu.ObjToString(saveconf["coll"])
+	save_mgo = &MongodbSim{
+		MongodbAddr: saveconf["addr"].(string),
+		DbName:      saveconf["db"].(string),
+		Size:        qu.IntAllDef(saveconf["pool"], 5),
+	}
+	save_mgo.InitPool()
+
+	qconf := sysconfig["qy_mgodb"].(map[string]interface{})
+	qy_c_name = qu.ObjToString(qconf["coll"])
+	qy_mgo = &MongodbSim{
+		MongodbAddr: qconf["addr"].(string),
+		DbName:      qconf["db"].(string),
+		Size:        qu.IntAllDef(qconf["pool"], 5),
+	}
+	qy_mgo.InitPool()
+
+	sconf := sysconfig["st_mgodb"].(map[string]interface{})
+	st_c_name = qu.ObjToString(sconf["coll"])
+	st_mgo = &MongodbSim{
+		MongodbAddr: sconf["addr"].(string),
+		DbName:      sconf["db"].(string),
+		Size:        qu.IntAllDef(sconf["pool"], 5),
+	}
+	st_mgo.InitPool()
+
+
+	//属性赋值
+	es_index_st = qu.ObjToString(sysconfig["es_index_st"])
+	es_type_st = qu.ObjToString(sysconfig["es_type_st"])
+	save_other_name = qu.ObjToString(sysconfig["save_other_name"])
+	save_site_name = qu.ObjToString(sysconfig["save_site_name"])
+	unique_site_name = qu.ObjToString(sysconfig["unique_site_name"])
+	xlsx_name = qu.ObjToString(sysconfig["xlsx_name"])
+
+}
+
+func init() {
+	//加载配置文件
+	qu.ReadConfig(&sysconfig)
+	initMgo()
+}
+
+func main() {
+	log.Println("等待执行")
+	go taskTime()
+	time.Sleep(99999 * time.Hour)
+}
+
+func taskTime()  {
+
+	log.Println("部署定时任务")
+	c := cron.New()
+	//企业变更-mongo-site-es
+	c.AddFunc("0 50 8 ? * *", func() { dealWithUniqueData() })
+	c.Start()
+}
+
+func dealWithUniqueData()  {
+
+	log.Println("开始统计相关数据...")
+
+	qy_data := dealWithQYData()
+	st_data :=dealWithSTData()
+
+	log.Println("处理数据...发送邮件...")
+
+	os.Remove(xlsx_name)
+	f :=xlsx.NewFile()
+	sheet, _ := f.AddSheet("统计")
+	//企业数据
+	row := sheet.AddRow()
+	row.AddCell().Value = "企业变更/日期"
+	row.AddCell().Value = "数量"
+	for k,v :=range qy_data {
+		row = sheet.AddRow()
+		row.AddCell().SetString(k)
+		row.AddCell().SetString(fmt.Sprintf("%d",qu.IntAll(v)))
+	}
+	row = sheet.AddRow()
+	row = sheet.AddRow()
+
+
+	if len(st_data)>0 { //有数据
+		row = sheet.AddRow()
+		row.AddCell().Value = "主要站点/日期"
+		row.AddCell().Value = "名称"
+		row.AddCell().Value = "数量"
+		row.AddCell().Value = "总计"
+
+
+		mgodata := *qu.ObjToMap(st_data["mgodata"])
+		date := qu.ObjToString(st_data["date"])
+		total := qu.IntAll(st_data["mgonum"])
+		isT := false
+		for key,_ := range uniqueData {
+			row = sheet.AddRow()
+			if !isT {
+				row.AddCell().SetString(date)
+				row.AddCell().SetString(key)
+				row.AddCell().SetString(fmt.Sprintf("%d",qu.IntAll(mgodata[key])))
+				row.AddCell().SetString(fmt.Sprintf("%d",qu.IntAll(total)))
+				isT = true
+			}else {
+				row.AddCell().Value = ""
+				row.AddCell().SetString(key)
+				row.AddCell().SetString(fmt.Sprintf("%d",qu.IntAll(mgodata[key])))
+				row.AddCell().Value = ""
+			}
+		}
+	}
+	err := f.Save(xlsx_name)
+	if err != nil {
+		log.Println("保存xlsx失败:", err)
+	}else {
+		log.Println("保存xlsx成功:", err)
+	}
+
+	sendErrMailSmtp("主要站点以及企业相关统计","附件")
+}

+ 329 - 0
data_monitoring/listen_online/src/mgo.go

@@ -0,0 +1,329 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type MgoSess struct {
+	Db     string
+	Coll   string
+	Query  interface{}
+	Sorts  []string
+	fields interface{}
+	limit  int64
+	skip   int64
+	M      *MongodbSim
+}
+
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor != nil {
+		if mt.Cursor.Next(nil) {
+			err := mt.Cursor.Decode(result)
+			if err != nil {
+				log.Println("mgo cur err", err.Error())
+				mt.Cursor.Close(nil)
+				return false
+			}
+			return true
+		} else {
+			mt.Cursor.Close(nil)
+			return false
+		}
+	} else {
+		return false
+	}
+
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+type MongodbSim struct {
+	MongodbAddr string
+	Size        int
+	//	MinSize     int
+	DbName   string
+	C        *mongo.Client
+	Ctx      context.Context
+	ShortCtx context.Context
+	pool     chan bool
+	UserName string
+	Password string
+}
+
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	//m.Open()
+	ms := &MgoSess{}
+	ms.M = m
+	return ms
+}
+
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	//m.Close()
+	ms.M = nil
+	ms = nil
+}
+
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint64(m.Size))
+	m.pool = make(chan bool, m.Size)
+
+	if m.UserName !="" && m.Password !="" {
+		cre := options.Credential{
+			Username:m.UserName,
+			Password:m.Password,
+		}
+		opts.SetAuth(cre)
+	}
+
+
+
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+//批量插入
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewInsertOneModel()
+		write.SetDocument(d)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo savebulk error:", e.Error())
+		return false
+	}
+	return true
+}
+
+//保存
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		return nil
+	}
+	return r.InsertedID
+}
+
+//更新by Id
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+//删除by id
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//通过条件删除
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//findbyid
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//findone
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//find
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find()
+	r, err := coll.Find(m.Ctx, query, op.SetSort(sort), op.SetProjection(fields))
+	if err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	return results, nil
+}
+
+//创建_id
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, _ := primitive.ObjectIDFromHex(id)
+	return objectId
+}
+
+func BsonTOStringId(id interface{}) string {
+	return id.(primitive.ObjectID).Hex()
+}

+ 81 - 0
data_monitoring/listen_online/src/priselisten.go

@@ -0,0 +1,81 @@
+package main
+
+import (
+	"log"
+	qu "qfw/util"
+	"time"
+)
+
+
+func dealWithQYData()map[string]interface{} {
+
+
+	defer qu.Catch()
+	start := int(time.Now().Unix())
+	now:=time.Now() //当前天
+	lt_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	gte_time := lt_time-86400
+	q := map[string]interface{}{
+		"down_time": map[string]interface{}{
+			"$gte":  gte_time,
+			"$lt": lt_time,
+		},
+	}
+	sess := qy_mgo.GetMgoConn()
+	defer qy_mgo.DestoryMongoConn(sess)
+
+	data := make(map[string]interface{},0)
+	//细节才需要遍历
+	it := sess.DB(qy_mgo.DbName).C(qy_c_name).Find(&q).Select(map[string]interface{}{
+		"site":1,
+	}).Iter()
+	total:=0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		tmp = make(map[string]interface{})
+	}
+
+	qy_date := qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT)
+	data[qy_date] = total
+	comeintime :=qu.Int64All(time.Now().Unix())
+	save_mgo.Save(save_other_name, map[string]interface{}{
+		"name":"企业",
+		"num":qu.IntAll(total),
+		"comeintime":comeintime,
+		"date":qy_date,
+	})
+
+	//获取前两天数据
+	before_time := lt_time-86400*2
+	before_date :=qu.FormatDateByInt64(&before_time, qu.DATEFORMAT)
+	//企业数据
+	before_qy_data :=save_mgo.FindOne(save_other_name, map[string]interface{}{
+		"name":"企业",
+		"date":before_date,
+	})
+	if len(before_qy_data)>1 {
+		data[before_date] =qu.IntAll(before_qy_data["num"])
+	}
+	log.Println("企业变更处理完毕...",int(time.Now().Unix())-start,"秒")
+
+	return data
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

+ 198 - 0
data_monitoring/listen_online/src/sitelisten.go

@@ -0,0 +1,198 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	qu "qfw/util"
+	"qfw/util/elastic"
+	"time"
+	es_elastic "qfw/common/src/gopkg.in/olivere/elastic.v1"
+)
+
+var uniqueData  map[string]interface{}   //指定站点
+
+func initUniqueSite()  {
+
+	uniqueData = make(map[string]interface{})
+
+	query := map[string]interface{}{}
+	sess := save_mgo.GetMgoConn()
+	defer save_mgo.DestoryMongoConn(sess)
+	it := sess.DB(save_mgo.DbName).C(unique_site_name).Find(&query).Iter()
+	total:=0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		site:=qu.ObjToString(tmp["site"])
+		uniqueData[site] = 1
+		tmp = make(map[string]interface{})
+	}
+
+	log.Println("is unique_site over",len(uniqueData))
+
+}
+
+func dealWithSTData() map[string]interface{} {
+
+	log.Println("开始统计全量站点数据...")
+	defer qu.Catch()
+
+
+
+	initUniqueSite() //准备独特站点
+
+	if len(uniqueData)<1 {
+		log.Println("独特站点-准备异常")
+		sendErrMailApi("独特站点-线上-准备异常","")
+		return map[string]interface{}{}
+	}
+
+	start := int(time.Now().Unix())
+	now:=time.Now() //当前天
+	lt_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	gte_time := lt_time-86400
+
+	mgodata := toCalculateMgoData(gte_time,lt_time)
+	esdata := toCalculateEsData(fmt.Sprintf("%d",gte_time),fmt.Sprintf("%d",lt_time))
+	mgonum,esnum:= qu.IntAll(mgodata["totalnum"]),qu.IntAll(esdata["totalnum"])
+
+	comeintime:=qu.Int64All(time.Now().Unix())
+	date := qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT)
+	save_data :=  map[string]interface{}{
+		"comeintime":comeintime,
+		"date":date,
+		"name":"全量主要站点",
+		"mgonum":mgonum,
+		"esnum":esnum,
+		"mgodata":mgodata["detail"],
+		"esdata":esdata["detail"],
+	}
+	save_mgo.Save(save_site_name, save_data)
+
+	log.Println("站点-定时处理完毕...","用时:",int(time.Now().Unix())-start,"秒")
+
+	return save_data
+
+}
+
+
+
+
+
+//es数据统计
+func toCalculateEsData(gte_time string,lt_time string) map[string]interface{} {
+
+	if gte_time == "" || lt_time == "" {
+		return map[string]interface{}{
+			"totalnum" : 0,
+			"detail": map[string]interface{}{},
+		}
+	}
+
+	dict:= make(map[string]interface{},0)
+	total :=0
+
+	elastic.InitElasticSize("http://172.17.145.170:9800", 10,)
+	//elastic.InitElasticSize("http://127.0.0.1:12003", 10,)
+	esclient := elastic.GetEsConn()
+	defer elastic.DestoryEsConn(esclient)
+	if esclient == nil {
+		log.Println("连接池异常")
+	}
+	query :=es_elastic.NewRangeQuery("comeintime").Gte(gte_time).Lt(lt_time)
+	cursor, err := esclient.Scan(es_index_st).Query(es_elastic.NewBoolQuery().Must(query)).
+		Size(200).Do()
+	if err != nil {
+		log.Println("cursor",err)
+	}
+	if cursor.Results == nil {
+		log.Println("results != nil; got nil")
+	}
+	if cursor.Results.Hits == nil {
+		log.Println("expected results.Hits != nil; got nil")
+	}
+	log.Println("es total :",cursor.TotalHits(),"处理分析中......")
+	for {
+		searchResult, err := cursor.Next()
+		if err != nil {
+			if err.Error() == "EOS" {
+				break
+			}else {
+				log.Println("cursor searchResult",err)
+			}
+		}
+		for _, hit := range searchResult.Hits.Hits {
+			tmp := make(map[string]interface{})
+			err := json.Unmarshal(*hit.Source, &tmp)
+			if err != nil {
+				log.Println("json Unmarshal error")
+				continue
+			}
+
+			site := qu.ObjToString(tmp["site"])	//统计站点
+			if uniqueData[site]!=nil { //统计有效站点
+				total++
+				if dict[site] == nil {
+					dict[site] = 1
+				}else {
+					num := qu.IntAll(dict[site])+1
+					dict[site] = num
+				}
+			}
+		}
+	}
+
+
+	return map[string]interface{}{
+		"totalnum" : total,
+		"detail": dict,
+	}
+}
+
+
+
+
+//mongo数据统计
+func toCalculateMgoData(gte_time int64,lt_time int64) map[string]interface{} {
+
+	if gte_time == 0 || lt_time == 0 {
+		return map[string]interface{}{
+			"totalnum" : 0,
+			"detail": map[string]interface{}{},
+		}
+	}
+
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte":  gte_time,
+			"$lt": lt_time,
+		},
+	}
+
+	sess := st_mgo.GetMgoConn()
+	defer st_mgo.DestoryMongoConn(sess)
+
+	dict:= make(map[string]interface{},0)
+	//细节才需要遍历
+	it := sess.DB(st_mgo.DbName).C(st_c_name).Find(&query).Select(map[string]interface{}{
+		"site":1,
+	}).Iter()
+	total:=0
+	for tmp := make(map[string]interface{}); it.Next(&tmp);  {
+		//统计站点
+		site := qu.ObjToString(tmp["site"])
+		if uniqueData[site]!=nil { //统计有效站点
+			total++
+			if dict[site] == nil {
+				dict[site] = 1
+			}else {
+				num := qu.IntAll(dict[site])+1
+				dict[site] = num
+			}
+		}
+		tmp = make(map[string]interface{})
+	}
+	return map[string]interface{}{
+		"totalnum" : total,
+		"detail": dict,
+	}
+}

+ 17 - 5
data_monitoring/listen_task/src/config.json

@@ -1,8 +1,14 @@
 {
-  "data_mgodb": {
+  "save_mgodb": {
     "addr": "192.168.3.207:27092",
     "db": "zhengkun",
-    "coll": "baidu_enterprise",
+    "coll": "monitor_other",
+    "pool": 5
+  },
+  "qy_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "baidu_enterprise_test",
     "pool": 5
   },
   "jp_mgodb": {
@@ -21,8 +27,14 @@
   "es_type_st": "bidding",
   "save_other_name": "monitor_other",
   "save_site_name" : "monitor_site",
-  "jkmail": {
+  "smtpMail": {
+    "from": "zhengkun@topnet.net.cn",
     "to": "zhengkun@topnet.net.cn",
-    "api": "http://172.17.145.179:19281/_send/_mail"
-  }
+    "cc": "zhengkun@topnet.net.cn",
+    "smtpHost": "smtp.qq.com",
+    "smtpPort": "465",
+    "user":     "920032221@qq.com",
+    "pwd":      "xomkphsjsamybdbj"
+  },
+  "xlsx_name" : "统计.xlsx"
 }

+ 15 - 71
data_monitoring/listen_task/src/dataTaskJP.go

@@ -1,21 +1,17 @@
 package main
 
 import (
-	"fmt"
-	"log"
 	qu "qfw/util"
 	"time"
 )
 
 
-func dealWithJPData()  {
+func dealWithJPData()(map[string]interface{}) {
 
-	log.Println("开始统计竞品数据...")
 	defer qu.Catch()
-	start := int(time.Now().Unix())
 	now:=time.Now() //当前天
-	gte_time:= time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local).Unix()
-	lt_time := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	lt_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	gte_time := lt_time-86400
 
 	q := map[string]interface{}{
 		"comeintime": map[string]interface{}{
@@ -29,76 +25,24 @@ func dealWithJPData()  {
 	it := sess.DB(jp_mgo.DbName).C(jp_c_name).Find(&q).Select(map[string]interface{}{
 		"site":1,
 	}).Iter()
-	total,isOK,_ :=0, 0,int(time.Now().Unix())
+	total,dict:=0,make(map[string]interface{},0)
 	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
-		if qu.ObjToString(tmp["site"]) == "中国招标与采购网" {
-			isOK++
+		site := qu.ObjToString(tmp["site"])
+		if dict[site] == nil {
+			dict[site] = 1
+		}else {
+			num := qu.IntAll(dict[site])+1
+			dict[site] = num
 		}
 		tmp = make(map[string]interface{})
 	}
 
-	//是否告警条件
-	if total<1 {
-		//sendErrMailApi("竞品数据异常","数量无")
-	}
 
-	data_mgo.Save("monitor_other", map[string]interface{}{
-		"name":"竞品",
-		"num":qu.IntAll(total),
-		"comeintime":qu.Int64All(time.Now().Unix()),
-		"date":qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT),
-	})
 
-	log.Println("竞品-定时处理完毕...",int(time.Now().Unix())-start,"秒")
+	return map[string]interface{}{
+		"num":total,
+		"data":dict,
+		"date":qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT),
+	}
 }
 
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-//测试分析
-func dealWithJPTest(year int,month time.Month, day int)  {
-	gte_time:= time.Date(year, month, day, 0, 0, 0, 0, time.Local).Unix()
-	lt_time := time.Date(year, month, day+1, 0, 0, 0, 0, time.Local).Unix()
-	q := map[string]interface{}{
-		"comeintime": map[string]interface{}{
-			"$gte":  gte_time,
-			"$lt": lt_time,
-		},
-	}
-	sess := jp_mgo.GetMgoConn()
-	defer jp_mgo.DestoryMongoConn(sess)
-	it := sess.DB(jp_mgo.DbName).C(jp_c_name).Find(&q).Select(map[string]interface{}{
-		"site":1,
-	}).Iter()
-	total,isOK :=0, 0
-	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
-		if qu.ObjToString(tmp["site"]) == "中国招标与采购网" {
-			isOK++
-		}
-		tmp = make(map[string]interface{})
-	}
-	fmt.Println(qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT),"总量:",total,"竞品:",isOK)
-	comeintime:=qu.Int64All(time.Now().Unix())
-	date := qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT)
-	data_mgo.Save("monitor_other", map[string]interface{}{
-		"name":"竞品",
-		"num":qu.IntAll(isOK),
-		"comeintime":comeintime,
-		"date":date,
-	})
-}

+ 8 - 52
data_monitoring/listen_task/src/dataTaskQY.go

@@ -2,22 +2,19 @@ package main
 
 import (
 	"fmt"
-	"log"
 	qu "qfw/util"
 	"time"
 )
 
 
-func dealWithQYData()  {
+func dealWithQYData()(string,string)  {
 
-	log.Println("开始统计企业数据...")
 
 	defer qu.Catch()
 
-	start := int(time.Now().Unix())
 	now:=time.Now() //当前天
-	gte_time:= time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local).Unix()
-	lt_time := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	lt_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	gte_time := lt_time-86400
 
 	q := map[string]interface{}{
 		"down_time": map[string]interface{}{
@@ -26,31 +23,18 @@ func dealWithQYData()  {
 		},
 	}
 
-	sess := data_mgo.GetMgoConn()
-	defer data_mgo.DestoryMongoConn(sess)
+	sess := qy_mgo.GetMgoConn()
+	defer qy_mgo.DestoryMongoConn(sess)
 
 	//细节才需要遍历
-	it := sess.DB(data_mgo.DbName).C(qy_c_name).Find(&q).Select(map[string]interface{}{
+	it := sess.DB(qy_mgo.DbName).C(qy_c_name).Find(&q).Select(map[string]interface{}{
 		"site":1,
 	}).Iter()
-	total,_ :=0, int(time.Now().Unix())
+	total:=0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
 		tmp = make(map[string]interface{})
 	}
-
-	//是否告警条件
-	if total<1 {
-		sendErrMailApi("企业数据异常","数量无")
-	}
-
-	data_mgo.Save("monitor_other", map[string]interface{}{
-		"name":"企业变更",
-		"num":qu.IntAll(total),
-		"comeintime":qu.Int64All(time.Now().Unix()),
-		"date":qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT),
-	})
-
-	log.Println("企业-定时处理完毕...",int(time.Now().Unix())-start,"秒")
+	return qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT),fmt.Sprintf("%d",total)
 }
 
 
@@ -72,31 +56,3 @@ func dealWithQYData()  {
 
 
 
-
-//测试分析
-func dealWithQYTest(year int ,month time.Month, day int)  {
-	gte_time:= time.Date(year, month, day, 0, 0, 0, 0, time.Local).Unix()
-	lt_time := time.Date(year, month, day+1, 0, 0, 0, 0, time.Local).Unix()
-	q := map[string]interface{}{
-		"down_time": map[string]interface{}{
-			"$gte":  gte_time,
-			"$lt": lt_time,
-		},
-	}
-	sess := data_mgo.GetMgoConn()
-	defer data_mgo.DestoryMongoConn(sess)
-	it := sess.DB(data_mgo.DbName).C(qy_c_name).Find(&q).Select(map[string]interface{}{
-		"site":1,
-	}).Iter()
-	total :=0
-	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
-		tmp = make(map[string]interface{})
-	}
-	fmt.Println(qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT),"总量:",total)
-	data_mgo.Save("monitor_other", map[string]interface{}{
-		"name":"企业变更",
-		"num":qu.IntAll(total),
-		"comeintime":qu.Int64All(time.Now().Unix()),
-		"date":qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT),
-	})
-}

+ 10 - 15
data_monitoring/listen_task/src/dataTaskST.go

@@ -13,26 +13,24 @@ import (
 
 func dealWithSTData()  {
 
-	log.Println("开始统计站点数据...")
+	log.Println("开始统计全量站点数据...")
 	defer qu.Catch()
 
 	start := int(time.Now().Unix())
 	now:=time.Now() //当前天
-	gte_time:= time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local).Unix()
-	lt_time := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
-
-	gte_time = 1618070400
-	lt_time = 1618156800
+	lt_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	gte_time := lt_time-86400
 
 	mgodata := toCalculateMgoData(gte_time,lt_time)
 	esdata := toCalculateEsData(fmt.Sprintf("%d",gte_time),fmt.Sprintf("%d",lt_time))
 	mgonum,esnum:= qu.IntAll(mgodata["totalnum"]),qu.IntAll(esdata["totalnum"])
 
 	comeintime:=qu.Int64All(time.Now().Unix())
-	date := qu.FormatDateByInt64(&comeintime, qu.DATEFORMAT)
-	data_mgo.Save("monitor_site", map[string]interface{}{
+	date := qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT)
+	save_mgo.Save("monitor_site", map[string]interface{}{
 		"comeintime":comeintime,
 		"date":date,
+		"name":"全量站点",
 		"mgonum":mgonum,
 		"esnum":esnum,
 		"mgodata":mgodata["detail"],
@@ -64,10 +62,10 @@ func toCalculateEsData(gte_time string,lt_time string) map[string]interface{} {
 	}
 
 	dict:= make(map[string]interface{},0)
-	total,isOK :=0,0
+	total :=0
 
-	//elastic.InitElasticSize("http://172.17.145.170:9800", 10,)
-	elastic.InitElasticSize("http://127.0.0.1:12003", 10,)
+	elastic.InitElasticSize("http://172.17.145.170:9800", 10,)
+	//elastic.InitElasticSize("http://127.0.0.1:12003", 10,)
 	esclient := elastic.GetEsConn()
 	defer elastic.DestoryEsConn(esclient)
 	if esclient == nil {
@@ -106,7 +104,6 @@ func toCalculateEsData(gte_time string,lt_time string) map[string]interface{} {
 			site := qu.ObjToString(tmp["site"])	//统计站点
 			if dict[site] == nil {
 				dict[site] = 1
-				isOK++
 			}else {
 				num := qu.IntAll(dict[site])+1
 				dict[site] = num
@@ -114,7 +111,6 @@ func toCalculateEsData(gte_time string,lt_time string) map[string]interface{} {
 		}
 	}
 
-	//log.Println("st is es over:",total,isOK,"有效:",len(dict),"用时:",int(time.Now().Unix())-start,"秒")
 
 	return map[string]interface{}{
 		"totalnum" : total,
@@ -150,13 +146,12 @@ func toCalculateMgoData(gte_time int64,lt_time int64) map[string]interface{} {
 	it := sess.DB(st_mgo.DbName).C(st_c_name).Find(&query).Select(map[string]interface{}{
 		"site":1,
 	}).Iter()
-	total,isOK :=0,0
+	total:=0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
 		//统计站点
 		site := qu.ObjToString(tmp["site"])
 		if dict[site] == nil {
 			dict[site] = 1
-			isOK++
 		}else {
 			num := qu.IntAll(dict[site])+1
 			dict[site] = num

+ 145 - 30
data_monitoring/listen_task/src/main.go

@@ -1,39 +1,52 @@
 package main
 
 import (
+	"fmt"
 	"github.com/cron"
+	"github.com/tealeg/xlsx"
 	"log"
+	"os"
 	qu "qfw/util"
 	"time"
-
 )
 
 
 var (
 	sysconfig    							map[string]interface{} //配置文件
-	data_mgo,jp_mgo,st_mgo        			*MongodbSim            //mongodb操作对象
+	save_mgo,qy_mgo,jp_mgo,st_mgo        	*MongodbSim            //mongodb操作对象
 	qy_c_name,jp_c_name,st_c_name			string
 	es_index_st,es_type_st					string
 	save_other_name,save_site_name			string
+	xlsx_name								string
+	infoSiteArr								[]string
 )
 
 func initMgo()  {
 
-	dconf := sysconfig["data_mgodb"].(map[string]interface{})
-	qy_c_name = qu.ObjToString(dconf["coll"])
-	data_mgo = &MongodbSim{
-		MongodbAddr: dconf["addr"].(string),
-		DbName:      dconf["db"].(string),
-		Size:        qu.IntAllDef(dconf["pool"], 10),
+	saveconf := sysconfig["save_mgodb"].(map[string]interface{})
+	qy_c_name = qu.ObjToString(saveconf["coll"])
+	save_mgo = &MongodbSim{
+		MongodbAddr: saveconf["addr"].(string),
+		DbName:      saveconf["db"].(string),
+		Size:        qu.IntAllDef(saveconf["pool"], 5),
+	}
+	save_mgo.InitPool()
+
+	qconf := sysconfig["qy_mgodb"].(map[string]interface{})
+	qy_c_name = qu.ObjToString(qconf["coll"])
+	qy_mgo = &MongodbSim{
+		MongodbAddr: qconf["addr"].(string),
+		DbName:      qconf["db"].(string),
+		Size:        qu.IntAllDef(qconf["pool"], 5),
 	}
-	data_mgo.InitPool()
+	qy_mgo.InitPool()
 
 	jconf := sysconfig["jp_mgodb"].(map[string]interface{})
 	jp_c_name = qu.ObjToString(jconf["coll"])
 	jp_mgo = &MongodbSim{
 		MongodbAddr: jconf["addr"].(string),
 		DbName:      jconf["db"].(string),
-		Size:        qu.IntAllDef(jconf["pool"], 10),
+		Size:        qu.IntAllDef(jconf["pool"], 5),
 	}
 	jp_mgo.InitPool()
 
@@ -42,7 +55,7 @@ func initMgo()  {
 	st_mgo = &MongodbSim{
 		MongodbAddr: sconf["addr"].(string),
 		DbName:      sconf["db"].(string),
-		Size:        qu.IntAllDef(sconf["pool"], 10),
+		Size:        qu.IntAllDef(sconf["pool"], 5),
 	}
 	st_mgo.InitPool()
 
@@ -54,29 +67,19 @@ func initMgo()  {
 	save_site_name = qu.ObjToString(sysconfig["save_site_name"])
 
 
-
+	xlsx_name = qu.ObjToString(sysconfig["xlsx_name"])
 }
 
-
 func init() {
 	//加载配置文件
 	qu.ReadConfig(&sysconfig)
 	initMgo()
 }
 
-func mainT() {
-
-	go taskTime()
-	time.Sleep(99999 * time.Hour)
-
-}
-
-//测试使用
 func main() {
+	go taskTime()
 
-	//dealWithJPData()
-	//dealWithQYData()
-	dealWithSTData()
+	time.Sleep(99999 * time.Hour)
 
 }
 
@@ -84,17 +87,129 @@ func taskTime()  {
 
 	log.Println("部署定时任务")
 	c := cron.New()
-	//竞品-mongo
-	c.AddFunc("0 30 9 ? * *", func() { dealWithJPData() })
 
-	//企业变更-mongo
-	c.AddFunc("0 0 9 ? * *", func() { dealWithQYData() })
-
-	//站点相关-es-mongo
+	//站点相关全量-es-mongo
 	c.AddFunc("0 30 8 ? * *", func() { dealWithSTData() })
 
+
+	//企业变更-站点-mongo
+	c.AddFunc("0 0 9 ? * *", func() { dealWithOtherData() })
+
 	c.Start()
 
 }
 
+func dealWithOtherData()  {
+
+	log.Println("开始统计相关数据...")
+
+	//先获取-待统计-动态站点
+	tmp ,tmp_err := save_mgo.Find("z_site_spider", map[string]interface{}{},nil,nil)
+	if tmp_err==nil && tmp!=nil && len(tmp)>0 {
+		infoSiteArr = make([]string,0)
+		for _,v :=range tmp {
+			infoSiteArr = append(infoSiteArr,qu.ObjToString(v["site"]))
+		}
+		log.Println("目标站点:",len(infoSiteArr))
+	}
+
+	start := int(time.Now().Unix())
+	st_data :=dealWithJPData()//站点数据
+	qy_date,qy_num := dealWithQYData()//企业数据
+	comeintime :=qu.Int64All(time.Now().Unix())
+	save_mgo.Save(save_other_name, map[string]interface{}{
+		"name":"企业",
+		"num":qu.IntAll(qy_num),
+		"comeintime":comeintime,
+		"date":qy_date,
+	})
+
+	save_mgo.Save(save_other_name, map[string]interface{}{
+		"name":"站点",
+		"num":qu.IntAll(st_data["num"]),
+		"comeintime":comeintime,
+		"date":qu.ObjToString(st_data["date"]),
+		"data":st_data["data"],
+	})
+
+
+
+	//获取前两天数据
+	qy_arr := make([]map[string]string,0)
+	now:=time.Now() //当前天
+	cur_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	before_time := cur_time-86400*2
+	before_date :=qu.FormatDateByInt64(&before_time, qu.DATEFORMAT)
+	//企业数据
+	before_qy_data :=save_mgo.FindOne(save_other_name, map[string]interface{}{
+		"name":"企业",
+		"date":before_date,
+	})
+	qy_arr = append(qy_arr, map[string]string{
+		"date":qy_date,
+		"num":qy_num,
+	})
+	if before_qy_data!=nil && len(before_qy_data)>2 {
+		qy_arr = append(qy_arr, map[string]string{
+			"date":qu.ObjToString(before_qy_data["date"]),
+			"num":fmt.Sprintf("%d",before_qy_data["num"]),
+		})
+	}
+
+
+	//站点数据
+	before_st_data :=save_mgo.FindOne(save_other_name, map[string]interface{}{
+		"name":"站点",
+		"date":before_date,
+	})
+
+
+	os.Remove("统计.xlsx")	//写excle
+	f :=xlsx.NewFile()
+	sheet, _ := f.AddSheet("统计")
+
+	//企业数据
+	row := sheet.AddRow()
+	row.AddCell().Value = "企业分类/日期"
+	row.AddCell().Value = "数量"
+	for _,tmp := range qy_arr {
+		row = sheet.AddRow()
+		row.AddCell().SetString(tmp["date"])
+		row.AddCell().SetString(tmp["num"])
+	}
+
+
+	//站点相关数据
+	sheet.AddRow()
+	row = sheet.AddRow()
+	row.AddCell().Value = "站点分类/日期"
+	row.AddCell().SetString(qu.ObjToString(st_data["date"]))
+	row.AddCell().SetString(qu.ObjToString(before_st_data["date"]))
+
+
+	data_1 := *qu.ObjToMap(st_data["data"])
+	data_2 := *qu.ObjToMap(before_st_data["data"])
+
+	for _,key := range infoSiteArr {
+		row = sheet.AddRow()
+		row.AddCell().SetString(key)
+		row.AddCell().SetString(fmt.Sprintf("%d",qu.Int64All(data_1[key])))
+		row.AddCell().SetString(fmt.Sprintf("%d",qu.Int64All(data_2[key])))
+	}
+	row = sheet.AddRow()
+	row.AddCell().Value = "总计"
+	row.AddCell().SetString(fmt.Sprintf("%d",qu.Int64All(st_data["num"])))
+	row.AddCell().SetString(fmt.Sprintf("%d",qu.Int64All(before_st_data["num"])))
+
+	err := f.Save(xlsx_name)
+	if err != nil {
+		log.Println("保存xlsx失败:", err)
+	}else {
+		log.Println("保存xlsx成功:", err)
+	}
+
+
+	sendErrMailSmtp("统计-站点-企业","附件")
 
+	log.Println("定时处理完毕...",int(time.Now().Unix())-start,"秒")
+}

+ 40 - 0
data_monitoring/listen_task/src/mark

@@ -0,0 +1,40 @@
+{
+  "save_mgodb": {
+    "addr": "172.17.4.87:27080",
+    "db": "editor",
+    "coll": "monitor_other",
+    "pool": 5
+  },
+  "qy_mgodb": {
+    "addr": "172.17.4.87:27080",
+    "db": "spider",
+    "coll": "baidu_enterprise",
+    "pool": 5
+  },
+  "jp_mgodb": {
+    "addr": "172.17.145.179:28082",
+    "db": "spider_data",
+    "coll": "data_bak",
+    "pool": 5
+  },
+  "st_mgodb": {
+    "addr": "172.17.4.85:27080",
+    "db": "qfw",
+    "coll": "result_20210109",
+    "pool": 5
+  },
+  "es_index_st": "bidding",
+  "es_type_st": "bidding",
+  "save_other_name": "monitor_other",
+  "save_site_name" : "monitor_site",
+  "smtpMail": {
+    "from": "zhengkun@topnet.net.cn",
+    "to": "zhengkun@topnet.net.cn",
+    "cc": "zhengkun@topnet.net.cn",
+    "smtpHost": "smtp.qq.com",
+    "smtpPort": "465",
+    "user":     "920032221@qq.com",
+    "pwd":      "xomkphsjsamybdbj"
+  },
+  "xlsx_name" : "统计.xlsx"
+}

+ 31 - 18
data_monitoring/listen_task/src/sendmail.go

@@ -3,28 +3,41 @@ package main
 import (
 	"fmt"
 	"io/ioutil"
-	"log"
-	"net/http"
+	"os"
+	qu "qfw/util"
+	"qfw/util/mail"
 )
 
+var from,to,cc, smtpHost,user,pwd string
+var smtpPort int
 
-var tomail string
-var api string
 
-//api模式
-func sendErrMailApi(title,body string)  {
-	jkmail, _ := sysconfig["jkmail"].(map[string]interface{})
-	if jkmail != nil {
-		tomail, _ = jkmail["to"].(string)
-		api, _ = jkmail["api"].(string)
+func sendErrMailSmtp(title,body string) {
+
+
+	smtpMail, _ := sysconfig["smtpMail"].(map[string]interface{})
+
+	if smtpMail != nil {
+		from, _ = smtpMail["from"].(string)
+		to, _ = smtpMail["to"].(string)
+		cc, _ = smtpMail["cc"].(string)
+		smtpHost, _ = smtpMail["smtpHost"].(string)
+		smtpPort= qu.IntAll(smtpMail["smtpPort"])
+		user, _ = smtpMail["user"].(string)
+		pwd, _ = smtpMail["pwd"].(string)
 	}
-	log.Println(tomail,api)
-	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, title, body))
-	if err == nil {
-		defer res.Body.Close()
-		read, err := ioutil.ReadAll(res.Body)
-		log.Println("邮件发送成功:", string(read), err)
-	}else {
-		log.Println("邮件发送失败:", err)
+	f, _ := os.Open(xlsx_name)
+	b, err := ioutil.ReadAll(f)
+	if err != nil {
+		fmt.Println("err:",err)
+		return
 	}
+
+	ok := mail.GSendMail_B(user, from, cc, from, title, body, f.Name(), b, &mail.GmailAuth{
+		SmtpHost: smtpHost,
+		SmtpPort: smtpPort,
+		User:     user,
+		Pwd:      pwd,
+	})
+	fmt.Println(ok)
 }

+ 3 - 2
data_monitoring/vps_client/src/config.json

@@ -2,7 +2,8 @@
   "vpsID": "专用-常州",
   "processArr": [
     "d1.exe",
-    "d2.exe"
+    "d2.exe",
+    "d3.exe"
   ],
-  "during":5
+  "during":"10"
 }

+ 9 - 23
data_monitoring/vps_client/src/main.go

@@ -23,10 +23,9 @@ var (
 )
 
 func init()  {
-	log.Println("加载...")
 	qu.ReadConfig(&sysconfig)
 	vpsID = qu.ObjToString(sysconfig["vpsID"])
-	during = qu.ObjToString(sysconfig["duringv"])
+	during = qu.ObjToString(sysconfig["during"])
 	processArr = qu.ObjArrToStringArr(sysconfig["processArr"].([]interface{}))
 }
 
@@ -38,20 +37,17 @@ func main() {
 		os.Exit(1)
 	}
 
-
 	//定时器
 	c := cron.New()
-	spec :=fmt.Sprintf("0 */%d * * * ?",during)	//分
-	//spec :=fmt.Sprintf("*/%d * * * * ?",during)		//秒
-
+	spec := fmt.Sprintf("0 */%s * * * ?",during)	//分
+	//spec =fmt.Sprintf("*/%s * * * * ?",during)		//秒
 	c.AddFunc(spec, func() { task() })
-	//c.AddFunc("*/10 * * * * ?", func() { task() })
 	c.Start()
 	time.Sleep(99999 * time.Hour)
 }
 
 func task()  {
-
+	log.Println("执行...检测...")
 	//先检测下载器
 	process := "0" //正常 - windows模式
 	for _,v:=range processArr {
@@ -69,7 +65,7 @@ func task()  {
 			}
 		}
 	}
-	log.Println("当前下载器:",process)
+	//log.Println("当前下载器:",process)
 	//u, _ := url.Parse("http://127.0.0.1:7811") //本地测试
 	u, _ := url.Parse("http://monitor.spdata.jianyu360.com") //线上
 	q := u.Query()
@@ -77,10 +73,13 @@ func task()  {
 	q.Set("process", process)
 	u.RawQuery = q.Encode()
 
+	//log.Println(vpsID,process)
+
 	_, err := http.Get(u.String());
 	if err != nil {
-		//log.Println("异常",err)
+		log.Println("异常",err)
 	}
+
 }
 
 
@@ -94,19 +93,6 @@ func task()  {
 
 
 
-
-
-
-
-
-
-
-
-
-
-
-
-
 //根据进程名判断进程是否运行
 func checkProRunning(serverName string) (bool, error) {
 	cmd := `ps ux | awk '/` + serverName + `/ && !/awk/ {print $2}'`

+ 8 - 2
data_monitoring/vps_server/src/config.json

@@ -1,10 +1,16 @@
 {
   "port": "7811",
+  "save_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "monitor_other",
+    "pool": 5
+  },
   "vpsIDs" : [
     "专用-常州"
   ],
   "during": 10,
-  "isErr" : 5,
+  "isErr" : 3,
   "smtpMail": {
     "from": "zhengkun@topnet.net.cn",
     "to": "zhengkun@topnet.net.cn",
@@ -14,7 +20,7 @@
     "pwd":      "xomkphsjsamybdbj"
   },
   "jkmail": {
-    "to": "zhengkun@topnet.net.cn",
+    "to": "zhaoyujian@topnet.net.cn",
     "api": "http://172.17.145.179:19281/_send/_mail"
   }
 }

+ 54 - 5
data_monitoring/vps_server/src/main.go

@@ -7,14 +7,19 @@ import (
 	"net/http"
 	qu "qfw/util"
 	"strings"
+	"sync"
 	"time"
 )
 var (
 	sysconfig			map[string]interface{} //配置文件
 	port				string
+	save_mgo        	*MongodbSim
 	idsArr				[]string
 	dataTmp				map[string]map[string]interface{}
 	during,isErr		int64
+	test				map[string]interface{}
+	updatelock 			sync.Mutex
+	save_coll_name		string
 )
 func init()  {
 	//加载配置文件
@@ -34,11 +39,25 @@ func init()  {
 			"isProMail":0,
 		}
 	}
+
 	during = qu.Int64All(sysconfig["during"])
 	isErr = qu.Int64All(sysconfig["isErr"])
+
+	saveconf := sysconfig["save_mgodb"].(map[string]interface{})
+	save_coll_name = qu.ObjToString(saveconf["coll"])
+	save_mgo = &MongodbSim{
+		MongodbAddr: saveconf["addr"].(string),
+		DbName:      saveconf["db"].(string),
+		Size:        qu.IntAllDef(saveconf["pool"], 5),
+	}
+	save_mgo.InitPool()
+
+
 	log.Println("准备完毕...")
 }
 
+
+
 func main() {
 
 	//http://monitor.spdata.jianyu360.com/,程序端口7811
@@ -48,8 +67,8 @@ func main() {
 
 	//每隔1分钟执行一次:0 */1 * * * ?   每隔5秒执行一次:*/5 * * * * ?
 
-	spec :=fmt.Sprintf("0 */%d * * * ?",during)
-	//spec :=fmt.Sprintf("*/%d * * * * ?",during)
+	spec :=fmt.Sprintf("30 */%d * * * ?",during)
+	//spec =fmt.Sprintf("*/%d * * * * ?",during)
 	c := cron.New()
 	c.AddFunc(spec, func() { taskFinishing()})
 	c.Start()
@@ -57,17 +76,20 @@ func main() {
 }
 
 func handler(w http.ResponseWriter, r *http.Request) {
+	updatelock.Lock()
 	r.ParseForm() //解析参数,默认是不会解析的
 	if r.Method == "GET" {
 		vpsid ,process,isProMail:= "",int64(0),int64(0)
 		for k, v := range r.Form {
 			if k=="id" {
 				vpsid = strings.Join(v, "")
+				isProMail = qu.Int64All(dataTmp[vpsid]["isProMail"])
 			}else if k=="process" {
 				process = qu.Int64All(strings.Join(v, ""))
 				if process==0 {
 					isProMail = 0
 				}
+
 			}else {
 
 			}
@@ -81,6 +103,7 @@ func handler(w http.ResponseWriter, r *http.Request) {
 				"isProMail":isProMail,
 			}
 		}
+
 		//log.Println("接收Get请求:",dataTmp[vpsid])
 
 	} else if r.Method == "POST" {
@@ -88,11 +111,16 @@ func handler(w http.ResponseWriter, r *http.Request) {
 	} else {
 
 	}
+
+	updatelock.Unlock()
 }
 
 //不断监听处理
 func taskFinishing()  {
-	//log.Println("执行...处理一次...")
+	//加锁
+	updatelock.Lock()
+	log.Println("...处理一次...")
+
 	isVpsMailContent,isProMailContent:= "",""
 	for _ , vpsid := range idsArr {
 		//此标识-是否正常
@@ -138,16 +166,37 @@ func taskFinishing()  {
 			"isVpsMail":isVpsMail,
 			"isProMail" : isProMail,
 		}
+
 	}
 
+	//log.Println("处理后",isProMailContent)
 
 	if isVpsMailContent!=""{
 		log.Println("发邮件:vps异常...",isVpsMailContent)
-		//sendErrMailApi("vps异常",isVpsMailContent)
+		comeintime :=qu.Int64All(time.Now().Unix())//日志记录
+		save_mgo.Save(save_coll_name, map[string]interface{}{
+			"name":"vps",
+			"comeintime":comeintime,
+			"date":qu.FormatDateByInt64(&comeintime, qu.DATEFORMAT),
+			"detail" : isVpsMailContent,
+		})
+
+		sendErrMailApi("vps",isVpsMailContent)
 	}else {
 		if isProMailContent !="" {
 			log.Println("发邮件:下载器异常...",isProMailContent)
-			//sendErrMailApi("下载器异常",isProMailContent)
+			comeintime :=qu.Int64All(time.Now().Unix())//日志记录
+			save_mgo.Save(save_coll_name, map[string]interface{}{
+				"name":"下载器",
+				"comeintime":comeintime,
+				"date":qu.FormatDateByInt64(&comeintime, qu.DATEFORMAT),
+				"detail" : isProMailContent,
+			})
+
+			sendErrMailApi("下载器异常",isProMailContent)
 		}
 	}
+
+	updatelock.Unlock()
+
 }

+ 1 - 1
src/config.json

@@ -32,7 +32,7 @@
     "iscltlog": false,
     "brandgoods": false,
     "pricenumber":true,
-    "udptaskid": "5fceec1c92b4ee1025b7d091",
+    "udptaskid": "607fb74f049a9923d8a4efd9",
     "udpport": "1484",
     "nextNode": [
         {

+ 1 - 1
src/jy/clear/tonumber.go

@@ -397,7 +397,7 @@ func ClearMaxAmount(data []interface{}, spidercode ...string) []interface{} {
 			
 		}
 	}
-	if value >= 500000000000 {
+	if value >= 50000000000 {
 		data[0] = float64(0)
 		//data[1] = false
 		data[1] = true

+ 96 - 33
src/jy/extract/extract.go

@@ -34,32 +34,75 @@ var (
 	Fields        = `{"title":1,"summary":1,"detail":1,"contenthtml":1,"site":1,"spidercode":1,"toptype":1,"subtype":1,"bidstatus":1,"area":1,"city":1,"comeintime":1,"publishtime":1,"sensitive":1,"projectinfo":1,"jsondata":1,"href":1,"infoformat":1,"attach_text":1,"dataging":1,"review_experts":1,"purchasing":1}`
 	//Fields        = `{"title":1,"summary":1,"detail":1,"contenthtml":1,"site":1,"spidercode":1,"toptype":1,"subtype":1,"bidstatus":1,"area":1,"city":1,"comeintime":1,"publishtime":1,"sensitive":1,"projectinfo":1,"jsondata":1,"href":1,"infoformat":1,"attach_text":1,"dataging":1,"new_attach_text":1,"createtime":1,"currency":1,"id":1,"company_email":1,"buyerclass":1,"tagname":1,"company_phone":1,"appid":1,"industry":1,"projectscope":1,"item":1,"s_subscopeclass":1,"matchkey":1,"jybxhref":1,"legal_person":1,"matchtype":1,"review_experts":1,"purchasing":1}`
 	Fields2 = `{"budget":1,"bidamount":1,"title":1,"projectname":1,"winner":1}`
+	/*f       = map[string]bool{
+		"T":                true,
+		"_d":               true,
+		"area":             true,
+		"channel":          true,
+		"comeintime":       true,
+		"competehref":      true,
+		"href":             true,
+		"l_np_publishtime": true,
+		"publishtime":      true,
+		"sendflag":         true,
+		"site":             true,
+		"spidercode":       true,
+		"title":            true,
+		"projectname":      true,
+	}*/
+	/*f       = map[string]bool{
+		"contentid":        true,
+		"progName":               true,
+		"updateTime":             true,
+		"url":          true,
+		"areaId":       true,
+		"areaName":      true,
+		"popTitle":             true,
+		"showTitle": true,
+		"progId":      true,
+		"catid":         true,
+		"isConcern":             true,
+		"followCount":       true,
+		"followSuggestion":            true,
+		"isBoutique":            true,
+		"canTj":            true,
+		"tenderAmountNumber":            true,
+		"tenderAmountUnit":            true,
+		"bidderAmountNumber":            true,
+		"bidderAmountUnit":            true,
+		"registrationBeginTime":            true,
+		"registrationEndTime":            true,
+		"starNum":            true,
+		"title":            true,
+		"proInvested":            true,
+		"projectname":      true,
+	}*/
 	spidercode = map[string]bool{
-		"gd_zhsggzyjyzx_jsgc_fjczbgg":true,
-		"js_szgyyqggzyjyzx_jsgc_zjfbgs":true,
-		"zj_tzsyhggzyjyzx_jsgc_kbqk":true,
-		"hb_tmsggzyjyxxw_jsgc_kbqk":true,
-		"zj_nbsyyggzyjyw_jsgc_kbqk":true,
-		"zj_zjsggzyjyzx_jyxx_kbjg":true,
-		"zj_zjzdgcjyw_ztbjglxx_kbjg":true,
-		"zj_lssggzyjyw_jsgc_kbsk":true,
-		"zj_qzslyxggzyjyzx_gggs_xkbjl":true,
-		"sc_mssggzydzjypt_jsgc_kbjl":true,
-		"sc_pzhsggzyjyfwzx_jsgc_kbylb":true,
-		"a_zgzbtbggfwpt_wasjgf_ss_kbjl":true,
-		"a_hbszbtbggfwpt_kbjl":true,
-		"a_szsjsgcjyfwzxbafzx_kbqkgs":true,
-		"a_szldzbyxgs_kbxx":true,
-		"zj_zssssxggzyjyw_gcjs_kbjggs":true,
-		"gd_szszfhjsj_kbqkgs":true,
-		"a_gjggzyjypt_gcjs_kbjl":true,
-		"a_gjggzyjypt_gcjs_kbjl_new":true,
-		"zj_tzsyhggzyjyzx_kbjggg":true,
-		"a_zgzbtbggfwpy_wasjgf_kbjl_lsbl":true,
-		"ah_czsggzyjyw_jsgc_kbjl":true,
-		"ah_czsggzyjyw_zfcg_kbxx":true,
-		"ah_whsggzyjyfww_kbxx_cgxm":true,
-		"ah_whsggzyjyfww_kbxx_gcxm":true,
+		"gd_zhsggzyjyzx_jsgc_fjczbgg":     true,
+		"js_szgyyqggzyjyzx_jsgc_zjfbgs":   true,
+		"zj_tzsyhggzyjyzx_jsgc_kbqk":      true,
+		"hb_tmsggzyjyxxw_jsgc_kbqk":       true,
+		"zj_nbsyyggzyjyw_jsgc_kbqk":       true,
+		"zj_zjsggzyjyzx_jyxx_kbjg":        true,
+		"zj_zjzdgcjyw_ztbjglxx_kbjg":      true,
+		"zj_lssggzyjyw_jsgc_kbsk":         true,
+		"zj_qzslyxggzyjyzx_gggs_xkbjl":    true,
+		"sc_mssggzydzjypt_jsgc_kbjl":      true,
+		"sc_pzhsggzyjyfwzx_jsgc_kbylb":    true,
+		"a_zgzbtbggfwpt_wasjgf_ss_kbjl":   true,
+		"a_hbszbtbggfwpt_kbjl":            true,
+		"a_szsjsgcjyfwzxbafzx_kbqkgs":     true,
+		"a_szldzbyxgs_kbxx":               true,
+		"zj_zssssxggzyjyw_gcjs_kbjggs":    true,
+		"gd_szszfhjsj_kbqkgs":             true,
+		"a_gjggzyjypt_gcjs_kbjl":          true,
+		"a_gjggzyjypt_gcjs_kbjl_new":      true,
+		"zj_tzsyhggzyjyzx_kbjggg":         true,
+		"a_zgzbtbggfwpy_wasjgf_kbjl_lsbl": true,
+		"ah_czsggzyjyw_jsgc_kbjl":         true,
+		"ah_czsggzyjyw_zfcg_kbxx":         true,
+		"ah_whsggzyjyfww_kbxx_cgxm":       true,
+		"ah_whsggzyjyfww_kbxx_gcxm":       true,
 	}
 )
 
@@ -1842,6 +1885,7 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 		//重新取出清理过后的中标候选人
 		resetWinnerorder(j)
 		doc, result, _id := funcAnalysis(j, e)
+		//_, result, _id := funcAnalysis(j, e)
 		if ju.IsSaveTag {
 			go otherNeedSave(j, result, e)
 		}
@@ -2117,7 +2161,16 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 			}
 		}
 		tmp["dataging"] = j.Dataging
-
+		/*for k, v := range *j.Data {
+			if f[k] {
+				tmp[k] = v
+			}
+		}
+		for k := range tmp {
+			if !f[k]{
+				delete(tmp,k)
+			}
+		}*/
 		//检查字段
 		tmp = checkFields(tmp)
 		if tmp["projectname"] == nil || tmp["projectname"] == "" {
@@ -2235,13 +2288,23 @@ func checkFields(tmp map[string]interface{}) map[string]interface{} {
 		}
 	}
 	//budget bidamount
-	if bg, ok := tmp["budget"].(float64); ok && bg >= 500000000000 {
-		tmp["big_budget_err"] = bg
-		delete(tmp, "budget")
-	}
-	if bg, ok := tmp["bidamount"].(float64); ok && bg >= 500000000000 {
-		tmp["big_bidamount_err"] = bg
-		delete(tmp, "bidamount")
+	if bg, ok := tmp["budget"].(float64); ok {
+		if bg >= 50000000000 {
+			tmp["budget_max_err"] = bg
+			delete(tmp, "budget")
+		} /*else if bg > 0 && bg < 1000 {
+			tmp["budget_min_err"] = bg
+			delete(tmp, "budget")
+		}*/
+	}
+	if bg, ok := tmp["bidamount"].(float64); ok && bg >= 50000000000 {
+		if bg >= 50000000000 {
+			tmp["bidamount_max_err"] = bg
+			delete(tmp, "bidamount")
+		} /*else if bg > 0 && bg < 1000 {
+			tmp["bidamount_min_err"] = bg
+			delete(tmp, "bidamount")
+		}*/
 	}
 	return tmp
 }

+ 12 - 12
src/jy/extract/extractInit.go

@@ -1211,18 +1211,18 @@ func (e *ExtractTask) initDistricts(jc_province string, qc_city string, c *City,
 				e.NewStreetDistrictMap[strvtown] = append(e.NewStreetDistrictMap[strvtown], d)
 			}
 			//村、居委会
-			// jwhs := jwhs_maps[jc_province][qc_city][qc_district][strvtown]
-			// for _, vjwh := range jwhs {
-			// 	strvillage := qu.ObjToString(vjwh["village"])
-			// 	e.Trie_Full_Community.AddWords(strvillage) //加入居委会、村全称Trie
-			// 	cttmp := e.CommunityDistrictMap[strvillage]
-			// 	if len(cttmp) == 0 {
-			// 		tmpdarr := []*District{d}
-			// 		e.CommunityDistrictMap[strvillage] = tmpdarr
-			// 	} else {
-			// 		e.CommunityDistrictMap[strvillage] = append(e.CommunityDistrictMap[strvillage], d)
-			// 	}
-			// }
+			//jwhs := jwhs_maps[jc_province][qc_city][qc_district][strvtown]
+			//for _, vjwh := range jwhs {
+			//	strvillage := qu.ObjToString(vjwh["village"])
+			//	e.Trie_Full_Community.AddWords(strvillage) //加入居委会、村全称Trie
+			//	cttmp := e.CommunityDistrictMap[strvillage]
+			//	if len(cttmp) == 0 {
+			//		tmpdarr := []*District{d}
+			//		e.CommunityDistrictMap[strvillage] = tmpdarr
+			//	} else {
+			//		e.CommunityDistrictMap[strvillage] = append(e.CommunityDistrictMap[strvillage], d)
+			//	}
+			//}
 		}
 
 	}

+ 2 - 2
src/jy/pretreated/analytable.go

@@ -185,7 +185,7 @@ func CommonDataAnaly(k, tabletag, tabledesc string, v interface{}, isSite bool,
 		}
 		res[0].IsInvalid = true
 		//k1 = res[0].Value
-	} else {
+	} /*else {
 		kvTags[k] = append(kvTags[k], &u.Tag{Key: k, Value: v1, IsInvalid: true})
 		//没有取到标准化key时,对中标金额和中标单位的逻辑处理
 		if filter_zbje_k.MatchString(k) && !filter_zbje_kn.MatchString(k) && filter_zbje_v.MatchString(v1) && utf8.RuneCountInString(v1) < 20 {
@@ -213,7 +213,7 @@ func CommonDataAnaly(k, tabletag, tabledesc string, v interface{}, isSite bool,
 				}
 			}
 		}
-	}
+	}*/
 	return
 }
 

+ 14 - 14
standardata/src/config.json

@@ -1,23 +1,23 @@
 {
   "mgofrom": "192.168.3.207:27092",
   "mgofromsize":5,
-  "mgofromdb":"mxs",
-  "mgoto": "192.168.3.207:27092",
+  "mgofromdb":"qfw",
+  "mgoto": "192.168.3.205:27082,192.168.3.205:27083",
   "mgotosize":5,
-  "mgotodb":"mxs",
-  "mgoent": "192.168.3.207:27092",
+  "mgotodb":"mixdata",
+  "mgoent": "192.168.3.205:27082,192.168.3.205:27083",
   "mgoentsize":5,
-  "mgoentdb":"mxs",
-  "extractcoll":"extract",
+  "mgoentdb":"mixdata",
+  "extractcoll":"bidding",
   "extractdb": "extract_v3xs",
   "versioncoll": "tagdetailinfo",
   "extractversion": "V3.1.2",
   "standardata":{
-	"winner":{
-		"standarent":"winner_enterprisenew",
-		"standarerr":"winner_errnew",
-		"redisdb":1
-	},
+  "winner":{
+    "standarent":"winner_enterprise",
+    "standarerr":"winner_err",
+    "redisdb":1
+  },
     "buyer":{
       "standarent":"buyer_enterprise",
       "standarerr":"buyer_err",
@@ -29,7 +29,7 @@
       "redisdb":3
     }
   },
-  "redis": "winner=172.17.148.44:2679,buyer=172.17.148.44:2679,agency=172.17.148.44:2679",
-  "username": "",
-  "password": ""
+  "redis": "winner=127.0.0.1:6379,buyer=127.0.0.1:6379,agency=127.0.0.1:6379",
+  "username": "dataAnyWrite",
+  "password": "data@dataAnyWrite"
 }

+ 3 - 3
udpfilterdup/src/config.json

@@ -5,8 +5,8 @@
         "addr": "192.168.3.207:27092",
         "pool": 10,
         "db": "zhengkun",
-        "extract": "all_01_02_fusiontest",
-        "extract_back": "all_01_02_fusiontest",
+        "extract": "repeat_test",
+        "extract_back": "repeat_test",
         "site": {
             "dbname": "zhengkun",
             "coll": "site"
@@ -15,7 +15,7 @@
     "task_mongodb": {
         "task_addrName": "192.168.3.207:27092",
         "task_dbName": "zhengkun",
-        "task_collName": "zk_data",
+        "task_collName": "repeat_test",
         "pool": 10
     },
     "jkmail": {

+ 5 - 5
udpfilterdup/src/main.go

@@ -59,7 +59,6 @@ var (
 
 
 func init() {
-	
 	flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
 	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")	//历史
 	flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")//全量区间pt
@@ -139,7 +138,10 @@ func init() {
 }
 
 
-func main() {
+func mainT() {
+	//exportFenLeiData()
+	//return
+
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
@@ -173,9 +175,7 @@ func main() {
 }
 
 //测试组人员使用
-func mainT() {
-
-
+func main() {
 	if TimingTask {
 		go historyTaskDay()
 		time.Sleep(99999 * time.Hour)

+ 2 - 2
udps/main.go

@@ -22,8 +22,8 @@ func main() {
 	flag.IntVar(&p, "p", 1484, "端口")
 	flag.IntVar(&tmptime, "tmptime", 0, "时间查询")
 	flag.StringVar(&tmpkey, "tmpkey", "", "时间字段")
-	flag.StringVar(&id1, "gtid", "", "gtid")
-	flag.StringVar(&id2, "lteid", "", "lteid")
+	flag.StringVar(&id1, "gtid", "6075ea3162ad7d3e568c7590", "gtid")
+	flag.StringVar(&id2, "lteid", "6076546d27cdc4cf2bc60fde", "lteid")
 	flag.StringVar(&ids, "ids", "", "id1,id2")
 	flag.StringVar(&stype, "stype", "biddingall", "stype,传递类型")
 	flag.StringVar(&bkey, "bkey", "", "bkey,加上此参数表示不生关键词和摘要")