|
- package main
- import (
- "context"
- "fmt"
- es7 "github.com/olivere/elastic/v7"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "strconv"
- "testing"
- "time"
- )
- //日报程序
- func TestR(t *testing.T) {
- start := -1
- end := 0
- esQ := `{"query": {"range": {"id": {"gte": "%s","lt": "%s"}}}}`
- esQ1 := `{"query": {"bool": {"must": [{"range": {"id": {"gte": "%s","lt": "%s"}}},{"terms": {"bidding.site": ["元博网(采购与招标网)","中国招标与采购网"]}}]}}}`
- url := "http://127.0.0.1:19805"
- username := "es_all"
- password := "TopJkO2E_d1x"
- url2 := "http://127.0.0.1:19905"
- username2 := "jybid"
- password2 := "Top2023_JEB01i@31"
- // 创建 Elasticsearch 客户端
- client, err := es7.NewClient(
- es7.SetURL(url),
- es7.SetBasicAuth(username, password),
- es7.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- // 创建 Elasticsearch 客户端
- client2, err := es7.NewClient(
- es7.SetURL(url2),
- es7.SetBasicAuth(username2, password2),
- es7.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- st := util.GetDayStartSecond(start) //
- et := util.GetDayStartSecond(end) //
- st1 := fmt.Sprintf("%x0000000000000000", st)
- et1 := fmt.Sprintf("%x0000000000000000", et)
- eq := fmt.Sprintf(esQ, st1, et1)
- fmt.Println("eq", eq)
- eq1 := fmt.Sprintf(esQ1, st1, et1)
- fmt.Println("eq1", eq1)
- rangeQuery := es7.NewRangeQuery("id").Gte(st1).Lt(et1)
- termsQuery := es7.NewTermsQuery("site", "元博网(采购与招标网)", "中国招标与采购网")
- countQuery := es7.NewBoolQuery().
- Must(rangeQuery)
- //
- count1Query := es7.NewBoolQuery().Must(rangeQuery).Filter(termsQuery)
- // 老集群数量
- count, err := client.Count("bidding").Query(countQuery).Do(context.Background())
- if err != nil {
- fmt.Println("统计文档数量出错:", err)
- }
- // 竞品数量
- count1, err := client.Count("bidding").Query(count1Query).Do(context.Background())
- if err != nil {
- fmt.Println("统计文档数量出错:", err)
- }
- //新集群数量
- countNew, err := client2.Count("bidding").Query(countQuery).Do(context.Background())
- if err != nil {
- fmt.Println("统计文档数量出错:", err)
- }
- fmt.Println(count)
- fmt.Println(count1)
- fmt.Println(countNew)
- Mgo := &mongodb.MongodbSim{
- MongodbAddr: "127.0.0.1:27083",
- DbName: "qfw",
- Size: 10,
- Direct: true,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- }
- Mgo.InitPool()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- id1 := mongodb.StringTOBsonId(st1)
- id2 := mongodb.StringTOBsonId(et1)
- now := time.Now()
- mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}} //一天时间内的id段
- fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
- /**
- count 一天内,es 中 数据总量
- count1 一天内,es 中 竞品总量
- count2 一天内,mgo 总入库量
- count3 一天内,mgo 有效数据 总数
- count4 一天内,mgo 中 竞品数据总量
- count5 一天内,mgo 有效数据中,竞品的数量
- countNew 一天内,es3新集群 中 数据总量
- */
- count2, count3 := int64(0), int64(0) //
- count4, count5 := int64(0), int64(0) //竟品
- es_comeintime_totaltime := int64(0) //comeintime 和 生索引 pici 时间 差值的总和
- es_publishtime_totaltime := int64(0) //publishtime 和 生索引 pici 时间 差值的总和
- es_comeintime_avgtime := int64(0) //comeintime 和 生索引 pici 时间 差值的平均值
- es_publishtime_avgtime := int64(0) //publishtime 和 生索引 pici 时间 差值的平均值
- file_totaltime := int64(0)
- no_file_totaltime := int64(0)
- file_avgltime := int64(0)
- no_file_avgltime := int64(0)
- timeCount := 0 // 统计时间差的数据量
- fileCount := 0
- noFileCount := 0
- //统计pici -comeintime 时间差,1、3、5、10、15、30、30+ 分钟
- var pc_diff1 int64
- var pc_diff3 int64
- var pc_diff5 int64
- var pc_diff10 int64
- var pc_diff15 int64
- var pc_diff30 int64
- var pc_diff301 int64
- query := sess.DB("qfw").C("bidding").Find(mq).Select(fd).Iter()
- for tmp := make(map[string]interface{}); query.Next(tmp); count2++ {
- if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" {
- count4++
- }
- if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 && util.Float64All(tmp["infoformat"]) != 3 {
- count3++
- comeintime := util.Int64All(tmp["comeintime"])
- publishtime := util.Int64All(tmp["publishtime"])
- pici := util.Int64All(tmp["pici"])
- if (comeintime - publishtime) < 12*60*60 {
- if pici > 0 {
- diff1 := pici - comeintime
- diff2 := pici - publishtime
- if diff1 < 0 {
- fmt.Println("diff1", diff1, tmp["_id"])
- } else if diff1 <= 60 {
- pc_diff1++
- } else if diff1 <= 3*60 {
- pc_diff3++
- } else if diff1 <= 5*60 {
- pc_diff5++
- } else if diff1 <= 10*60 {
- pc_diff10++
- } else if diff1 <= 15*60 {
- pc_diff15++
- } else if diff1 <= 30*60 {
- pc_diff30++
- } else {
- pc_diff301++
- }
- if diff2 < 0 {
- fmt.Println("diff2", diff2, tmp["_id"])
- }
- es_comeintime_totaltime += diff1
- es_publishtime_totaltime += diff2
- timeCount++
- if _, ok := tmp["attach_text"]; ok {
- curtime := tmp["_id"].(primitive.ObjectID).Timestamp().Unix()
- diff3 := curtime - comeintime
- if diff3 >= 0 {
- file_totaltime += diff3
- }
- fileCount++
- } else {
- curtime := tmp["_id"].(primitive.ObjectID).Timestamp().Unix()
- diff4 := curtime - comeintime
- if diff4 >= 0 {
- no_file_totaltime += diff4
- }
- noFileCount++
- }
- }
- }
- if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" {
- count5++
- }
- }
- }
- if timeCount > 0 {
- es_comeintime_avgtime = es_comeintime_totaltime / int64(timeCount)
- es_publishtime_avgtime = es_publishtime_totaltime / int64(timeCount)
- }
- if fileCount > 0 {
- file_avgltime = file_totaltime / int64(fileCount)
- }
- if noFileCount > 0 {
- no_file_avgltime = no_file_totaltime / int64(noFileCount)
- }
- report := fmt.Sprintf("统计报告%s,【统计结果】,es库数量:%d", "日报", count)
- report += ",mgo统计:" + fmt.Sprint(count3) + ",差值:" + fmt.Sprint(count3-count) + ",mgo总入库量:" + fmt.Sprint(count2)
- report += "<br>" + "【竟品统计结果】:" + strconv.Itoa(int(count1)) + ",mgo统计:" + fmt.Sprint(count5) + ",差值:" + fmt.Sprint(count5-count1) + ",mgo总入库量" + fmt.Sprint(count4) + "<br>【新集群统计结果】es数量:" + strconv.Itoa(int(countNew)) + ",mgo统计:" + fmt.Sprint(count3) + ",差值:" + fmt.Sprint(count3-countNew)
- //存入数据库
- fmt.Println(report)
- //存入数据库
- yesterday := now.AddDate(0, 0, start)
- insert := map[string]interface{}{
- "es_count": count, //
- "es3_count": countNew, //
- "mgo_count": count3,
- "es_mgo_diff": count3 - count,
- "mgo_total": count2,
- "competitor_es_count": count1, //竞品网站es 数量
- "competitor_mgo_count": count5,
- "competitor_diff": count5 - count1,
- "competitor_mgo_total": count4,
- "date": yesterday.Format("2006-01-02"),
- "es_comeintime_totaltime": es_comeintime_totaltime,
- "es_publishtime_totaltime": es_publishtime_totaltime,
- "es_comeintime_avgtime": es_comeintime_avgtime,
- "es_publishtime_avgtime": es_publishtime_avgtime,
- "file_avgltime": file_avgltime,
- "no_file_avgltime": no_file_avgltime,
- "file_totaltime": file_totaltime,
- "no_file_totaltime": no_file_totaltime,
- "file_count": fileCount,
- "no_file_count": noFileCount,
- "pc_diff1": pc_diff1,
- "pc_diff3": pc_diff3,
- "pc_diff5": pc_diff5,
- "pc_diff10": pc_diff10,
- "pc_diff15": pc_diff15,
- "pc_diff30": pc_diff30,
- "pc_diff301": pc_diff301,
- "timeCount": timeCount,
- }
- fmt.Println(insert)
- //Mgo.Save("bidding_ribao", insert)
- fmt.Println("success")
- }
- func TestCount(T *testing.T) {
- esClinet := elastic.Elastic{
- S_esurl: "http://127.0.0.1:19805",
- I_size: 2,
- Username: "es_all",
- Password: "TopJkO2E_d1x",
- }
- esClinet.InitElasticSize()
- rangeQuery := es7.NewRangeQuery("id").Gte("646746840000000000000000").Lt("646770b40000000000000000")
- //termsQuery := es.NewTermsQuery("site", "元博网(采购与招标网)", "中国招标与采购网")
- boolQuery := es7.NewBoolQuery().Must(rangeQuery)
- count1 := esClinet.Count("bidding", boolQuery)
- //fmt.Println("count1", count1)
- fmt.Println(strconv.Itoa(int(count1)))
- }
|