Procházet zdrojové kódy

Merge branch 'dev3.4.1' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4.1

fengweiqiang před 4 roky
rodič
revize
f4dbbbc7ae
38 změnil soubory, kde provedl 2810 přidání a 276 odebrání
  1. 20 0
      data_monitoring/listen_lan/src/config.json
  2. 43 0
      data_monitoring/listen_lan/src/errmail.go
  3. 145 0
      data_monitoring/listen_lan/src/main.go
  4. 329 0
      data_monitoring/listen_lan/src/mgo.go
  5. 48 0
      data_monitoring/listen_lan/src/sitelisten.go
  6. 28 0
      data_monitoring/listen_online/src/config.json
  7. 28 0
      data_monitoring/listen_online/src/errmail.go
  8. 116 0
      data_monitoring/listen_online/src/main.go
  9. 329 0
      data_monitoring/listen_online/src/mgo.go
  10. 58 0
      data_monitoring/listen_online/src/priselisten.go
  11. 166 0
      data_monitoring/listen_online/src/sitelisten.go
  12. 37 11
      data_monitoring/listen_task/src/config.json
  13. 24 34
      data_monitoring/listen_task/src/dataTaskJP.go
  14. 32 32
      data_monitoring/listen_task/src/dataTaskQY.go
  15. 166 0
      data_monitoring/listen_task/src/dataTaskST.go
  16. 173 20
      data_monitoring/listen_task/src/main.go
  17. 40 0
      data_monitoring/listen_task/src/mark
  18. 31 18
      data_monitoring/listen_task/src/sendmail.go
  19. 5 3
      data_monitoring/vps_client/src/config.json
  20. 42 55
      data_monitoring/vps_client/src/main.go
  21. 9 3
      data_monitoring/vps_server/src/config.json
  22. 61 13
      data_monitoring/vps_server/src/main.go
  23. 49 0
      data_monitoring/vps_server/src/mark
  24. 81 0
      data_monitoring/words_vaild/src/main.go
  25. 304 0
      data_monitoring/words_vaild/src1/main.go
  26. 328 0
      data_monitoring/words_vaild/src1/mgo.go
  27. 1 2
      fullproject/src_v1/load_data.go
  28. 2 2
      fullproject/src_v1/main.go
  29. 19 10
      fullproject/src_v1/project.go
  30. 26 22
      fullproject/src_v1/task.go
  31. 9 9
      monitor/task.go
  32. 2 2
      qyxy/src/config.json
  33. 4 4
      qyxy/src/main.go
  34. 33 29
      qyxy/src/task.go
  35. 7 0
      udpcreateindex/src/biddingall.go
  36. 8 0
      udpcreateindex/src/biddingindex.go
  37. 4 4
      udpcreateindex/src/config.json
  38. 3 3
      udpcreateindex/src/main.go

+ 20 - 0
data_monitoring/listen_lan/src/config.json

@@ -0,0 +1,20 @@
+{
+  "data_mgodb": {
+    "addr": "baibai.ink:28082",
+    "db": "spider_data",
+    "coll": "data_bak",
+    "pool": 5
+  },
+  "save_other_name": "monitor_other",
+  "dynamic_coll_name": "z_site_spider",
+  "smtpMail": {
+    "from": "zhengkun@topnet.net.cn",
+    "to": "zhengkun@topnet.net.cn",
+    "cc": "zhengkun@topnet.net.cn",
+    "smtpHost": "smtp.qq.com",
+    "smtpPort": "465",
+    "user":     "920032221@qq.com",
+    "pwd":      "xomkphsjsamybdbj"
+  },
+  "xlsx_name" : "统计.xlsx"
+}

+ 43 - 0
data_monitoring/listen_lan/src/errmail.go

@@ -0,0 +1,43 @@
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	qu "qfw/util"
+	"qfw/util/mail"
+)
+
+var from,to,cc, smtpHost,user,pwd string
+var smtpPort int
+
+
+func sendErrMailSmtp(title,body string) {
+
+
+	smtpMail, _ := sysconfig["smtpMail"].(map[string]interface{})
+
+	if smtpMail != nil {
+		from, _ = smtpMail["from"].(string)
+		to, _ = smtpMail["to"].(string)
+		cc, _ = smtpMail["cc"].(string)
+		smtpHost, _ = smtpMail["smtpHost"].(string)
+		smtpPort= qu.IntAll(smtpMail["smtpPort"])
+		user, _ = smtpMail["user"].(string)
+		pwd, _ = smtpMail["pwd"].(string)
+	}
+	f, _ := os.Open(xlsx_name)
+	b, err := ioutil.ReadAll(f)
+	if err != nil {
+		fmt.Println("err:",err)
+		return
+	}
+
+	ok := mail.GSendMail_B(user, from, cc, from, title, body, f.Name(), b, &mail.GmailAuth{
+		SmtpHost: smtpHost,
+		SmtpPort: smtpPort,
+		User:     user,
+		Pwd:      pwd,
+	})
+	fmt.Println(ok)
+}

+ 145 - 0
data_monitoring/listen_lan/src/main.go

@@ -0,0 +1,145 @@
+package main
+
+import (
+	"fmt"
+	"github.com/cron"
+	"github.com/tealeg/xlsx"
+	"log"
+	"os"
+	qu "qfw/util"
+	"time"
+)
+
+
+var (
+	sysconfig    							map[string]interface{} //配置文件
+	data_mgo						      	*MongodbSim     //mongodb操作对象
+	save_other_name,data_coll_name			string
+	xlsx_name,dynamic_coll_name				string
+	infoSiteArr								[]string
+)
+
+func initMgo()  {
+
+	dconf := sysconfig["data_mgodb"].(map[string]interface{})
+	data_coll_name = qu.ObjToString(dconf["coll"])
+	data_mgo = &MongodbSim{
+		MongodbAddr: dconf["addr"].(string),
+		DbName:      dconf["db"].(string),
+		Size:        qu.IntAllDef(dconf["pool"], 5),
+	}
+	data_mgo.InitPool()
+
+
+
+	//属性赋值
+	save_other_name = qu.ObjToString(sysconfig["save_other_name"])
+	dynamic_coll_name = qu.ObjToString(sysconfig["dynamic_coll_name"])
+	xlsx_name = qu.ObjToString(sysconfig["xlsx_name"])
+}
+
+func init() {
+	//加载配置文件
+	qu.ReadConfig(&sysconfig)
+	initMgo()
+}
+
+func main() {
+	go taskTime()
+
+	time.Sleep(99999 * time.Hour)
+
+}
+
+func taskTime()  {
+
+	log.Println("部署定时任务")
+	c := cron.New()
+	//企业变更-站点-mongo
+	c.AddFunc("0 0 9 ? * *", func() { dealWithOtherData() })
+
+	c.Start()
+
+}
+
+func dealWithOtherData()  {
+
+	log.Println("开始统计相关数据...")
+
+	//先获取-待统计-动态站点
+	tmp ,tmp_err := data_mgo.Find(dynamic_coll_name, map[string]interface{}{},nil,nil)
+	if tmp_err==nil && tmp!=nil && len(tmp)>0 {
+		infoSiteArr = make([]string,0)
+		for _,v :=range tmp {
+			infoSiteArr = append(infoSiteArr,qu.ObjToString(v["site"]))
+		}
+		log.Println("目标站点:",len(infoSiteArr))
+	}
+	start := int(time.Now().Unix())
+	spider_data :=dealWithSiteSpiderData()//站点数据
+	comeintime :=qu.Int64All(time.Now().Unix())
+
+	data_mgo.Save(save_other_name, map[string]interface{}{
+		"name":"站点",
+		"num":qu.IntAll(spider_data["num"]),
+		"comeintime":comeintime,
+		"date":qu.ObjToString(spider_data["date"]),
+		"data":spider_data["data"],
+	})
+
+	//获取前两天数据
+	now:=time.Now() //当前天
+	cur_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	before_time := cur_time-86400*2
+	before_date :=qu.FormatDateByInt64(&before_time, qu.DATEFORMAT)
+
+
+	//站点数据
+	before_st_data :=data_mgo.FindOne(save_other_name, map[string]interface{}{
+		"name":"站点",
+		"date":before_date,
+	})
+
+
+	os.Remove("统计.xlsx")	//写excle
+	f :=xlsx.NewFile()
+	sheet, _ := f.AddSheet("统计")
+
+	//企业数据
+	row := sheet.AddRow()
+	row.AddCell().Value = "站点分类/日期"
+	row.AddCell().SetString(qu.ObjToString(spider_data["date"]))
+	if len(before_st_data)>0 {
+		row.AddCell().SetString(qu.ObjToString(before_st_data["date"]))
+	}
+
+
+
+	data_1 := *qu.ObjToMap(spider_data["data"])
+	data_2 := *qu.ObjToMap(before_st_data["data"])
+
+	for _,key := range infoSiteArr {
+		row = sheet.AddRow()
+		row.AddCell().SetString(key)
+		row.AddCell().SetString(fmt.Sprintf("%d",qu.Int64All(data_1[key])))
+		if len(before_st_data)>0 {
+			row.AddCell().SetString(fmt.Sprintf("%d",qu.Int64All(data_2[key])))
+		}
+	}
+	row = sheet.AddRow()
+	row.AddCell().Value = "总计"
+	row.AddCell().SetString(fmt.Sprintf("%d",qu.Int64All(spider_data["num"])))
+	if len(before_st_data)>0 {
+		row.AddCell().SetString(fmt.Sprintf("%d", qu.Int64All(before_st_data["num"])))
+	}
+	err := f.Save(xlsx_name)
+	if err != nil {
+		log.Println("保存xlsx失败:", err)
+	}else {
+		log.Println("保存xlsx成功:", err)
+	}
+
+	sendErrMailSmtp("统计-站点","附件")
+
+	log.Println("定时处理完毕...",int(time.Now().Unix())-start,"秒")
+}

+ 329 - 0
data_monitoring/listen_lan/src/mgo.go

@@ -0,0 +1,329 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type MgoSess struct {
+	Db     string
+	Coll   string
+	Query  interface{}
+	Sorts  []string
+	fields interface{}
+	limit  int64
+	skip   int64
+	M      *MongodbSim
+}
+
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor != nil {
+		if mt.Cursor.Next(nil) {
+			err := mt.Cursor.Decode(result)
+			if err != nil {
+				log.Println("mgo cur err", err.Error())
+				mt.Cursor.Close(nil)
+				return false
+			}
+			return true
+		} else {
+			mt.Cursor.Close(nil)
+			return false
+		}
+	} else {
+		return false
+	}
+
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+type MongodbSim struct {
+	MongodbAddr string
+	Size        int
+	//	MinSize     int
+	DbName   string
+	C        *mongo.Client
+	Ctx      context.Context
+	ShortCtx context.Context
+	pool     chan bool
+	UserName string
+	Password string
+}
+
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	//m.Open()
+	ms := &MgoSess{}
+	ms.M = m
+	return ms
+}
+
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	//m.Close()
+	ms.M = nil
+	ms = nil
+}
+
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint64(m.Size))
+	m.pool = make(chan bool, m.Size)
+
+	if m.UserName !="" && m.Password !="" {
+		cre := options.Credential{
+			Username:m.UserName,
+			Password:m.Password,
+		}
+		opts.SetAuth(cre)
+	}
+
+
+
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+//批量插入
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewInsertOneModel()
+		write.SetDocument(d)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo savebulk error:", e.Error())
+		return false
+	}
+	return true
+}
+
+//保存
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		return nil
+	}
+	return r.InsertedID
+}
+
+//更新by Id
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+//删除by id
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//通过条件删除
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//findbyid
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//findone
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//find
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find()
+	r, err := coll.Find(m.Ctx, query, op.SetSort(sort), op.SetProjection(fields))
+	if err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	return results, nil
+}
+
+//创建_id
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, _ := primitive.ObjectIDFromHex(id)
+	return objectId
+}
+
+func BsonTOStringId(id interface{}) string {
+	return id.(primitive.ObjectID).Hex()
+}

+ 48 - 0
data_monitoring/listen_lan/src/sitelisten.go

