apple 4 gadi atpakaļ
vecāks
revīzija
84db3117f3

+ 2 - 1
data_monitoring/listen_task/src/main.go

@@ -106,6 +106,7 @@ func dealWithOtherData()  {
 	//先获取-待统计-动态站点
 	tmp ,tmp_err := save_mgo.Find("z_site_spider", map[string]interface{}{},nil,nil)
 	if tmp_err==nil && tmp!=nil && len(tmp)>0 {
+		infoSiteArr = make([]string,0)
 		for _,v :=range tmp {
 			infoSiteArr = append(infoSiteArr,qu.ObjToString(v["site"]))
 		}
@@ -208,7 +209,7 @@ func dealWithOtherData()  {
 	}
 
 
-	sendErrMailSmtp("统计-站点-企业","详情请查阅附件")
+	sendErrMailSmtp("统计-站点-企业","附件")
 
 	log.Println("定时处理完毕...",int(time.Now().Unix())-start,"秒")
 }

+ 0 - 13
data_monitoring/vps_client/src/main.go

@@ -93,19 +93,6 @@ func task()  {
 
 
 
-
-
-
-
-
-
-
-
-
-
-
-
-
 //根据进程名判断进程是否运行
 func checkProRunning(serverName string) (bool, error) {
 	cmd := `ps ux | awk '/` + serverName + `/ && !/awk/ {print $2}'`

+ 305 - 0
data_monitoring/words_vaild/src/main.go

@@ -0,0 +1,305 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/tealeg/xlsx"
+	"log"
+	"os"
+	qu "qfw/util"
+	"qfw/util/elastic"
+	"strings"
+	"sync"
+	"unicode/utf8"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+
+)
+var (
+	sysconfig			map[string]interface{} // configuration file contents; never assigned in the visible code
+	save_mgo        	*MongodbSim // shared MongoDB handle, created in init
+)
+
+func init()  {
+	save_mgo = &MongodbSim{
+		MongodbAddr: "192.168.3.207:27092",
+		DbName:      "zhengkun",
+		Size:        5,
+	}
+	save_mgo.InitPool()
+
+	elastic.InitElasticSize("http://192.168.3.11:9800",20)
+}
+
+// dealWithDataXlsx exports a sample of the "zk_test_words" collection to
+// words.xlsx in the working directory.
+//
+// Every 30th document in iteration order is sampled and flattened by
+// flattenWordsDoc. The workbook gets one sheet per word_0..word_4 key; each
+// sheet lists the sampled name, the first match for that key and, for every
+// key except word_0, the total/hit counters.
+func dealWithDataXlsx() {
+	q := map[string]interface{}{}
+	sess := save_mgo.GetMgoConn()
+	defer save_mgo.DestoryMongoConn(sess)
+	it := sess.DB(save_mgo.DbName).C("zk_test_words").Find(&q).Iter()
+
+	total := 0
+	saveArr := make([]map[string]string, 0)
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total%10000 == 0 {
+			log.Println("current index", total, tmp["_id"])
+		}
+		// Only every 30th document goes into the export.
+		if total%30 == 0 {
+			saveArr = append(saveArr, flattenWordsDoc(tmp))
+		}
+		// Decode into a fresh map each round; presumably this keeps fields of
+		// one document from lingering into the next.
+		tmp = make(map[string]interface{})
+	}
+
+	// Best effort: the previous export may simply not exist.
+	os.Remove("words.xlsx")
+	f := xlsx.NewFile()
+	for i := 0; i < 5; i++ {
+		suffix := fmt.Sprintf("%d", i)
+		key, key1, key2 := "word_"+suffix, "total"+suffix, "hit"+suffix
+		sheet, err := f.AddSheet("统计" + key)
+		if err != nil {
+			// A nil sheet would panic on AddRow below, so skip this sheet.
+			log.Println("add sheet failed:", key, err)
+			continue
+		}
+		row := sheet.AddRow()
+		row.AddCell().Value = "name"
+		row.AddCell().Value = key
+		if i != 0 {
+			row.AddCell().Value = "total"
+			row.AddCell().Value = "hit"
+		}
+		for _, tmp := range saveArr {
+			row = sheet.AddRow()
+			row.AddCell().SetString(tmp["name"])
+			row.AddCell().SetString(tmp[key])
+			row.AddCell().SetString(tmp[key1])
+			row.AddCell().SetString(tmp[key2])
+		}
+	}
+
+	err := f.Save("words.xlsx")
+	if err != nil {
+		log.Println("保存xlsx失败:", err)
+	} else {
+		log.Println("保存xlsx成功:", err)
+	}
+}
+
+// flattenWordsDoc turns one "zk_test_words" document into the string columns
+// written to the spreadsheet: "name", and for each i in 0..4 the keys
+// word_i, total_i-style "total<i>" and "hit<i>".
+//
+// Only the first element of each word_i array is used. A missing, non-array
+// or empty word_i leaves all three columns empty instead of panicking on the
+// [0] index. The counters are always left empty for word_0.
+func flattenWordsDoc(doc map[string]interface{}) map[string]string {
+	// count renders a stored counter, mapping an absent value to "".
+	count := func(v interface{}) string {
+		if v == nil {
+			return ""
+		}
+		return fmt.Sprintf("%d", v)
+	}
+
+	dict := map[string]string{"name": qu.ObjToString(doc["name"])}
+	for i := 0; i < 5; i++ {
+		suffix := fmt.Sprintf("%d", i)
+		key := "word_" + suffix
+		value, allWords, hitWords := "", "", ""
+		if arr, ok := doc[key].(primitive.A); ok {
+			if dataArr := qu.ObjArrToMapArr(arr); len(dataArr) > 0 {
+				value = qu.ObjToString(dataArr[0]["name"])
+				if i != 0 {
+					allWords = count(dataArr[0]["all_words"])
+					hitWords = count(dataArr[0]["hit_words"])
+				}
+			}
+		}
+		dict[key] = value
+		dict["total"+suffix] = allWords
+		dict["hit"+suffix] = hitWords
+	}
+	return dict
+}
+
+// main runs the currently enabled step of the tool: the xlsx export.
+//
+// The statistics-building step is kept in buildWordStats and is not invoked
+// here; call it from main to regenerate the "zk_test_words" collection.
+func main() {
+	// Export the xlsx report.
+	dealWithDataXlsx()
+}
+
+// buildWordStats walks every document of "zk_company_test", looks its name up
+// through dealWithScoreRules for each of the five dimensions (0..4), scores
+// the matches with dealWithWordsRules and stores the result under word_0..word_4
+// in "zk_test_words". Documents without any match are not written.
+func buildWordStats() {
+	// qu.Catch is a project helper; presumably it recovers and logs panics.
+	defer qu.Catch()
+	log.Println("处理 ... 指定企业名称 ...")
+
+	q := map[string]interface{}{}
+	sess := save_mgo.GetMgoConn()
+	defer save_mgo.DestoryMongoConn(sess)
+	it := sess.DB(save_mgo.DbName).C("zk_company_test").Find(&q).Iter()
+	total := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total%10000 == 0 {
+			log.Println("current index", total, tmp["_id"])
+		}
+
+		name := qu.ObjToString(tmp["name"])
+		save_dict := make(map[string]interface{}, 0)
+		for i := 0; i < 5; i++ {
+			dataArr := dealWithScoreRules(name, i)
+			if len(dataArr) < 1 {
+				// No search hits for this dimension.
+				continue
+			}
+			save_dict["word_"+fmt.Sprintf("%d", i)] = dealWithWordsRules(name, dataArr, i)
+		}
+
+		if len(save_dict) > 0 {
+			save_dict["name"] = name
+			// Save reports failure by returning nil.
+			if save_mgo.Save("zk_test_words", save_dict) == nil {
+				log.Println("save zk_test_words failed:", name)
+			}
+		}
+
+		tmp = make(map[string]interface{})
+	}
+}
+
+// dealWithScoreRules searches the "azktest" index for name and returns up to
+// three hits, each as a map with the hit's "name" and relevance "score".
+//
+// space selects the field that is searched: 1..4 use "azktest.name_<space>",
+// any other value uses "azktest.name".
+//
+// It returns nil when the query fails or reports too many hits, and an empty
+// slice when the response carries no hits. Hits without a decodable source
+// are skipped.
+func dealWithScoreRules(name string, space int) []map[string]interface{} {
+	field := "azktest.name"
+	if space > 0 && space < 5 {
+		field = fmt.Sprintf("azktest.name_%d", space)
+	}
+
+	// Encode the name as a JSON string so quotes or backslashes inside it
+	// cannot break out of the query document. NOTE(review): characters that
+	// are special to query_string syntax are still passed through unchanged.
+	quoted, err := json.Marshal(name)
+	if err != nil {
+		log.Println("从ES查询出错", err.Error())
+		return nil
+	}
+	query := `{"query":{"bool":{"must":[{"query_string":{"default_field":"` + field + `","query":` + string(quoted) + `}}],"must_not":[],"should":[]}},"from":0,"size":3,"sort":[],"facets":{}}`
+
+	client := elastic.GetEsConn()
+	defer elastic.DestoryEsConn(client)
+	searchResult, err := client.Search().Index("azktest").Type("azktest").Source(query).Do()
+	if err != nil {
+		log.Println("从ES查询出错", err.Error())
+		return nil
+	}
+	// Check for a missing hits section before touching it.
+	if searchResult == nil || searchResult.Hits == nil {
+		return []map[string]interface{}{}
+	}
+
+	resNum := len(searchResult.Hits.Hits)
+	if resNum >= 5000 {
+		// Return nothing rather than a slice of empty placeholder entries.
+		log.Println("查询结果太多,查询到:", resNum, "条")
+		return nil
+	}
+
+	res := make([]map[string]interface{}, 0, resNum)
+	for _, hit := range searchResult.Hits.Hits {
+		if hit.Source == nil {
+			continue
+		}
+		data := make(map[string]interface{}, 0)
+		if err := json.Unmarshal(*hit.Source, &data); err != nil {
+			log.Println("从ES查询出错", err.Error())
+			continue
+		}
+		score := 0.0
+		if hit.Score != nil {
+			score = *hit.Score
+		}
+		res = append(res, map[string]interface{}{
+			"name":  data["name"],
+			"score": score,
+		})
+	}
+	return res
+}
+
+
+
+// dealWithWordsRules annotates each search hit in source with how well it
+// overlaps name.
+//
+// name is cut into windows of space runes by calculateWordCount. For every
+// hit the result carries its "name" and "score"; when space is non-zero it
+// also carries "all_words" (number of windows in the hit's own name) and
+// "hit_words" (number of name's windows that occur in the hit's name).
+func dealWithWordsRules(name string, source []map[string]interface{}, space int) []map[string]interface{} {
+	grams, _ := calculateWordCount(name, space)
+	annotated := make([]map[string]interface{}, 0)
+	for _, candidate := range source {
+		candidateName := qu.ObjToString(candidate["name"])
+		_, gramTotal := calculateWordCount(candidateName, space)
+
+		matched := 0
+		for _, gram := range grams {
+			if strings.Contains(candidateName, gram) {
+				matched++
+			}
+		}
+
+		entry := map[string]interface{}{
+			"name":  candidateName,
+			"score": qu.Float64All(candidate["score"]),
+		}
+		// The whole-name dimension (space 0) carries no window counters.
+		if space != 0 {
+			entry["all_words"] = gramTotal
+			entry["hit_words"] = matched
+		}
+		annotated = append(annotated, entry)
+	}
+	return annotated
+}
+
+// calculateWordCount splits name into every run of space consecutive runes
+// (a sliding window that advances one rune at a time) and returns the windows
+// together with their count.
+//
+// An empty name, a non-positive space, or a name shorter than space runes
+// yields an empty, non-nil slice and a count of 0.
+func calculateWordCount(name string, space int) ([]string, int) {
+	windows := make([]string, 0)
+	if space <= 0 || name == "" {
+		return windows, 0
+	}
+	// lastStart is the rune index at which the final window begins.
+	lastStart := utf8.RuneCountInString(name) - space
+	if lastStart < 0 {
+		return windows, 0
+	}
+	runes := []rune(name)
+	for start := 0; start <= lastStart; start++ {
+		windows = append(windows, string(runes[start:start+space]))
+	}
+	return windows, len(windows)
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+// readyDataEs copies every document of the "zk_company_name" collection into
+// the "azktest" Elasticsearch index. The company name is stored under "name"
+// and "name_1".."name_4"; the Mongo _id is reused as the document id.
+// At most 10 saves run concurrently, and the function waits for all of them
+// before logging the final count.
+func readyDataEs() {
+	filter := map[string]interface{}{}
+	sess := save_mgo.GetMgoConn()
+	defer save_mgo.DestoryMongoConn(sess)
+
+	// slots bounds the number of in-flight index requests.
+	slots := make(chan bool, 10)
+	pending := &sync.WaitGroup{}
+	nameFields := []string{"name", "name_1", "name_2", "name_3", "name_4"}
+
+	cursor := sess.DB(save_mgo.DbName).C("zk_company_name").Find(&filter).Iter()
+	total := 0
+	for row := make(map[string]interface{}); cursor.Next(&row); total++ {
+		if total%10000 == 0 {
+			log.Println("current index", total, row["_id"])
+		}
+
+		company := qu.ObjToString(row["company_name"])
+		indexDoc := make(map[string]interface{}, 0)
+		indexDoc["_id"] = row["_id"]
+		for _, field := range nameFields {
+			indexDoc[field] = company
+		}
+
+		slots <- true
+		pending.Add(1)
+		go func(payload map[string]interface{}) {
+			// Deferred calls run last-in first-out: the slot is released
+			// before the wait group is signalled.
+			defer pending.Done()
+			defer func() { <-slots }()
+			elastic.Save("azktest", "azktest", payload)
+		}(indexDoc)
+
+		row = make(map[string]interface{})
+	}
+	pending.Wait()
+
+	log.Println("is over", total)
+}
+

+ 328 - 0
data_monitoring/words_vaild/src/mgo.go

@@ -0,0 +1,328 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+// MgoSess collects the parameters of a single find query through chained
+// setter calls; Iter then executes the query.
+type MgoSess struct {
+	Db     string // database name, set by DB
+	Coll   string // collection name, set by C
+	Query  interface{} // filter document, set by Find
+	Sorts  []string // sort keys, set by Sort; a "-" prefix means descending
+	fields interface{} // projection, set by Select
+	limit  int64 // maximum number of documents, set by Limit; applied only when > 0
+	skip   int64 // number of documents to skip, set by Skip; applied only when > 0
+	M      *MongodbSim // owning connection wrapper, set by GetMgoConn
+}
+
+// MgoIter wraps a driver cursor so results can be read with Next.
+type MgoIter struct {
+	Cursor *mongo.Cursor // nil when the underlying find failed
+}
+
+// Next decodes the next document of the cursor into result and reports
+// whether a document was read.
+//
+// It returns false when the iterator has no cursor, when the cursor is
+// exhausted, or when iteration or decoding fails; in the last three cases the
+// cursor is closed. Failures are logged.
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor == nil {
+		return false
+	}
+	// Pass a real context instead of nil to the driver.
+	ctx := context.Background()
+	if !mt.Cursor.Next(ctx) {
+		// Distinguish "no more documents" from a failed fetch.
+		if err := mt.Cursor.Err(); err != nil {
+			log.Println("mgo cur err", err.Error())
+		}
+		mt.Cursor.Close(ctx)
+		return false
+	}
+	if err := mt.Cursor.Decode(result); err != nil {
+		log.Println("mgo cur err", err.Error())
+		mt.Cursor.Close(ctx)
+		return false
+	}
+	return true
+}
+
+// DB sets the database name used by Iter and returns ms for chaining.
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+// C sets the collection name used by Iter and returns ms for chaining.
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+// Find stores q as the filter for Iter and returns ms for chaining.
+// No query is sent until Iter is called.
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+// Select stores fields as the projection for Iter and returns ms for chaining.
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+// Limit stores the maximum number of documents for Iter and returns ms for
+// chaining. Iter ignores values that are not positive.
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+// Skip stores the number of documents Iter should skip and returns ms for
+// chaining. Iter ignores values that are not positive.
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+// Sort stores the sort keys for Iter and returns ms for chaining. Iter treats
+// a key prefixed with "-" as descending and any other key as ascending.
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+// MongodbSim bundles a MongoDB client with the settings used to create it and
+// a bounded channel that limits concurrent helper operations.
+type MongodbSim struct {
+	MongodbAddr string // server address; InitPool prepends "mongodb://"
+	Size        int // driver max pool size and capacity of pool
+	//	MinSize     int
+	DbName   string // database used by the helper methods
+	C        *mongo.Client // set by InitPool when the connect call succeeds
+	Ctx      context.Context // context passed to database operations
+	ShortCtx context.Context // one-minute context used by InitPool for connecting
+	pool     chan bool // buffered channel; Open sends to it and Close receives from it
+	UserName string // credentials; applied only when both are non-empty
+	Password string
+}
+
+// GetMgoConn returns a new query session bound to m. It does not reserve a
+// slot in the pool.
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	return &MgoSess{M: m}
+}
+
+// DestoryMongoConn detaches ms from its MongodbSim so the session can no
+// longer run queries. A nil session is ignored. It does not release a pool
+// slot, mirroring GetMgoConn, which does not take one.
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	if ms == nil {
+		return
+	}
+	ms.M = nil
+}
+
+// InitPool creates the MongoDB client for m.MongodbAddr and prepares the
+// channel used by Open/Close.
+//
+// The connect timeout is 3 seconds, idle connections are kept for up to two
+// hours, and credentials are applied only when both UserName and Password are
+// set. On success m.C holds the client; on failure the error is logged and
+// m.C stays nil.
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+
+	// A negative size cannot be converted meaningfully; treat it as 0.
+	maxConns := m.Size
+	if maxConns < 0 {
+		maxConns = 0
+	}
+	opts.SetMaxPoolSize(uint64(maxConns))
+	// The channel needs at least one slot, otherwise Open blocks forever.
+	slots := maxConns
+	if slots < 1 {
+		slots = 1
+	}
+	m.pool = make(chan bool, slots)
+
+	if m.UserName != "" && m.Password != "" {
+		opts.SetAuth(options.Credential{
+			Username: m.UserName,
+			Password: m.Password,
+		})
+	}
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+
+	// Operations are not meant to time out through this context, so use one
+	// without a deadline rather than a timeout context whose cancel function
+	// is thrown away.
+	m.Ctx = context.Background()
+
+	shortCtx, cancelShort := context.WithTimeout(context.Background(), 1*time.Minute)
+	m.ShortCtx = shortCtx
+	client, err := mongo.Connect(shortCtx, opts)
+	if err != nil {
+		cancelShort()
+		log.Println("mgo init error:", err.Error())
+		return
+	}
+	// ShortCtx is an exported field, so on success it is left to expire at
+	// its own one-minute deadline instead of being cancelled here.
+	_ = cancelShort
+	m.C = client
+	log.Println("init success")
+}
+
+// Open reserves one slot by sending on m.pool; it blocks while the channel is full.
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+// Close releases one slot by receiving from m.pool; it blocks while the channel is empty.
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+// SaveBulk inserts all given documents into collection c with one bulk write.
+// It reports whether the write succeeded; a failure is logged.
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+
+	var models []mongo.WriteModel
+	for i := range doc {
+		models = append(models, mongo.NewInsertOneModel().SetDocument(doc[i]))
+	}
+
+	target := m.C.Database(m.DbName).Collection(c)
+	if _, bulkErr := target.BulkWrite(m.Ctx, models); bulkErr != nil {
+		log.Println("mgo savebulk error:", bulkErr.Error())
+		return false
+	}
+	return true
+}
+
+// Save inserts doc into collection c and returns the inserted id.
+// It returns nil when the insert fails; the failure is logged, matching the
+// bulk helpers.
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		log.Println("mgo save error:", err.Error())
+		return nil
+	}
+	return r.InsertedID
+}
+
+// UpdateById applies the update document doc to the document of collection c
+// whose _id is the hex string id. It reports whether the update call
+// succeeded; a failure is logged.
+//
+// doc is handed to the driver's UpdateOne unchanged, so it has to be in
+// whatever form UpdateOne accepts.
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		log.Println("mgo updatebyid error:", err.Error())
+		return false
+	}
+	return true
+}
+
+// DeleteById removes the document of collection c whose _id is the hex string
+// id and returns the number of deleted documents. A failed delete is logged
+// and reported as 0.
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		log.Println("mgo deletebyid error:", err.Error())
+		return 0
+	}
+	return r.DeletedCount
+}
+
+// Delete removes every document of collection c matching query and returns
+// the number of deleted documents. A failed delete is logged and reported as 0.
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		log.Println("mgo delete error:", err.Error())
+		return 0
+	}
+	return r.DeletedCount
+}
+
+// FindById returns the document of collection c whose _id is the hex string
+// id. The result is an empty map when no document matches; any other lookup
+// or decode failure is logged.
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	v := map[string]interface{}{}
+	err := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}).Decode(&v)
+	// A missing document is an expected outcome, not an error worth logging.
+	if err != nil && err != mongo.ErrNoDocuments {
+		log.Println("mgo findbyid error:", err.Error())
+	}
+	return v
+}
+
+// FindOne returns one document of collection c matching query. The result is
+// an empty map when no document matches; any other lookup or decode failure
+// is logged.
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	v := map[string]interface{}{}
+	err := coll.FindOne(m.Ctx, query).Decode(&v)
+	// A missing document is an expected outcome, not an error worth logging.
+	if err != nil && err != mongo.ErrNoDocuments {
+		log.Println("mgo findone error:", err.Error())
+	}
+	return v
+}
+
+// Find returns every document of collection c matching query, using the given
+// sort and projection (either may be nil).
+//
+// On failure it logs the error and returns it with a nil result, so the
+// caller decides how to react; it no longer terminates the process.
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find().SetSort(sort).SetProjection(fields)
+	r, err := coll.Find(m.Ctx, query, op)
+	if err != nil {
+		log.Println("mgo find error:", err.Error())
+		return nil, err
+	}
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Println("mgo find error:", err.Error())
+		return nil, err
+	}
+	return results, nil
+}
+
+// NewObjectId returns a newly generated ObjectID for use as a document _id.
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+// StringTOBsonId converts the hex string id into an ObjectID.
+// When id is not a valid hex ObjectID, the failure is logged and the value
+// produced by the failed parse is returned, as before.
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, err := primitive.ObjectIDFromHex(id)
+	if err != nil {
+		log.Println("mgo invalid object id:", id, err.Error())
+	}
+	return objectId
+}
+
+// BsonTOStringId returns the hex form of id when it holds a
+// primitive.ObjectID. Any other value, including nil, yields "" instead of a
+// type-assertion panic.
+func BsonTOStringId(id interface{}) string {
+	oid, ok := id.(primitive.ObjectID)
+	if !ok {
+		return ""
+	}
+	return oid.Hex()
+}