@@ -0,0 +1,48 @@
+package main
+
+import (
+	qu "qfw/util"
+	"time"
+)
+
+
+func dealWithSiteSpiderData()(map[string]interface{}) {
+
+	defer qu.Catch()
+	now:=time.Now() //当前天
+	lt_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	gte_time := lt_time-86400
+
+	q := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte":  gte_time,
+			"$lt": lt_time,
+		},
+	}
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
+	//细节才需要遍历
+	it := sess.DB(data_mgo.DbName).C(data_coll_name).Find(&q).Select(map[string]interface{}{
+		"site":1,
+	}).Iter()
+	total,dict:=0,make(map[string]interface{},0)
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		site := qu.ObjToString(tmp["site"])
+		if dict[site] == nil {
+			dict[site] = 1
+		}else {
+			num := qu.IntAll(dict[site])+1
+			dict[site] = num
+		}
+		tmp = make(map[string]interface{})
+	}
+
+
+
+	return map[string]interface{}{
+		"num":total,
+		"data":dict,
+		"date":qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT),
+	}
+}
+

+ 28 - 0
data_monitoring/listen_online/src/config.json

@@ -0,0 +1,28 @@
+{
+  "save_mgodb": {
+    "addr": "172.17.4.87:27080",
+    "db": "editor",
+    "coll": "monitor_other",
+    "pool": 5
+  },
+  "qy_mgodb": {
+    "addr": "172.17.4.87:27080",
+    "db": "spider",
+    "coll": "baidu_enterprise",
+    "pool": 5
+  },
+  "st_mgodb": {
+    "addr": "172.17.4.85:27080",
+    "db": "qfw",
+    "coll": "result_20210109",
+    "pool": 5
+  },
+  "es_index_st": "bidding",
+  "es_type_st": "bidding",
+  "save_other_name": "monitor_other",
+  "save_site_name" : "monitor_site",
+  "jkmail": {
+    "to": "zhengkun@topnet.net.cn",
+    "api": "http://172.17.145.179:19281/_send/_mail"
+  }
+}

+ 28 - 0
data_monitoring/listen_online/src/errmail.go

@@ -0,0 +1,28 @@
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+)
+var tomail string
+var api string
+
+//api模式 二选一皆可
+func sendErrMailApi(title,body string)  {
+	jkmail, _ := sysconfig["jkmail"].(map[string]interface{})
+	if jkmail != nil {
+		tomail, _ = jkmail["to"].(string)
+		api, _ = jkmail["api"].(string)
+	}
+	log.Println(tomail,api)
+	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, title, body))
+	if err == nil {
+		defer res.Body.Close()
+		read, err := ioutil.ReadAll(res.Body)
+		log.Println("邮件发送成功:", string(read), err)
+	}else {
+		log.Println("邮件发送失败:", err)
+	}
+}

+ 116 - 0
data_monitoring/listen_online/src/main.go

@@ -0,0 +1,116 @@
+package main
+
+import (
+	"fmt"
+	"github.com/cron"
+	"log"
+	qu "qfw/util"
+	"time"
+)
+
+
+var (
+	sysconfig    							map[string]interface{} //配置文件
+	save_mgo,qy_mgo,st_mgo        			*MongodbSim            //mongodb操作对象
+	qy_c_name,st_c_name						string
+	es_index_st,es_type_st					string
+	save_other_name,save_site_name			string
+	xlsx_name								string
+)
+
+func initMgo()  {
+
+	saveconf := sysconfig["save_mgodb"].(map[string]interface{})
+	qy_c_name = qu.ObjToString(saveconf["coll"])
+	save_mgo = &MongodbSim{
+		MongodbAddr: saveconf["addr"].(string),
+		DbName:      saveconf["db"].(string),
+		Size:        qu.IntAllDef(saveconf["pool"], 5),
+	}
+	save_mgo.InitPool()
+
+	qconf := sysconfig["qy_mgodb"].(map[string]interface{})
+	qy_c_name = qu.ObjToString(qconf["coll"])
+	qy_mgo = &MongodbSim{
+		MongodbAddr: qconf["addr"].(string),
+		DbName:      qconf["db"].(string),
+		Size:        qu.IntAllDef(qconf["pool"], 5),
+	}
+	qy_mgo.InitPool()
+
+	sconf := sysconfig["st_mgodb"].(map[string]interface{})
+	st_c_name = qu.ObjToString(sconf["coll"])
+	st_mgo = &MongodbSim{
+		MongodbAddr: sconf["addr"].(string),
+		DbName:      sconf["db"].(string),
+		Size:        qu.IntAllDef(sconf["pool"], 5),
+	}
+	st_mgo.InitPool()
+
+
+	//属性赋值
+	es_index_st = qu.ObjToString(sysconfig["es_index_st"])
+	es_type_st = qu.ObjToString(sysconfig["es_type_st"])
+	save_other_name = qu.ObjToString(sysconfig["save_other_name"])
+	save_site_name = qu.ObjToString(sysconfig["save_site_name"])
+}
+
+func init() {
+	//加载配置文件
+	qu.ReadConfig(&sysconfig)
+	initMgo()
+}
+
+func main() {
+	go taskTime()
+
+	time.Sleep(99999 * time.Hour)
+
+}
+
+func taskTime()  {
+
+	log.Println("部署定时任务")
+	c := cron.New()
+	//站点相关全量-es-mongo
+	c.AddFunc("0 30 8 ? * *", func() { dealWithSTData() })
+	//企业变更-mongo
+	c.AddFunc("0 50 8 ? * *", func() { dealWithOtherData() })
+	c.Start()
+}
+
+func dealWithOtherData()  {
+
+	log.Println("开始统计相关数据...")
+
+	start := int(time.Now().Unix())
+	qy_date,qy_num := dealWithQYData()
+	log.Println(qy_date,qy_num)
+
+	comeintime :=qu.Int64All(time.Now().Unix())
+	log.Println(comeintime)
+	save_mgo.Save(save_other_name, map[string]interface{}{
+		"name":"企业",
+		"num":qu.IntAll(qy_num),
+		"comeintime":comeintime,
+		"date":qy_date,
+	})
+	//获取前两天数据
+	now:=time.Now() //当前天
+	cur_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	before_time := cur_time-86400*2
+	before_date :=qu.FormatDateByInt64(&before_time, qu.DATEFORMAT)
+	//企业数据
+	before_qy_data :=save_mgo.FindOne(save_other_name, map[string]interface{}{
+		"name":"企业",
+		"date":before_date,
+	})
+
+	body :=fmt.Sprintf("日期:%s-数量:%s",qy_date,qy_num)
+	if len(before_qy_data)>0 {
+		body =body+fmt.Sprintf("~~日期:%s-数量:%d",before_date,before_qy_data["num"])
+	}
+
+	sendErrMailApi("企业-变更",body)
+	log.Println("定时处理完毕...",int(time.Now().Unix())-start,"秒")
+}

+ 329 - 0
data_monitoring/listen_online/src/mgo.go

@@ -0,0 +1,329 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type MgoSess struct {
+	Db     string
+	Coll   string
+	Query  interface{}
+	Sorts  []string
+	fields interface{}
+	limit  int64
+	skip   int64
+	M      *MongodbSim
+}
+
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor != nil {
+		if mt.Cursor.Next(nil) {
+			err := mt.Cursor.Decode(result)
+			if err != nil {
+				log.Println("mgo cur err", err.Error())
+				mt.Cursor.Close(nil)
+				return false
+			}
+			return true
+		} else {
+			mt.Cursor.Close(nil)
+			return false
+		}
+	} else {
+		return false
+	}
+
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+type MongodbSim struct {
+	MongodbAddr string
+	Size        int
+	//	MinSize     int
+	DbName   string
+	C        *mongo.Client
+	Ctx      context.Context
+	ShortCtx context.Context
+	pool     chan bool
+	UserName string
+	Password string
+}
+
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	//m.Open()
+	ms := &MgoSess{}
+	ms.M = m
+	return ms
+}
+
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	//m.Close()
+	ms.M = nil
+	ms = nil
+}
+
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint64(m.Size))
+	m.pool = make(chan bool, m.Size)
+
+	if m.UserName !="" && m.Password !="" {
+		cre := options.Credential{
+			Username:m.UserName,
+			Password:m.Password,
+		}
+		opts.SetAuth(cre)
+	}
+
+
+
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+//批量插入
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewInsertOneModel()
+		write.SetDocument(d)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo savebulk error:", e.Error())
+		return false
+	}
+	return true
+}
+
+//保存
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		return nil
+	}
+	return r.InsertedID
+}
+
+//更新by Id
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+//删除by id
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//通过条件删除
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//findbyid
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//findone
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//find
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find()
+	r, err := coll.Find(m.Ctx, query, op.SetSort(sort), op.SetProjection(fields))
+	if err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	return results, nil
+}
+
+//创建_id
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, _ := primitive.ObjectIDFromHex(id)
+	return objectId
+}
+
+func BsonTOStringId(id interface{}) string {
+	return id.(primitive.ObjectID).Hex()
+}

+ 58 - 0
data_monitoring/listen_online/src/priselisten.go

@@ -0,0 +1,58 @@
+package main
+
+import (
+	"fmt"
+	qu "qfw/util"
+	"time"
+)
+
+
+func dealWithQYData()(string,string)  {
+
+
+	defer qu.Catch()
+
+	now:=time.Now() //当前天
+	lt_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	gte_time := lt_time-86400
+
+	q := map[string]interface{}{
+		"down_time": map[string]interface{}{
+			"$gte":  gte_time,
+			"$lt": lt_time,
+		},
+	}
+
+	sess := qy_mgo.GetMgoConn()
+	defer qy_mgo.DestoryMongoConn(sess)
+
+	//细节才需要遍历
+	it := sess.DB(qy_mgo.DbName).C(qy_c_name).Find(&q).Select(map[string]interface{}{
+		"site":1,
+	}).Iter()
+	total:=0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		tmp = make(map[string]interface{})
+	}
+	return qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT),fmt.Sprintf("%d",total)
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

+ 166 - 0
data_monitoring/listen_online/src/sitelisten.go

@@ -0,0 +1,166 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	qu "qfw/util"
+	"qfw/util/elastic"
+	"time"
+	es_elastic "qfw/common/src/gopkg.in/olivere/elastic.v1"
+)
+
+
+func dealWithSTData()  {
+
+	log.Println("开始统计全量站点数据...")
+	defer qu.Catch()
+
+	start := int(time.Now().Unix())
+	now:=time.Now() //当前天
+	lt_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	gte_time := lt_time-86400
+
+	mgodata := toCalculateMgoData(gte_time,lt_time)
+	esdata := toCalculateEsData(fmt.Sprintf("%d",gte_time),fmt.Sprintf("%d",lt_time))
+	mgonum,esnum:= qu.IntAll(mgodata["totalnum"]),qu.IntAll(esdata["totalnum"])
+
+	comeintime:=qu.Int64All(time.Now().Unix())
+	date := qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT)
+	save_mgo.Save(save_site_name, map[string]interface{}{
+		"comeintime":comeintime,
+		"date":date,
+		"name":"全量站点",
+		"mgonum":mgonum,
+		"esnum":esnum,
+		"mgodata":mgodata["detail"],
+		"esdata":esdata["detail"],
+	})
+
+	log.Println("站点-定时处理完毕...","用时:",int(time.Now().Unix())-start,"秒")
+}
+
+
+
+
+
+
+
+
+
+
+
+
+//es数据统计
+func toCalculateEsData(gte_time string,lt_time string) map[string]interface{} {
+
+	if gte_time == "" || lt_time == "" {
+		return map[string]interface{}{
+			"totalnum" : 0,
+			"detail": map[string]interface{}{},
+		}
+	}
+
+	dict:= make(map[string]interface{},0)
+	total :=0
+
+	elastic.InitElasticSize("http://172.17.145.170:9800", 10,)
+	//elastic.InitElasticSize("http://127.0.0.1:12003", 10,)
+	esclient := elastic.GetEsConn()
+	defer elastic.DestoryEsConn(esclient)
+	if esclient == nil {
+		log.Println("连接池异常")
+	}
+	query :=es_elastic.NewRangeQuery("comeintime").Gte(gte_time).Lt(lt_time)
+	cursor, err := esclient.Scan(es_index_st).Query(es_elastic.NewBoolQuery().Must(query)).
+		Size(200).Do()
+	if err != nil {
+		log.Println("cursor",err)
+	}
+	if cursor.Results == nil {
+		log.Println("results != nil; got nil")
+	}
+	if cursor.Results.Hits == nil {
+		log.Println("expected results.Hits != nil; got nil")
+	}
+	//log.Println("es total :",cursor.TotalHits(),"处理分析中......")
+	for {
+		searchResult, err := cursor.Next()
+		if err != nil {
+			if err.Error() == "EOS" {
+				break
+			}else {
+				log.Println("cursor searchResult",err)
+			}
+		}
+		for _, hit := range searchResult.Hits.Hits {
+			tmp := make(map[string]interface{})
+			err := json.Unmarshal(*hit.Source, &tmp)
+			if err != nil {
+				log.Println("json Unmarshal error")
+				continue
+			}
+			total++
+			site := qu.ObjToString(tmp["site"])	//统计站点
+			if dict[site] == nil {
+				dict[site] = 1
+			}else {
+				num := qu.IntAll(dict[site])+1
+				dict[site] = num
+			}
+		}
+	}
+
+
+	return map[string]interface{}{
+		"totalnum" : total,
+		"detail": dict,
+	}
+}
+
+
+
+
+//mongo数据统计
+func toCalculateMgoData(gte_time int64,lt_time int64) map[string]interface{} {
+
+	if gte_time == 0 || lt_time == 0 {
+		return map[string]interface{}{
+			"totalnum" : 0,
+			"detail": map[string]interface{}{},
+		}
+	}
+
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte":  gte_time,
+			"$lt": lt_time,
+		},
+	}
+
+	sess := st_mgo.GetMgoConn()
+	defer st_mgo.DestoryMongoConn(sess)
+
+	dict:= make(map[string]interface{},0)
+	//细节才需要遍历
+	it := sess.DB(st_mgo.DbName).C(st_c_name).Find(&query).Select(map[string]interface{}{
+		"site":1,
+	}).Iter()
+	total:=0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		//统计站点
+		site := qu.ObjToString(tmp["site"])
+		if dict[site] == nil {
+			dict[site] = 1
+		}else {
+			num := qu.IntAll(dict[site])+1
+			dict[site] = num
+		}
+
+		tmp = make(map[string]interface{})
+	}
+	return map[string]interface{}{
+		"totalnum" : total,
+		"detail": dict,
+	}
+}

+ 37 - 11
data_monitoring/listen_task/src/config.json

@@ -1,14 +1,40 @@
 {
-  "mongodb": {
-    "addrName": "192.168.3.207:27092",
-    "dbName": "zhengkun",
-    "collName": "baidu_enterprise",
-    "pool": 10
-  },
-  "jp_collname": "baidu_enterprise",
-  "qy_collname": "baidu_enterprise",
-  "jkmail": {
+  "save_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "monitor_other",
+    "pool": 5
+  },
+  "qy_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "baidu_enterprise_test",
+    "pool": 5
+  },
+  "jp_mgodb": {
+    "addr": "baibai.ink:28082",
+    "db": "spider_data",
+    "coll": "data_bak",
+    "pool": 5
+  },
+  "st_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "result_20210109",
+    "pool": 5
+  },
+  "es_index_st": "bidding",
+  "es_type_st": "bidding",
+  "save_other_name": "monitor_other",
+  "save_site_name" : "monitor_site",
+  "smtpMail": {
+    "from": "zhengkun@topnet.net.cn",
     "to": "zhengkun@topnet.net.cn",
-    "api": "http://172.17.145.179:19281/_send/_mail"
-  }
+    "cc": "zhengkun@topnet.net.cn",
+    "smtpHost": "smtp.qq.com",
+    "smtpPort": "465",
+    "user":     "920032221@qq.com",
+    "pwd":      "xomkphsjsamybdbj"
+  },
+  "xlsx_name" : "统计.xlsx"
 }

+ 24 - 34
data_monitoring/listen_task/src/dataTaskJP.go

@@ -1,58 +1,48 @@
 package main
 
 import (
-	"log"
 	qu "qfw/util"
 	"time"
 )
 
 
-func dealWithJPData()  {
+func dealWithJPData()(map[string]interface{}) {
 
-	log.Println("开始竞品数据统计...")
 	defer qu.Catch()
-
 	now:=time.Now() //当前天
-	gte_time:= time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local).Unix()
-	lt_time := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	lt_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	gte_time := lt_time-86400
+
 	q := map[string]interface{}{
-		"down_time": map[string]interface{}{
+		"comeintime": map[string]interface{}{
 			"$gte":  gte_time,
 			"$lt": lt_time,
 		},
 	}
-	log.Println("查询条件",q)
-
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-
+	sess := jp_mgo.GetMgoConn()
+	defer jp_mgo.DestoryMongoConn(sess)
 	//细节才需要遍历
-	it := sess.DB(mgo.DbName).C(jp_collname).Find(&q).Iter()
-	total,start :=0, int(time.Now().Unix())
+	it := sess.DB(jp_mgo.DbName).C(jp_c_name).Find(&q).Select(map[string]interface{}{
+		"site":1,
+	}).Iter()
+	total,dict:=0,make(map[string]interface{},0)
 	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
-		if total%10000==0 {
-			log.Println("current index",total)
+		site := qu.ObjToString(tmp["site"])
+		if dict[site] == nil {
+			dict[site] = 1
+		}else {
+			num := qu.IntAll(dict[site])+1
+			dict[site] = num
 		}
-		//删选条件
-
 		tmp = make(map[string]interface{})
 	}
-	log.Println("jp is over:",total,"总用时:",int(time.Now().Unix())-start,"秒")
 
 
-	//是否告警条件
-	if total<1 {
-		sendErrMailApi("竞品数据异常","数量无")
+
+	return map[string]interface{}{
+		"num":total,
+		"data":dict,
+		"date":qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT),
 	}
-	comeintime:=qu.Int64All(time.Now().Unix())
-	date := qu.FormatDateByInt64(&comeintime, qu.Date_yyyyMMdd)
-	mgo.Save("monitor_site", map[string]interface{}{
-		"name":"竞品",
-		"type":"竞品",
-		"num":qu.IntAll(total),
-		"comeintime":comeintime,
-		"updatedate":date,
-		"data":"",
-	})
-
-}
+}
+

+ 32 - 32
data_monitoring/listen_task/src/dataTaskQY.go

@@ -1,58 +1,58 @@
 package main
 
 import (
-	"log"
+	"fmt"
 	qu "qfw/util"
 	"time"
 )
 
 
-func dealWithQYData()  {
+func dealWithQYData()(string,string)  {
+
 
-	log.Println("开始企业数据统计...")
 	defer qu.Catch()
 
 	now:=time.Now() //当前天
-	gte_time:= time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local).Unix()
-	lt_time := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	lt_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	gte_time := lt_time-86400
+
 	q := map[string]interface{}{
 		"down_time": map[string]interface{}{
 			"$gte":  gte_time,
 			"$lt": lt_time,
 		},
 	}
-	log.Println("查询条件",q)
 
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
+	sess := qy_mgo.GetMgoConn()
+	defer qy_mgo.DestoryMongoConn(sess)
 
 	//细节才需要遍历
-	it := sess.DB(mgo.DbName).C(qy_collname).Find(&q).Iter()
-	total,start :=0, int(time.Now().Unix())
+	it := sess.DB(qy_mgo.DbName).C(qy_c_name).Find(&q).Select(map[string]interface{}{
+		"site":1,
+	}).Iter()
+	total:=0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
-		if total%10000==0 {
-			log.Println("current index",total)
-		}
-		//删选条件
-
 		tmp = make(map[string]interface{})
 	}
-	log.Println("qy is over:",total,"总用时:",int(time.Now().Unix())-start,"秒")
+	return qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT),fmt.Sprintf("%d",total)
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
 
 
-	//是否告警条件
-	if total<1 {
-		sendErrMailApi("企业数据异常","数量无")
-	}
-	comeintime:=qu.Int64All(time.Now().Unix())
-	date := qu.FormatDateByInt64(&comeintime, qu.DATEFORMAT)
-	mgo.Save("monitor_site", map[string]interface{}{
-		"name":"企业变更",
-		"type":"企业变更",
-		"num":qu.IntAll(total),
-		"comeintime":comeintime,
-		"updatedate":date,
-		"data":"",
-	})
-
-}

+ 166 - 0
data_monitoring/listen_task/src/dataTaskST.go

@@ -0,0 +1,166 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	qu "qfw/util"
+	"qfw/util/elastic"
+	"time"
+	es_elastic "qfw/common/src/gopkg.in/olivere/elastic.v1"
+)
+
+
+func dealWithSTData()  {
+
+	log.Println("开始统计全量站点数据...")
+	defer qu.Catch()
+
+	start := int(time.Now().Unix())
+	now:=time.Now() //当前天
+	lt_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	gte_time := lt_time-86400
+
+	mgodata := toCalculateMgoData(gte_time,lt_time)
+	esdata := toCalculateEsData(fmt.Sprintf("%d",gte_time),fmt.Sprintf("%d",lt_time))
+	mgonum,esnum:= qu.IntAll(mgodata["totalnum"]),qu.IntAll(esdata["totalnum"])
+
+	comeintime:=qu.Int64All(time.Now().Unix())
+	date := qu.FormatDateByInt64(&gte_time, qu.DATEFORMAT)
+	save_mgo.Save("monitor_site", map[string]interface{}{
+		"comeintime":comeintime,
+		"date":date,
+		"name":"全量站点",
+		"mgonum":mgonum,
+		"esnum":esnum,
+		"mgodata":mgodata["detail"],
+		"esdata":esdata["detail"],
+	})
+
+	log.Println("站点-定时处理完毕...","用时:",int(time.Now().Unix())-start,"秒")
+}
+
+
+
+
+
+
+
+
+
+
+
+
+//es数据统计
+func toCalculateEsData(gte_time string,lt_time string) map[string]interface{} {
+
+	if gte_time == "" || lt_time == "" {
+		return map[string]interface{}{
+			"totalnum" : 0,
+			"detail": map[string]interface{}{},
+		}
+	}
+
+	dict:= make(map[string]interface{},0)
+	total :=0
+
+	elastic.InitElasticSize("http://172.17.145.170:9800", 10,)
+	//elastic.InitElasticSize("http://127.0.0.1:12003", 10,)
+	esclient := elastic.GetEsConn()
+	defer elastic.DestoryEsConn(esclient)
+	if esclient == nil {
+		log.Println("连接池异常")
+	}
+	query :=es_elastic.NewRangeQuery("comeintime").Gte(gte_time).Lt(lt_time)
+	cursor, err := esclient.Scan(es_index_st).Query(es_elastic.NewBoolQuery().Must(query)).
+		Size(200).Do()
+	if err != nil {
+		log.Println("cursor",err)
+	}
+	if cursor.Results == nil {
+		log.Println("results != nil; got nil")
+	}
+	if cursor.Results.Hits == nil {
+		log.Println("expected results.Hits != nil; got nil")
+	}
+	//log.Println("es total :",cursor.TotalHits(),"处理分析中......")
+	for {
+		searchResult, err := cursor.Next()
+		if err != nil {
+			if err.Error() == "EOS" {
+				break
+			}else {
+				log.Println("cursor searchResult",err)
+			}
+		}
+		for _, hit := range searchResult.Hits.Hits {
+			tmp := make(map[string]interface{})
+			err := json.Unmarshal(*hit.Source, &tmp)
+			if err != nil {
+				log.Println("json Unmarshal error")
+				continue
+			}
+			total++
+			site := qu.ObjToString(tmp["site"])	//统计站点
+			if dict[site] == nil {
+				dict[site] = 1
+			}else {
+				num := qu.IntAll(dict[site])+1
+				dict[site] = num
+			}
+		}
+	}
+
+
+	return map[string]interface{}{
+		"totalnum" : total,
+		"detail": dict,
+	}
+}
+
+
+
+
+//mongo数据统计
+func toCalculateMgoData(gte_time int64,lt_time int64) map[string]interface{} {
+
+	if gte_time == 0 || lt_time == 0 {
+		return map[string]interface{}{
+			"totalnum" : 0,
+			"detail": map[string]interface{}{},
+		}
+	}
+
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte":  gte_time,
+			"$lt": lt_time,
+		},
+	}
+
+	sess := st_mgo.GetMgoConn()
+	defer st_mgo.DestoryMongoConn(sess)
+
+	dict:= make(map[string]interface{},0)
+	//细节才需要遍历
+	it := sess.DB(st_mgo.DbName).C(st_c_name).Find(&query).Select(map[string]interface{}{
+		"site":1,
+	}).Iter()
+	total:=0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		//统计站点
+		site := qu.ObjToString(tmp["site"])
+		if dict[site] == nil {
+			dict[site] = 1
+		}else {
+			num := qu.IntAll(dict[site])+1
+			dict[site] = num
+		}
+
+		tmp = make(map[string]interface{})
+	}
+	return map[string]interface{}{
+		"totalnum" : total,
+		"detail": dict,
+	}
+}

+ 173 - 20
data_monitoring/listen_task/src/main.go

@@ -1,36 +1,74 @@
 package main
 
 import (
+	"fmt"
 	"github.com/cron"
+	"github.com/tealeg/xlsx"
 	"log"
+	"os"
 	qu "qfw/util"
 	"time"
 )
 
 
 var (
-	sysconfig    	map[string]interface{} //配置文件
-	mgo          	*MongodbSim            //mongodb操作对象
-	jp_collname		string
-	qy_collname		string
+	sysconfig    							map[string]interface{} //配置文件
+	save_mgo,qy_mgo,jp_mgo,st_mgo        	*MongodbSim            //mongodb操作对象
+	qy_c_name,jp_c_name,st_c_name			string
+	es_index_st,es_type_st					string
+	save_other_name,save_site_name			string
+	xlsx_name								string
+	infoSiteArr								[]string
 )
 
 func initMgo()  {
-	mconf := sysconfig["mongodb"].(map[string]interface{})
-	log.Println(mconf)
-	mgo = &MongodbSim{
-		MongodbAddr: mconf["addrName"].(string),
-		DbName:      mconf["dbName"].(string),
-		Size:        qu.IntAllDef(mconf["pool"], 10),
+
+	saveconf := sysconfig["save_mgodb"].(map[string]interface{})
+	qy_c_name = qu.ObjToString(saveconf["coll"])
+	save_mgo = &MongodbSim{
+		MongodbAddr: saveconf["addr"].(string),
+		DbName:      saveconf["db"].(string),
+		Size:        qu.IntAllDef(saveconf["pool"], 5),
+	}
+	save_mgo.InitPool()
+
+	qconf := sysconfig["qy_mgodb"].(map[string]interface{})
+	qy_c_name = qu.ObjToString(qconf["coll"])
+	qy_mgo = &MongodbSim{
+		MongodbAddr: qconf["addr"].(string),
+		DbName:      qconf["db"].(string),
+		Size:        qu.IntAllDef(qconf["pool"], 5),
+	}
+	qy_mgo.InitPool()
+
+	jconf := sysconfig["jp_mgodb"].(map[string]interface{})
+	jp_c_name = qu.ObjToString(jconf["coll"])
+	jp_mgo = &MongodbSim{
+		MongodbAddr: jconf["addr"].(string),
+		DbName:      jconf["db"].(string),
+		Size:        qu.IntAllDef(jconf["pool"], 5),
+	}
+	jp_mgo.InitPool()
+
+	sconf := sysconfig["st_mgodb"].(map[string]interface{})
+	st_c_name = qu.ObjToString(sconf["coll"])
+	st_mgo = &MongodbSim{
+		MongodbAddr: sconf["addr"].(string),
+		DbName:      sconf["db"].(string),
+		Size:        qu.IntAllDef(sconf["pool"], 5),
 	}
-	mgo.InitPool()
+	st_mgo.InitPool()
 
 
 	//属性赋值
-	jp_collname = qu.ObjToString(sysconfig["jp_collname"])
-	qy_collname = qu.ObjToString(sysconfig["qy_collname"])
-}
+	es_index_st = qu.ObjToString(sysconfig["es_index_st"])
+	es_type_st = qu.ObjToString(sysconfig["es_type_st"])
+	save_other_name = qu.ObjToString(sysconfig["save_other_name"])
+	save_site_name = qu.ObjToString(sysconfig["save_site_name"])
+
 
+	xlsx_name = qu.ObjToString(sysconfig["xlsx_name"])
+}
 
 func init() {
 	//加载配置文件
@@ -40,23 +78,138 @@ func init() {
 
 func main() {
 	go taskTime()
+
 	time.Sleep(99999 * time.Hour)
+
 }
 
 func taskTime()  {
 
 	log.Println("部署定时任务")
 	c := cron.New()
-	//竞品
-	c.AddFunc("0 30 8 ? * *", func() { dealWithJPData() })
-	//企业变更
-	c.AddFunc("0 30 8 ? * *", func() { dealWithQYData() })
 
-	c.Start()
+	//站点相关全量-es-mongo
+	c.AddFunc("0 30 8 ? * *", func() { dealWithSTData() })
 
 
-	dealWithJPData()
+	//企业变更-站点-mongo
+	c.AddFunc("0 0 9 ? * *", func() { dealWithOtherData() })
+
+	c.Start()
 
 }
 
+func dealWithOtherData()  {
+
+	log.Println("开始统计相关数据...")
+
+	//先获取-待统计-动态站点
+	tmp ,tmp_err := save_mgo.Find("z_site_spider", map[string]interface{}{},nil,nil)
+	if tmp_err==nil && tmp!=nil && len(tmp)>0 {
+		infoSiteArr = make([]string,0)
+		for _,v :=range tmp {
+			infoSiteArr = append(infoSiteArr,qu.ObjToString(v["site"]))
+		}
+		log.Println("目标站点:",len(infoSiteArr))
+	}
+
+	start := int(time.Now().Unix())
+	st_data :=dealWithJPData()//站点数据
+	qy_date,qy_num := dealWithQYData()//企业数据
+	comeintime :=qu.Int64All(time.Now().Unix())
+	save_mgo.Save(save_other_name, map[string]interface{}{
+		"name":"企业",
+		"num":qu.IntAll(qy_num),
+		"comeintime":comeintime,
+		"date":qy_date,
+	})
+
+	save_mgo.Save(save_other_name, map[string]interface{}{
+		"name":"站点",
+		"num":qu.IntAll(st_data["num"]),
+		"comeintime":comeintime,
+		"date":qu.ObjToString(st_data["date"]),
+		"data":st_data["data"],
+	})
+
+
+
+	//获取前两天数据
+	qy_arr := make([]map[string]string,0)
+	now:=time.Now() //当前天
+	cur_time:= time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	before_time := cur_time-86400*2
+	before_date :=qu.FormatDateByInt64(&before_time, qu.DATEFORMAT)
+	//企业数据
+	before_qy_data :=save_mgo.FindOne(save_other_name, map[string]interface{}{
+		"name":"企业",
+		"date":before_date,
+	})
+	qy_arr = append(qy_arr, map[string]string{
+		"date":qy_date,
+		"num":qy_num,
+	})
+	if before_qy_data!=nil && len(before_qy_data)>2 {
+		qy_arr = append(qy_arr, map[string]string{
+			"date":qu.ObjToString(before_qy_data["date"]),
+			"num":fmt.Sprintf("%d",before_qy_data["num"]),
+		})
+	}
+
+
+	//站点数据
+	before_st_data :=save_mgo.FindOne(save_other_name, map[string]interface{}{
+		"name":"站点",
+		"date":before_date,
+	})
+
+
+	os.Remove("统计.xlsx")	//写excle
+	f :=xlsx.NewFile()
+	sheet, _ := f.AddSheet("统计")
+
+	//企业数据
+	row := sheet.AddRow()
+	row.AddCell().Value = "企业分类/日期"
+	row.AddCell().Value = "数量"
+	for _,tmp := range qy_arr {
+		row = sheet.AddRow()
+		row.AddCell().SetString(tmp["date"])
+		row.AddCell().SetString(tmp["num"])
+	}
+
+
+	//站点相关数据
+	sheet.AddRow()
+	row = sheet.AddRow()
+	row.AddCell().Value = "站点分类/日期"
+	row.AddCell().SetString(qu.ObjToString(st_data["date"]))
+	row.AddCell().SetString(qu.ObjToString(before_st_data["date"]))
+
+
+	data_1 := *qu.ObjToMap(st_data["data"])
+	data_2 := *qu.ObjToMap(before_st_data["data"])
+
+	for _,key := range infoSiteArr {
+		row = sheet.AddRow()
+		row.AddCell().SetString(key)
+		row.AddCell().SetString(fmt.Sprintf("%d",qu.Int64All(data_1[key])))
+		row.AddCell().SetString(fmt.Sprintf("%d",qu.Int64All(data_2[key])))
+	}
+	row = sheet.AddRow()
+	row.AddCell().Value = "总计"
+	row.AddCell().SetString(fmt.Sprintf("%d",qu.Int64All(st_data["num"])))
+	row.AddCell().SetString(fmt.Sprintf("%d",qu.Int64All(before_st_data["num"])))
+
+	err := f.Save(xlsx_name)
+	if err != nil {
+		log.Println("保存xlsx失败:", err)
+	}else {
+		log.Println("保存xlsx成功:", err)
+	}
+
+
+	sendErrMailSmtp("统计-站点-企业","附件")
 
+	log.Println("定时处理完毕...",int(time.Now().Unix())-start,"秒")
+}

+ 40 - 0
data_monitoring/listen_task/src/mark

@@ -0,0 +1,40 @@
+{
+  "save_mgodb": {
+    "addr": "172.17.4.87:27080",
+    "db": "editor",
+    "coll": "monitor_other",
+    "pool": 5
+  },
+  "qy_mgodb": {
+    "addr": "172.17.4.87:27080",
+    "db": "spider",
+    "coll": "baidu_enterprise",
+    "pool": 5
+  },
+  "jp_mgodb": {
+    "addr": "172.17.145.179:28082",
+    "db": "spider_data",
+    "coll": "data_bak",
+    "pool": 5
+  },
+  "st_mgodb": {
+    "addr": "172.17.4.85:27080",
+    "db": "qfw",
+    "coll": "result_20210109",
+    "pool": 5
+  },
+  "es_index_st": "bidding",
+  "es_type_st": "bidding",
+  "save_other_name": "monitor_other",
+  "save_site_name" : "monitor_site",
+  "smtpMail": {
+    "from": "zhengkun@topnet.net.cn",
+    "to": "zhengkun@topnet.net.cn",
+    "cc": "zhengkun@topnet.net.cn",
+    "smtpHost": "smtp.qq.com",
+    "smtpPort": "465",
+    "user":     "920032221@qq.com",
+    "pwd":      "xomkphsjsamybdbj"
+  },
+  "xlsx_name" : "统计.xlsx"
+}

+ 31 - 18
data_monitoring/listen_task/src/sendmail.go

@@ -3,28 +3,41 @@ package main
 import (
 	"fmt"
 	"io/ioutil"
-	"log"
-	"net/http"
+	"os"
+	qu "qfw/util"
+	"qfw/util/mail"
 )
 
+var from,to,cc, smtpHost,user,pwd string
+var smtpPort int
 
-var tomail string
-var api string
 
-//api模式
-func sendErrMailApi(title,body string)  {
-	jkmail, _ := sysconfig["jkmail"].(map[string]interface{})
-	if jkmail != nil {
-		tomail, _ = jkmail["to"].(string)
-		api, _ = jkmail["api"].(string)
+func sendErrMailSmtp(title,body string) {
+
+
+	smtpMail, _ := sysconfig["smtpMail"].(map[string]interface{})
+
+	if smtpMail != nil {
+		from, _ = smtpMail["from"].(string)
+		to, _ = smtpMail["to"].(string)
+		cc, _ = smtpMail["cc"].(string)
+		smtpHost, _ = smtpMail["smtpHost"].(string)
+		smtpPort= qu.IntAll(smtpMail["smtpPort"])
+		user, _ = smtpMail["user"].(string)
+		pwd, _ = smtpMail["pwd"].(string)
 	}
-	log.Println(tomail,api)
-	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, title, body))
-	if err == nil {
-		defer res.Body.Close()
-		read, err := ioutil.ReadAll(res.Body)
-		log.Println("邮件发送成功:", string(read), err)
-	}else {
-		log.Println("邮件发送失败:", err)
+	f, _ := os.Open(xlsx_name)
+	b, err := ioutil.ReadAll(f)
+	if err != nil {
+		fmt.Println("err:",err)
+		return
 	}
+
+	ok := mail.GSendMail_B(user, from, cc, from, title, body, f.Name(), b, &mail.GmailAuth{
+		SmtpHost: smtpHost,
+		SmtpPort: smtpPort,
+		User:     user,
+		Pwd:      pwd,
+	})
+	fmt.Println(ok)
 }

+ 5 - 3
data_monitoring/vps_client/src/config.json

@@ -1,7 +1,9 @@
 {
-  "vpsID": "id标识1111",
+  "vpsID": "专用-常州",
   "processArr": [
     "d1.exe",
-    "d2.exe"
-  ]
+    "d2.exe",
+    "d3.exe"
+  ],
+  "during":"10"
 }

+ 42 - 55
data_monitoring/vps_client/src/main.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"fmt"
 	"github.com/cron"
 	"log"
 	"net/http"
@@ -16,37 +17,37 @@ import (
 
 var (
 	sysconfig			map[string]interface{} //配置文件
-	vpsID	string			//机器唯一标识
-	processArr []string		//机器相关下载器
+	vpsID,during		string			//机器唯一标识
+	processArr 			[]string		//机器相关下载器
 
 )
 
 func init()  {
-	log.Println("加载...")
 	qu.ReadConfig(&sysconfig)
 	vpsID = qu.ObjToString(sysconfig["vpsID"])
+	during = qu.ObjToString(sysconfig["during"])
 	processArr = qu.ObjArrToStringArr(sysconfig["processArr"].([]interface{}))
 }
 
 
 func main() {
-	//临时测试-
-	if vpsID=="" || len(processArr)< 1 {
+
+	if vpsID == "" || len(processArr) < 1 {
 		log.Println("配置文件异常,请检查......")
 		os.Exit(1)
 	}
 
-
 	//定时器
 	c := cron.New()
-	//c.AddFunc("0 */5 * * * ?", func() { task() })
-	c.AddFunc("*/10 * * * * ?", func() { task() })
+	spec := fmt.Sprintf("0 */%s * * * ?",during)	//分
+	//spec =fmt.Sprintf("*/%s * * * * ?",during)		//秒
+	c.AddFunc(spec, func() { task() })
 	c.Start()
 	time.Sleep(99999 * time.Hour)
 }
 
 func task()  {
-
+	log.Println("执行...检测...")
 	//先检测下载器
 	process := "0" //正常 - windows模式
 	for _,v:=range processArr {
@@ -56,7 +57,7 @@ func task()  {
 				process = "1"
 				break
 			}
-		}else {
+		}else { //linux测试使用
 			b,_:=checkProRunning(v)
 			if !b {
 				process = "1"
@@ -64,18 +65,21 @@ func task()  {
 			}
 		}
 	}
-	log.Println("当前下载器:",process)
-	u, _ := url.Parse("http://127.0.0.1:7811") //本地测试
-	//u, _ := url.Parse("http://monitor.spdata.jianyu360.com") //线上
+	//log.Println("当前下载器:",process)
+	//u, _ := url.Parse("http://127.0.0.1:7811") //本地测试
+	u, _ := url.Parse("http://monitor.spdata.jianyu360.com") //线上
 	q := u.Query()
 	q.Set("id", vpsID)
 	q.Set("process", process)
 	u.RawQuery = q.Encode()
 
+	//log.Println(vpsID,process)
+
 	_, err := http.Get(u.String());
 	if err != nil {
 		log.Println("异常",err)
 	}
+
 }
 
 
@@ -89,16 +93,29 @@ func task()  {
 
 
 
-
-
-
-
-
-
-
-
-
-
+//根据进程名判断进程是否运行
+func checkProRunning(serverName string) (bool, error) {
+	cmd := `ps ux | awk '/` + serverName + `/ && !/awk/ {print $2}'`
+	pid, err := runInLinux(cmd)
+	if err != nil {
+		return false, err
+	}
+	return pid != "", nil
+}
+//根据进程名称获取进程ID
+func getPid(serverName string) (string, error) {
+	cmd := `ps ux | awk '/` + serverName + `/ && !/awk/ {print $2}'`
+	pid, err := runInLinux(cmd)
+	return pid , err
+}
+//
+func runInLinux(cmd string) (string, error) {
+	result, err := exec.Command("/bin/sh", "-c", cmd).Output()
+	if err != nil {
+		return "", err
+	}
+	return strings.TrimSpace(string(result)), err
+}
 
 func isProcessExist(appName string) (bool, string, int) {
 	appary := make(map[string]int)
@@ -107,7 +124,7 @@ func isProcessExist(appName string) (bool, string, int) {
 	//fmt.Printf("fields: %v\n", output)
 	n := strings.Index(string(output), "System")
 	if n == -1 {
-		//fmt.Println("no find")
+		//log.Println("no find")
 		//os.Exit(1)
 	}
 	data := string(output)[n:]
@@ -119,35 +136,5 @@ func isProcessExist(appName string) (bool, string, int) {
 			return true, appName, appary[appName]
 		}
 	}
-
 	return false, appName, -1
-}
-
-
-
-
-
-
-//根据进程名判断进程是否运行
-func checkProRunning(serverName string) (bool, error) {
-	cmd := `ps ux | awk '/` + serverName + `/ && !/awk/ {print $2}'`
-	pid, err := runInLinux(cmd)
-	if err != nil {
-		return false, err
-	}
-	return pid != "", nil
-}
-//执行linux进程信息
-func runInLinux(cmd string) (string, error) {
-	result, err := exec.Command("/bin/sh", "-c", cmd).Output()
-	if err != nil {
-		return "", err
-	}
-	return strings.TrimSpace(string(result)), err
-}
-//根据进程名称获取进程ID
-func getPid(serverName string) (string, error) {
-	cmd := `ps ux | awk '/` + serverName + `/ && !/awk/ {print $2}'`
-	pid, err := runInLinux(cmd)
-	return pid , err
-}
+}

+ 9 - 3
data_monitoring/vps_server/src/config.json

@@ -1,10 +1,16 @@
 {
   "port": "7811",
+  "save_mgodb": {
+    "addr": "192.168.3.207:27092",
+    "db": "zhengkun",
+    "coll": "monitor_other",
+    "pool": 5
+  },
   "vpsIDs" : [
-    "id标识1111"
+    "专用-常州"
   ],
   "during": 10,
-  "isErr" : 5,
+  "isErr" : 3,
   "smtpMail": {
     "from": "zhengkun@topnet.net.cn",
     "to": "zhengkun@topnet.net.cn",
@@ -14,7 +20,7 @@
     "pwd":      "xomkphsjsamybdbj"
   },
   "jkmail": {
-    "to": "zhengkun@topnet.net.cn",
+    "to": "zhaoyujian@topnet.net.cn",
     "api": "http://172.17.145.179:19281/_send/_mail"
   }
 }

+ 61 - 13
data_monitoring/vps_server/src/main.go

@@ -7,14 +7,19 @@ import (
 	"net/http"
 	qu "qfw/util"
 	"strings"
+	"sync"
 	"time"
 )
 var (
 	sysconfig			map[string]interface{} //配置文件
 	port				string
+	save_mgo        	*MongodbSim
 	idsArr				[]string
 	dataTmp				map[string]map[string]interface{}
 	during,isErr		int64
+	test				map[string]interface{}
+	updatelock 			sync.Mutex
+	save_coll_name		string
 )
 func init()  {
 	//加载配置文件
@@ -31,14 +36,28 @@ func init()  {
 			"isErrNum":0,
 			"isProcess" : 0,
 			"isVpsMail":0,
-			"isPrpMail":0,
+			"isProMail":0,
 		}
 	}
+
 	during = qu.Int64All(sysconfig["during"])
 	isErr = qu.Int64All(sysconfig["isErr"])
+
+	saveconf := sysconfig["save_mgodb"].(map[string]interface{})
+	save_coll_name = qu.ObjToString(saveconf["coll"])
+	save_mgo = &MongodbSim{
+		MongodbAddr: saveconf["addr"].(string),
+		DbName:      saveconf["db"].(string),
+		Size:        qu.IntAllDef(saveconf["pool"], 5),
+	}
+	save_mgo.InitPool()
+
+
 	log.Println("准备完毕...")
 }
 
+
+
 func main() {
 
 	//http://monitor.spdata.jianyu360.com/,程序端口7811
@@ -48,8 +67,8 @@ func main() {
 
 	//每隔1分钟执行一次:0 */1 * * * ?   每隔5秒执行一次:*/5 * * * * ?
 
-	//spec :=fmt.Sprintf("0 */%d * * * ?",during)
-	spec :=fmt.Sprintf("*/%d * * * * ?",during)
+	spec :=fmt.Sprintf("30 */%d * * * ?",during)
+	//spec =fmt.Sprintf("*/%d * * * * ?",during)
 	c := cron.New()
 	c.AddFunc(spec, func() { taskFinishing()})
 	c.Start()
@@ -57,17 +76,20 @@ func main() {
 }
 
 func handler(w http.ResponseWriter, r *http.Request) {
+	updatelock.Lock()
 	r.ParseForm() //解析参数,默认是不会解析的
 	if r.Method == "GET" {
 		vpsid ,process,isProMail:= "",int64(0),int64(0)
 		for k, v := range r.Form {
 			if k=="id" {
 				vpsid = strings.Join(v, "")
+				isProMail = qu.Int64All(dataTmp[vpsid]["isProMail"])
 			}else if k=="process" {
 				process = qu.Int64All(strings.Join(v, ""))
 				if process==0 {
 					isProMail = 0
 				}
+
 			}else {
 
 			}
@@ -76,27 +98,33 @@ func handler(w http.ResponseWriter, r *http.Request) {
 			dataTmp[vpsid] = map[string]interface{}{
 				"isHeart":1,
 				"isErrNum":0,
-				"isVpsMail":0,   //收到心跳-vps邮件置为0,可以发
+				"isVpsMail":0,   //收到心跳-vps邮件置为0,可以发
 				"isProcess":process,
 				"isProMail":isProMail,
 			}
 		}
-		log.Println("接收Get请求:",dataTmp[vpsid])
+
+		//log.Println("接收Get请求:",dataTmp[vpsid])
 
 	} else if r.Method == "POST" {
 
 	} else {
 
 	}
+
+	updatelock.Unlock()
 }
 
 //不断监听处理
 func taskFinishing()  {
-	log.Println("执行...处理一次...")
+	//加锁
+	updatelock.Lock()
+	log.Println("...处理一次...")
+
 	isVpsMailContent,isProMailContent:= "",""
 	for _ , vpsid := range idsArr {
 		//此标识-是否正常
-		log.Println("原:",dataTmp[vpsid])
+		//log.Println("原:",dataTmp[vpsid])
 		isHeart,isProcess:= qu.Int64All(dataTmp[vpsid]["isHeart"]),qu.Int64All(dataTmp[vpsid]["isProcess"])
 		isErrNum := int64(0)
 		isVpsMail,isProMail := qu.Int64All(dataTmp[vpsid]["isVpsMail"]),qu.Int64All(dataTmp[vpsid]["isProMail"])
@@ -128,7 +156,7 @@ func taskFinishing()  {
 			isProcess = 0
 		}
 
-		log.Println("处理后:","心跳:",0,"次数:",isErrNum,"下载器:",isProcess,"vps邮件:",isVpsMail,"pro邮件:",isProMail)
+		//log.Println("处理后:","心跳:",0,"次数:",isErrNum,"下载器:",isProcess,"vps邮件:",isVpsMail,"pro邮件:",isProMail)
 
 
 		dataTmp[vpsid] = map[string]interface{}{
@@ -138,17 +166,37 @@ func taskFinishing()  {
 			"isVpsMail":isVpsMail,
 			"isProMail" : isProMail,
 		}
-		//log.Println("处理:",dataTmp[id])
+
 	}
 
+	//log.Println("处理后",isProMailContent)
 
 	if isVpsMailContent!=""{
-		log.Println("发邮件vps异常...",isVpsMailContent)
-		//sendErrMailApi("vpn异常",isVpsMailContent)
+		log.Println("发邮件:vps异常...",isVpsMailContent)
+		comeintime :=qu.Int64All(time.Now().Unix())//日志记录
+		save_mgo.Save(save_coll_name, map[string]interface{}{
+			"name":"vps",
+			"comeintime":comeintime,
+			"date":qu.FormatDateByInt64(&comeintime, qu.DATEFORMAT),
+			"detail" : isVpsMailContent,
+		})
+
+		sendErrMailApi("vps",isVpsMailContent)
 	}else {
 		if isProMailContent !="" {
-			log.Println("发邮件下载器异常...",isVpsMailContent)
-			//sendErrMailApi("下载器异常",isProMailContent)
+			log.Println("发邮件:下载器异常...",isProMailContent)
+			comeintime :=qu.Int64All(time.Now().Unix())//日志记录
+			save_mgo.Save(save_coll_name, map[string]interface{}{
+				"name":"下载器",
+				"comeintime":comeintime,
+				"date":qu.FormatDateByInt64(&comeintime, qu.DATEFORMAT),
+				"detail" : isProMailContent,
+			})
+
+			sendErrMailApi("下载器异常",isProMailContent)
 		}
 	}
+
+	updatelock.Unlock()
+
 }

+ 49 - 0
data_monitoring/vps_server/src/mark

@@ -0,0 +1,49 @@
+"vpsIDs" : [
+    "专用-常州",
+    "专用-杭州",
+    "专用-合肥",
+    "专用-莱芜",
+    "专用-南通",
+    "专用-秦皇岛",
+    "专用-威海",
+    "专用-驻马店",
+    "专用-池州",
+    "专用-菏泽",
+    "专用-淮南",
+    "专用-嘉兴",
+    "专用-宣城",
+    "数据-桂林",
+    "数据-黄山",
+    "数据-荆州",
+    "数据-聊城",
+    "数据-洛阳",
+    "数据-宁德",
+    "数据-衢州",
+    "数据-三亚",
+    "数据-厦门",
+    "数据-汕头",
+    "数据-信阳",
+    "数据-宿迁",
+    "数据-盐城",
+    "数据-中山",
+    "数据-镇江",
+    "数据-珠海",
+    "数据-成都",
+    "数据-莆田",
+    "数据-湖州",
+    "数据-苏州",
+    "数据-台州",
+    "数据-舟山",
+    "附件-亳州",
+    "附件-葫芦岛",
+    "附件-马鞍山",
+    "附件-濮阳",
+    "附件-绍兴",
+    "附件-石家庄",
+    "附件-云浮",
+    "代理-北京",
+    "代理-杭州",
+    "代理-济南",
+    "代理-开封",
+    "代理-扬州"
+  ],

+ 81 - 0
data_monitoring/words_vaild/src/main.go

@@ -0,0 +1,81 @@
+package main
+
+import (
+	"log"
+	qu "qfw/util"
+	"qfw/util/elastic"
+	"strings"
+	"unicode/utf8"
+)
+
+func init()  {
+	elastic.InitElasticSize("http://192.168.3.11:9800",10)
+}
+
+func main()  {
+
+	defer qu.Catch()
+	log.Println("处理 ... 指定企业名称 ...")
+	/*
+	云南和合泰商贸有限公司
+	安徽省微乡华艺环境工程有限公司
+	*/
+	new_name,b :=dealWithScoreRules("安徽省微乡华艺环境工程有限公司")
+	if b {
+		log.Println("最终",new_name)
+	}
+
+}
+
+
+func dealWithScoreRules(name string) (string,bool) {
+	new_name,isok :="",false
+	query:= `{"query":{"bool":{"must":[{"query_string":{"default_field":"azktest.name_2","query":"`+name+`"}}],"must_not":[],"should":[]}},"from":0,"size":1,"sort":[],"facets":{}}`
+	//默认取最高分-分析多个分-遍历器查询
+	data := *elastic.Get("azktest","azktest",query)
+	if len(data)>0 && data != nil {
+		new_name = qu.ObjToString(data[0]["name"])
+	}
+	if new_name!="" { //分析hit比例
+		total,hit := dealWithWordsRules(name,new_name)
+		if float64(hit)/float64(total)>=0.8 {
+			isok = true
+		}
+	}
+	return new_name,isok
+}
+
+//击中数量以及比例
+func dealWithWordsRules(info_name string ,source_name string) (int,int){
+	total,hit :=0,0
+	nameArr,_ := calculateWordCount(info_name)
+	_,total = calculateWordCount(source_name)
+	for _,v1 := range nameArr {
+		if strings.Contains(source_name,v1) {
+			hit++
+		}
+	}
+	return total,hit
+}
+
+//分词结果
+func calculateWordCount(name string) ([]string,int) {
+
+	arr ,space:= make([]string,0),2
+	total := utf8.RuneCountInString(name)-(space-1)
+	if name == "" || total<=0  {
+		return arr,0
+	}
+	nameRune := []rune(name)
+	for i:=0;i<total ;i++  {
+		new_str := string(nameRune[i:space+i])
+		arr = append(arr,new_str)
+	}
+	return arr,len(arr)
+}
+
+
+
+
+
+

+ 304 - 0
data_monitoring/words_vaild/src1/main.go

@@ -0,0 +1,304 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/tealeg/xlsx"
+	"log"
+	"os"
+	qu "qfw/util"
+	"qfw/util/elastic"
+	"strings"
+	"sync"
+	"unicode/utf8"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+
+)
+var (
+	sysconfig			map[string]interface{} //配置文件
+	save_mgo        	*MongodbSim
+)
+
+func init()  {
+	save_mgo = &MongodbSim{
+		MongodbAddr: "192.168.3.207:27092",
+		DbName:      "zhengkun",
+		Size:        5,
+	}
+	save_mgo.InitPool()
+
+	elastic.InitElasticSize("http://192.168.3.11:9800",20)
+}
+
+func dealWithDataXlsx()  {
+
+	q := map[string]interface{}{}
+	sess := save_mgo.GetMgoConn()
+	defer save_mgo.DestoryMongoConn(sess)
+	it := sess.DB(save_mgo.DbName).C("zk_test_words").Find(&q).Iter()
+	total:=0
+	saveArr := make([]map[string]string,0)
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total % 10000 == 0 {
+			log.Println("current index",total,tmp["_id"])
+		}
+
+		if total % 30 ==0 {
+			name:=qu.ObjToString(tmp["name"])
+			dict := make(map[string]string)
+			dict["name"] = name
+			for i:=0; i<5;i++ {
+				value,total,hit :="","",""
+				key := "word_"+fmt.Sprintf("%d",i)
+				if tmp[key]!=nil {
+
+					if arr,ok := tmp[key].(primitive.A);ok {
+						dataArr :=qu.ObjArrToMapArr(arr)
+						value =qu.ObjToString(dataArr[0]["name"])
+						if i!=0 {
+							total = fmt.Sprintf("%d",dataArr[0]["all_words"])
+							hit = fmt.Sprintf("%d",dataArr[0]["hit_words"])
+						}
+					}
+
+				}
+				key1,key2:="total"+fmt.Sprintf("%d",i),"hit"+fmt.Sprintf("%d",i)
+				dict[key] = value
+				dict[key1] = total
+				dict[key2] = hit
+
+			}
+			saveArr= append(saveArr,dict)
+		}
+		tmp = make(map[string]interface{})
+	}
+
+
+	os.Remove("words.xlsx")	//写excle
+	f :=xlsx.NewFile()
+
+
+	for i:=0; i<5;i++ {
+		key := "word_"+fmt.Sprintf("%d",i)
+		sheet, _ := f.AddSheet("统计"+key)
+		row := sheet.AddRow()
+		row.AddCell().Value = "name"
+		row.AddCell().Value = key
+		if i!=0 {
+			row.AddCell().Value = "total"
+			row.AddCell().Value = "hit"
+		}
+		key1,key2:="total"+fmt.Sprintf("%d",i),"hit"+fmt.Sprintf("%d",i)
+
+		for _,tmp := range saveArr {
+			row = sheet.AddRow()
+			row.AddCell().SetString(tmp["name"])
+			row.AddCell().SetString(tmp[key])
+			row.AddCell().SetString(fmt.Sprintf("%s",tmp[key1]))
+			row.AddCell().SetString(fmt.Sprintf("%s",tmp[key2]))
+		}
+	}
+
+	err := f.Save("words.xlsx")
+	if err != nil {
+		log.Println("保存xlsx失败:", err)
+	}else {
+		log.Println("保存xlsx成功:", err)
+	}
+}
+
+func main()  {
+
+	//导出xlsx
+	dealWithDataXlsx()
+	return
+
+
+
+
+	defer qu.Catch()
+	log.Println("处理 ... 指定企业名称 ...")
+
+	//分析错误数据
+	//
+	q := map[string]interface{}{}
+	sess := save_mgo.GetMgoConn()
+	defer save_mgo.DestoryMongoConn(sess)
+	//细节才需要遍历
+	it := sess.DB(save_mgo.DbName).C("zk_company_test").Find(&q).Iter()
+	total:=0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total % 10000 == 0 {
+			log.Println("current index",total,tmp["_id"])
+		}
+
+		name:=qu.ObjToString(tmp["name"])
+		save_dict := make(map[string]interface{},0)
+		for i:=0; i<5;i++ {
+			key := "word_"+fmt.Sprintf("%d",i)
+			dataArr :=dealWithScoreRules(name,i)
+			if dataArr ==nil || len(dataArr)<1 {
+				//无数据
+			}else {
+				save_dict[key] = dealWithWordsRules(name,dataArr,i)
+			}
+
+		}
+
+		if len(save_dict)>0 {
+			save_dict["name"]  = name
+			save_mgo.Save("zk_test_words",save_dict)
+		}
+
+		tmp = make(map[string]interface{})
+	}
+
+}
+
+//分数维度
+func dealWithScoreRules(name string,space int) []map[string]interface{} {
+	key := ""
+	if space>0&&space<5{
+		key = fmt.Sprintf("%d",space)
+	}
+	query:= `{"query":{"bool":{"must":[{"query_string":{"default_field":"azktest.name_`+key+`","query":"`+name+`"}}],"must_not":[],"should":[]}},"from":0,"size":3,"sort":[],"facets":{}}`
+
+	if key=="" {
+		query = `{"query":{"bool":{"must":[{"query_string":{"default_field":"azktest.name","query":"`+name+`"}}],"must_not":[],"should":[]}},"from":0,"size":3,"sort":[],"facets":{}}`
+	}
+	client := elastic.GetEsConn()
+	defer elastic.DestoryEsConn(client)
+	searchResult, err := client.Search().Index("azktest").Type("azktest").Source(query).Do()
+	if err != nil {
+		log.Println("从ES查询出错", err.Error())
+		return nil
+	}
+	resNum := len(searchResult.Hits.Hits)
+	res := make([]map[string]interface{}, resNum)
+	if searchResult.Hits != nil {
+		if resNum < 5000 {
+			for i, hit := range searchResult.Hits.Hits {
+				data := make(map[string]interface{},0)
+				json.Unmarshal(*hit.Source, &data)
+				res[i] = map[string]interface{}{
+					"name":data["name"],
+					"score":*hit.Score,
+				}
+			}
+		} else {
+			log.Println("查询结果太多,查询到:", resNum, "条")
+		}
+
+	}
+	return res
+}
+
+
+
+//击中数量以及比例
+func dealWithWordsRules(name string ,source []map[string]interface{},space int) []map[string]interface{} {
+
+	nameArr,_ := calculateWordCount(name,space)
+	newArr := make([]map[string]interface{},0)
+	for _,v := range source {
+		total,hit :=0,0
+		source_name :=qu.ObjToString(v["name"])
+		_,total = calculateWordCount(source_name,space)
+		for _,v1 := range nameArr {
+			if strings.Contains(source_name,v1) {
+				hit++
+			}
+		}
+
+
+		if space==0 {
+			newArr = append(newArr, map[string]interface{}{
+				"name":source_name,
+				"score":qu.Float64All(v["score"]),
+			})
+		}else {
+			newArr = append(newArr, map[string]interface{}{
+				"name":source_name,
+				"score":qu.Float64All(v["score"]),
+				"all_words" : total,
+				"hit_words" : hit,
+			})
+		}
+	}
+	return newArr
+}
+
+//分词结果
+func calculateWordCount(name string,space int) ([]string,int) {
+	arr := make([]string,0)
+	total := utf8.RuneCountInString(name)-(space-1)
+	if name == "" || space<=0 || total<=0  {
+		return arr,0
+	}
+	nameRune := []rune(name)
+	for i:=0;i<total ;i++  {
+		new_str := string(nameRune[i:space+i])
+		arr = append(arr,new_str)
+	}
+	return arr,len(arr)
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+func readyDataEs()  {
+
+	q := map[string]interface{}{}
+	sess := save_mgo.GetMgoConn()
+	defer save_mgo.DestoryMongoConn(sess)
+	//多线程升索引
+	pool_es := make(chan bool, 10)
+	wg_es := &sync.WaitGroup{}
+	//细节才需要遍历
+	it := sess.DB(save_mgo.DbName).C("zk_company_name").Find(&q).Iter()
+	total:=0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total % 10000 == 0 {
+			log.Println("current index",total,tmp["_id"])
+		}
+		savetmp := make(map[string]interface{}, 0)
+		savetmp["_id"] = tmp["_id"]
+		savetmp["name"] = qu.ObjToString(tmp["company_name"])
+		savetmp["name_1"] = qu.ObjToString(tmp["company_name"])
+		savetmp["name_2"] = qu.ObjToString(tmp["company_name"])
+		savetmp["name_3"] = qu.ObjToString(tmp["company_name"])
+		savetmp["name_4"] = qu.ObjToString(tmp["company_name"])
+		pool_es <- true
+		wg_es.Add(1)
+		go func(savetmp map[string]interface{}) {
+			defer func() {
+				<-pool_es
+				wg_es.Done()
+			}()
+			elastic.Save("azktest","azktest", savetmp)
+		}(savetmp)
+		tmp = make(map[string]interface{})
+	}
+	wg_es.Wait()
+
+
+	log.Println("is over",total)
+}
+

+ 328 - 0
data_monitoring/words_vaild/src1/mgo.go

@@ -0,0 +1,328 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type MgoSess struct {
+	Db     string
+	Coll   string
+	Query  interface{}
+	Sorts  []string
+	fields interface{}
+	limit  int64
+	skip   int64
+	M      *MongodbSim
+}
+
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor != nil {
+		if mt.Cursor.Next(nil) {
+			err := mt.Cursor.Decode(result)
+			if err != nil {
+				log.Println("mgo cur err", err.Error())
+				mt.Cursor.Close(nil)
+				return false
+			}
+			return true
+		} else {
+			mt.Cursor.Close(nil)
+			return false
+		}
+	} else {
+		return false
+	}
+
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+type MongodbSim struct {
+	MongodbAddr string
+	Size        int
+	//	MinSize     int
+	DbName   string
+	C        *mongo.Client
+	Ctx      context.Context
+	ShortCtx context.Context
+	pool     chan bool
+	UserName string
+	Password string
+}
+
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	//m.Open()
+	ms := &MgoSess{}
+	ms.M = m
+	return ms
+}
+
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	//m.Close()
+	ms.M = nil
+	ms = nil
+}
+
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint64(m.Size))
+	m.pool = make(chan bool, m.Size)
+
+	if m.UserName !="" && m.Password !="" {
+		cre := options.Credential{
+			Username:m.UserName,
+			Password:m.Password,
+		}
+		opts.SetAuth(cre)
+	}
+
+
+
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+//批量插入
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewInsertOneModel()
+		write.SetDocument(d)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo savebulk error:", e.Error())
+		return false
+	}
+	return true
+}
+
+//保存
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		return nil
+	}
+	return r.InsertedID
+}
+
+//更新by Id
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+//删除by id
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//通过条件删除
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//findbyid
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//findone
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//find
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find()
+	r, err := coll.Find(m.Ctx, query, op.SetSort(sort), op.SetProjection(fields))
+	if err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	return results, nil
+}
+
+//创建_id
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, _ := primitive.ObjectIDFromHex(id)
+	return objectId
+}
+
+func BsonTOStringId(id interface{}) string {
+	return id.(primitive.ObjectID).Hex()
+}

+ 1 - 2
fullproject/src_v1/load_data.go

@@ -172,7 +172,7 @@ func (p *ProjectTask) loadSpiderCode() {
 	defer MgoSpider.DestoryMongoConn(sess)
 	q := map[string]interface{}{}
 	field := map[string]interface{}{"code": 1, "isflow": 1}
-	it := sess.DB(MgoSpider.DbName).C("luaconfig_back").Find(&q).Select(field).Iter()
+	it := sess.DB(MgoSpider.DbName).C("luaconfig").Find(&q).Select(field).Iter()
 	n := 0
 	pool := make(chan map[string]interface{}, 100)
 	over := make(chan bool)
@@ -192,7 +192,6 @@ func (p *ProjectTask) loadSpiderCode() {
 		result := make(map[string]interface{})
 		if it.Next(&result) {
 			go func(res map[string]interface{}) {
-				util.Debug(result)
 				pool <- result
 			}(result)
 		} else {

+ 2 - 2
fullproject/src_v1/main.go

@@ -77,14 +77,14 @@ func main() {
 	//udp强制合并  信息id1,id2,id3 [项目id] 不存在时新建  qzhb
 	//udp强制拆分  项目id,信息id1,id2          qzcf
 	//udp重新合并  信息id1,id2,id3             cxhb
+	P_QL.loadSpiderCode()
+	P_QL.loadSite()
 	if Sysconfig["loadStart"] != nil {
 		loadStart := util.Int64All(Sysconfig["loadStart"])
 		if loadStart > -1 {
 			P_QL.loadData(loadStart)
 		}
 	}
-	P_QL.loadSpiderCode()
-	P_QL.loadSite()
 	go checkMapJob()
 	time.Sleep(99999 * time.Hour)
 }

+ 19 - 10
fullproject/src_v1/project.go

@@ -236,10 +236,10 @@ func (p *ProjectTask) startProjectMerge(info *Info, tmp map[string]interface{})
 	}
 
 	if !bFindProject {
-		if !IsCreatePro(info) {
-			qu.Debug("舍弃数据---", info.Id)
-			return
-		}
+		//if !IsCreatePro(info) {
+		//	qu.Debug("舍弃数据---", info.Id)
+		//	return
+		//}
 		id, p1 := p.NewProject(tmp, info)
 		p.AllIdsMapLock.Lock()
 		p.AllIdsMap[id] = &ID{Id: id, P: p1}
@@ -790,19 +790,20 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 	} else if thisinfo.TopType == "结果" {
 		if thisinfo.SubType == "中标" || thisinfo.SubType == "成交" || thisinfo.SubType == "流标" || thisinfo.SubType == "废标" {
 			if pInfo.Jgtime > 0 {
-				jg1 := int64(math.Abs(float64(pInfo.Jgtime - thisinfo.Publishtime)))
+				//jg1 := int64(math.Abs(float64(pInfo.Jgtime - thisinfo.Publishtime)))
+				// 4.12	直接更新jgtime
 				//公告状态和项目状态同样都是中标或者成交,
 				if (thisinfo.SubType == "中标" || thisinfo.SubType == "成交") && (pInfo.Bidstatus == "中标" || pInfo.Bidstatus == "成交") {
-					if jg1 > p.jgTime {
+					//if jg1 > p.jgTime {
 						set["jgtime"] = tmp["publishtime"]
 						pInfo.Jgtime = thisinfo.Publishtime
-					}
+					//}
 					//公告状态和项目状态同样是流标或者废标
 				} else if (thisinfo.SubType == "流标" || thisinfo.SubType == "废标") && (pInfo.Bidstatus == "流标" || pInfo.Bidstatus == "废标") {
-					if jg1 > p.jgTime {
+					//if jg1 > p.jgTime {
 						set["jgtime"] = tmp["publishtime"]
 						pInfo.Jgtime = thisinfo.Publishtime
-					}
+					//}
 				}
 			} else {
 				set["jgtime"] = tmp["publishtime"]
@@ -930,7 +931,11 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 		set["buyertel"] = ""
 	}
 	if thisinfo.ContractCode != "" {
-		set["contractcode"] = pInfo.ContractCode + "," + thisinfo.ContractCode
+		if pInfo.ContractCode == "" {
+			set["contractcode"] = thisinfo.ContractCode
+		}else {
+			set["contractcode"] = pInfo.ContractCode + "," + thisinfo.ContractCode
+		}
 	}
 
 	//8--代理机构
@@ -1531,3 +1536,7 @@ func (p *ProjectTask) GetBidTypeAndBidStatus(info *Info) (string, string) {
 	return typeStr, statusStr
 
 }
+
+func ListSort(tmp map[string]interface{}) {
+
+}

+ 26 - 22
fullproject/src_v1/task.go

@@ -124,7 +124,7 @@ func NewPT() *ProjectTask {
 		coll:       ProjectColl,
 		validTime:  int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
 		statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
-		jgTime:		int64(util.IntAllDef(3, 3) * 86400),
+		jgTime:		int64(util.IntAllDef(7, 7) * 86400),
 	}
 	return p
 }
@@ -470,24 +470,11 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 					defer func() {
 						<-pool
 					}()
-					if util.IntAll(tmp["repeat"]) == 0 {
-						if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 1 {
-							//增量	dataging为1不参与合并
-							util.Debug("增量   dataging == 1 ", tmp["_id"])
-							return
-						}
-
-						p.fillInPlace(tmp)
-						info := ParseInfo(tmp)
-						p.currentTime = info.Publishtime
-						//普通合并
-						p.CommonMerge(tmp, info)
-					} else {
-						//信息错误,进行更新
-						p.mapBidLock.Lock()
-						countRepeat++
-						p.mapBidLock.Unlock()
-					}
+					p.fillInPlace(tmp)
+					info := ParseInfo(tmp)
+					p.currentTime = info.Publishtime
+					//普通合并
+					p.CommonMerge(tmp, info)
 				}(tmp)
 			case <-over:
 				break L
@@ -520,10 +507,27 @@ L:
 			tmp := make(map[string]interface{})
 			if query.Next(&tmp) {
 				lastid = tmp["_id"]
-				if count%10 == 0 {
-					log.Println("current", count, lastid)
+				if P_QL.currentType == "ql" {
+					if count%20000 == 0 {
+						log.Println("current", count, lastid)
+					}
+				}else {
+					if count%1000 == 0 {
+						log.Println("current", count, lastid)
+					}
+				}
+				if util.IntAll(tmp["repeat"]) == 0 {
+					if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 0 {
+						infoPool <- tmp
+					}else {
+						util.Debug("增量   dataging == 1 ", tmp["_id"])
+					}
+				}else {
+					countRepeat++
+					if P_QL.currentType == "project" {
+						util.Debug("repeat err---", tmp["_id"])
+					}
 				}
-				infoPool <- tmp
 				count++
 			} else {
 				break L

+ 9 - 9
monitor/task.go

@@ -87,15 +87,15 @@ func EsCheck(result string) string {
 	util.Debug(*resp)
 	if resp.Status != "green" {
 		result += "<br>" + "检索库异常,异常内容:" + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "cluster_name:" + resp.ClusterName + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "status:" + resp.Status + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "number_of_nodes:" + strconv.Itoa(resp.NumberOfNodes) + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "number_of_data_nodes:" + strconv.Itoa(resp.NumberOfDataNodes) + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "number_of_data_nodes:" + strconv.Itoa(resp.ActivePrimaryShards) + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "active_shards:" + strconv.Itoa(resp.ActiveShards) + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "relocating_shards:" + strconv.Itoa(resp.RelocatingShards) + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "initialized_shards:" + strconv.Itoa(resp.InitializedShards) + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "unassigned_shards:" + strconv.Itoa(resp.UnassignedShards) + "<br>"
+			"cluster_name:" + resp.ClusterName + "<br>" +
+			"status:" + resp.Status + "<br>" +
+			"number_of_nodes:" + strconv.Itoa(resp.NumberOfNodes) + "<br>" +
+			"number_of_data_nodes:" + strconv.Itoa(resp.NumberOfDataNodes) + "<br>" +
+			"number_of_data_nodes:" + strconv.Itoa(resp.ActivePrimaryShards) + "<br>" +
+			"active_shards:" + strconv.Itoa(resp.ActiveShards) + "<br>" +
+			"relocating_shards:" + strconv.Itoa(resp.RelocatingShards) + "<br>" +
+			"initialized_shards:" + strconv.Itoa(resp.InitializedShards) + "<br>" +
+			"unassigned_shards:" + strconv.Itoa(resp.UnassignedShards) + "<br>"
 	}
 	return result
 }

+ 2 - 2
qyxy/src/config.json

@@ -1,7 +1,7 @@
 {
 	"mgodb": "192.168.3.207:27092",
 	"dbsize": 12,
-	"dbname": "mixdata",
+	"dbname": "wjh",
 	"dbcoll": "qyxy",
 	"savecoll": "qyxy_std",
     "uname": "dataAnyWrite",
@@ -10,7 +10,7 @@
 	"updatetime": 0,
 	"elastic": {
         "addr": "http://192.168.3.128:9800",
-        "index": "qyxy_v2",
+        "index": "qyxy_v1",
         "itype": "qyxy",
         "otherindex": "qyxy_all",
         "otheritype": "qyxy",

+ 4 - 4
qyxy/src/main.go

@@ -42,8 +42,8 @@ func init() {
 		MongodbAddr: Sysconfig["mgodb"].(string),
 		Size:        qu.IntAllDef(Sysconfig["dbsize"], 5),
 		DbName:      Dbname,
-		UserName: 	 Sysconfig["uname"].(string),
-		Password: 	 Sysconfig["upwd"].(string),
+		//UserName: 	 Sysconfig["uname"].(string),
+		//Password: 	 Sysconfig["upwd"].(string),
 	}
 	Mgo.InitPool()
 	//es
@@ -71,9 +71,9 @@ func init() {
 }
 
 func main() {
-	go TimeTask()
+	//go TimeTask()
 	//QyxyStandard()
-	//HistoryQyxyStandard()
+	HistoryQyxyStandard()
 	ch := make(chan bool, 1)
 	<-ch
 }

+ 33 - 29
qyxy/src/task.go

@@ -397,23 +397,24 @@ func QyxyStandard() bool {
 			company_name := qu.ObjToString(esMap["company_name"])
 			if company_type == "个体工商户" {
 				if len([]rune(company_name)) >= 5 {
-					esMap["company_type_int"] = 5
+					esMap["company_type_int"] = 31
 				}else {
-					esMap["company_type_int"] = 6
+					esMap["company_type_int"] = 32
 				}
 			}else if company_type == "其他" || company_type == "" {
 				if len([]rune(company_name)) >= 4 {
-					esMap["company_type_int"] = 3
+					esMap["company_type_int"] = 21
 				}else {
-					esMap["company_type_int"] = 4
+					esMap["company_type_int"] = 22
 				}
 			}else {
-				if len([]rune(company_name)) >= 4 {
-					esMap["company_type_int"] = 1
+				if company_type == "内资分公司" {
+					esMap["company_type_int"] = 12
+				}else if len([]rune(company_name)) >= 4 {
+					esMap["company_type_int"] = 11
 				}else {
-					esMap["company_type_int"] = 2
+					esMap["company_type_int"] = 13
 				}
-
 			}
 			lock.Lock()
 			if EsSaveFlag {
@@ -466,8 +467,8 @@ func HistoryQyxyStandard() {
 	wg := &sync.WaitGroup{}
 	lock := &sync.Mutex{} //控制读写
 	arr := [][]map[string]interface{}{}
-	//count, _ := sess.DB(Dbname).C(Dbcoll).Find(nil).Count()
-	//log.Println("共查询:", count, "条")
+	count, _ := sess.DB(Dbname).C(Dbcoll).Find(nil).Count()
+	log.Println("共查询:", count, "条")
 	it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()
 	sum := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
@@ -754,30 +755,32 @@ func HistoryQyxyStandard() {
 				mgoMap[k] = v
 			}
 			//es数据过滤
-			EsSaveFlag := true
+			//EsSaveFlag := true
 			company_type := qu.ObjToString(esMap["company_type"])
 			company_name := qu.ObjToString(esMap["company_name"])
 			if company_type == "个体工商户" {
 				if len([]rune(company_name)) >= 5 {
-					esMap["company_type_int"] = 5
+					esMap["company_type_int"] = 31
 				}else {
-					esMap["company_type_int"] = 6
+					esMap["company_type_int"] = 32
 				}
 			}else if company_type == "其他" || company_type == "" {
 				if len([]rune(company_name)) >= 4 {
-					esMap["company_type_int"] = 3
+					esMap["company_type_int"] = 21
 				}else {
-					esMap["company_type_int"] = 4
+					esMap["company_type_int"] = 22
 				}
 			}else {
-				if len([]rune(company_name)) >= 4 {
-					esMap["company_type_int"] = 1
+				if company_type == "内资分公司" {
+					esMap["company_type_int"] = 12
+				}else if len([]rune(company_name)) >= 4 {
+					esMap["company_type_int"] = 11
 				}else {
-					esMap["company_type_int"] = 2
+					esMap["company_type_int"] = 13
 				}
 			}
 			lock.Lock()
-			if EsSaveFlag {
+			//if EsSaveFlag {
 				if esMap["history_name"] != nil {
 					var nameArr []string
 					for _, v := range strings.Split(qu.ObjToString(esMap["history_name"]), ";") {
@@ -790,18 +793,18 @@ func HistoryQyxyStandard() {
 					}
 				}
 				EsSaveCache <- esMap //过滤后数据保存
-			}
+			//}
 			//EsSaveAllCache <- esMap //所有数据保存
 			//SaveHistoryName(tmp)
 			//update = append(update, map[string]interface{}{"$set": mgoMap})
-			if len(update) == 2 {
-				arr = append(arr, update)
-			}
-			if len(arr) > 500 {
-				tmps := arr
-				Mgo.UpSertBulk(Savecoll, tmps...)
-				arr = [][]map[string]interface{}{}
-			}
+			//if len(update) == 2 {
+			//	arr = append(arr, update)
+			//}
+			//if len(arr) > 500 {
+			//	tmps := arr
+			//	Mgo.UpSertBulk(Savecoll, tmps...)
+			//	arr = [][]map[string]interface{}{}
+			//}
 			lock.Unlock()
 		}(tmp)
 		tmp = make(map[string]interface{})
@@ -809,7 +812,7 @@ func HistoryQyxyStandard() {
 	wg.Wait()
 	lock.Lock()
 	if len(arr) > 0 {
-		Mgo.UpSertBulk(Savecoll, arr...)
+		//Mgo.UpSertBulk(Savecoll, arr...)
 	}
 	lock.Unlock()
 	log.Println("Run Over...Count:", sum)
@@ -843,6 +846,7 @@ func SaveEs() {
 					defer func() {
 						<-SP
 					}()
+					qu.Debug(Index, Itype, arru)
 					Es.BulkSave(Index, Itype, &arru, true)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, 500)

+ 7 - 0
udpcreateindex/src/biddingall.go

@@ -240,6 +240,11 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 						ent, _ := mgostandard.FindOne("qyxy_historyname", map[string]interface{}{"company_name": w})
 						if len(*ent) > 0 {
 							cid = append(cid, qutil.ObjToString((*ent)["company_id"]))
+						}else {
+							ent, _ = mgostandard.FindOne("qyxy_std", map[string]interface{}{"company_name": w})
+							if len(*ent) > 0 {
+								cid = append(cid, qutil.ObjToString((*ent)["_id"]))
+							}
 						}
 					}
 				}
@@ -377,6 +382,8 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 							if tmp[field] != nil && qutil.Int64All(tmp[field]) > 0 {
 								newTmp[field] = qutil.Int64All(tmp[field])
 							}
+						} else if field == "entidlist" {
+							newTmp[field] = tmp[field]
 						} else { //其它字段判断数据类型,不正确舍弃
 							if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype {
 								continue

+ 8 - 0
udpcreateindex/src/biddingindex.go

@@ -54,6 +54,7 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 
 	//bidding库
 	session := mgo.GetMgoConn()
+	qutil.Debug(db, c)
 	count, _ := session.DB(db).C(c).Find(&q).Count()
 	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
 	n1, n2 := 0, 0
@@ -269,6 +270,11 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 					ent, _ := mgostandard.FindOne("qyxy_historyname", map[string]interface{}{"company_name": w})
 					if len(*ent) > 0 {
 						cid = append(cid, qutil.ObjToString((*ent)["company_id"]))
+					}else {
+						ent, _ = mgostandard.FindOne("qyxy_std", map[string]interface{}{"company_name": w})
+						if len(*ent) > 0 {
+							cid = append(cid, qutil.ObjToString((*ent)["_id"]))
+						}
 					}
 				}
 			}
@@ -400,6 +406,8 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 						if tmp[field] != nil && qutil.Int64All(tmp[field]) > 0 {
 							newTmp[field] = qutil.Int64All(tmp[field])
 						}
+					} else if field == "entidlist" {
+						newTmp[field] = tmp[field]
 					} else { //其它字段判断数据类型,不正确舍弃
 						if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype {
 							continue

+ 4 - 4
udpcreateindex/src/config.json

@@ -4,14 +4,14 @@
   "uname": "dataAnyWrite",
   "upwd": "data@dataAnyWrite",
   "mongodb": {
-    "addr": "192.168.3.205:27082,192.168.3.205:27083",
+    "addr": "192.168.3.207:27092",
     "pool": 10,
-    "db": "qfw"
+    "db": "wjh"
   },
   "savedb": {
     "addr": "192.168.3.207:27092",
     "size": 10,
-    "db": "wjh"
+    "db": "qfw_data"
   },
   "jkmail": {
     "to": "zhangjinkun@topnet.net.cn",
@@ -76,7 +76,7 @@
   "filelength": 50000,
   "detaillength": 50000,
   "project": {
-    "db": "wjh",
+    "db": "qfw_data",
     "collect": "projectset",
     "index": "projectset",
     "type": "projectset"

+ 3 - 3
udpcreateindex/src/main.go

@@ -67,9 +67,9 @@ func init() {
 		MongodbAddr: mconf["addr"].(string),
 		Size:        util.IntAllDef(mconf["pool"], 5),
 		DbName:      mconf["db"].(string),
-		UserName:	 Sysconfig["uname"].(string),
-		Password:    Sysconfig["upwd"].(string),
-		ReplSet: 	 "bidding",
+		//UserName:	 Sysconfig["uname"].(string),
+		//Password:    Sysconfig["upwd"].(string),
+		//ReplSet: 	 "bidding",
 	}
 	mgo.InitPool()
 	project2db = &mongodb.MongodbSim{