main.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/olivere/elastic/v7"
  6. "github.com/robfig/cron/v3"
  7. "github.com/spf13/viper"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "log"
  10. "time"
  11. )
  12. var (
  13. GF GlobalConf
  14. Rest = make(map[string]interface{}, 0) //保存最终各个字段识别率
  15. MgoB *mongodb.MongodbSim
  16. )
  17. func main() {
  18. InitConfig()
  19. InitMgo()
  20. //getStatistic()
  21. local, _ := time.LoadLocation("Asia/Shanghai")
  22. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  23. _, err := c.AddFunc(GF.Env.Spec, getStatistic)
  24. if err != nil {
  25. log.Println(err)
  26. }
  27. c.Start()
  28. defer c.Stop()
  29. select {}
  30. }
  31. func InitConfig() (err error) {
  32. viper.SetConfigFile("config.toml") // 指定配置文件路径
  33. viper.SetConfigName("config") // 配置文件名称(无扩展名)
  34. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  35. viper.AddConfigPath("./")
  36. viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
  37. viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
  38. err = viper.ReadInConfig() // 查找并读取配置文件
  39. if err != nil { // 处理读取配置文件的错误
  40. return
  41. }
  42. err = viper.Unmarshal(&GF)
  43. return err
  44. }
  45. func InitMgo() {
  46. MgoB = &mongodb.MongodbSim{
  47. MongodbAddr: GF.MongoB.Host,
  48. Size: 10,
  49. DbName: GF.MongoB.DB,
  50. UserName: GF.MongoB.Username,
  51. Password: GF.MongoB.Password,
  52. Direct: GF.MongoB.Direct,
  53. }
  54. MgoB.InitPool()
  55. }
  56. //getStatistic 获取统计
  57. func getStatistic() {
  58. log.Println("开始统计")
  59. // 创建 Elasticsearch 客户端
  60. client, err := elastic.NewClient(
  61. elastic.SetURL(GF.ES.URL),
  62. elastic.SetBasicAuth(GF.ES.Username, GF.ES.Password),
  63. elastic.SetSniff(false),
  64. )
  65. if err != nil {
  66. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  67. }
  68. // 获取昨天零点和今天零点的时间戳
  69. now := time.Now()
  70. yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
  71. today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  72. totalQuery := elastic.NewBoolQuery().Must(
  73. elastic.NewRangeQuery("comeintime").
  74. Gte(yesterday.Unix()). // 大于昨天零点
  75. Lt(today.Unix()), // 小于今天零点
  76. )
  77. totalNum, _ := client.Count().
  78. Index("bidding").
  79. Query(totalQuery).
  80. Do(context.Background())
  81. Rest["total"] = totalNum
  82. Rest["statistic_date"] = yesterday.Format("2006-01-02")
  83. for _, v := range GF.Fields {
  84. //log.Println(v.DataType)
  85. // 构建查询
  86. topQuery := elastic.NewBoolQuery().Must(
  87. elastic.NewTermQuery("toptype", v.Toptype),
  88. elastic.NewRangeQuery("comeintime").
  89. Gte(yesterday.Unix()). // 大于昨天零点
  90. Lt(today.Unix()), // 小于今天零点
  91. )
  92. topNum, _ := client.Count().
  93. Index("bidding").
  94. Query(topQuery).
  95. Do(context.Background())
  96. ra := make(map[string]interface{}, 0)
  97. for _, field := range v.Field {
  98. //fmt.Println(field)
  99. query := elastic.NewBoolQuery().
  100. Must(
  101. elastic.NewTermQuery("toptype", v.Toptype),
  102. elastic.NewExistsQuery(field),
  103. elastic.NewRangeQuery("comeintime").
  104. Gte(yesterday.Unix()). // 大于昨天零点
  105. Lt(today.Unix()), // 小于今天零点
  106. )
  107. // 执行查询
  108. count, _ := client.Count().
  109. Index("bidding").
  110. Query(query).
  111. Do(context.Background())
  112. // 将 int64 转换为 float64,并计算百分比
  113. percentage := (float64(count) / float64(topNum)) * 100.0
  114. ra[field] = fmt.Sprintf("%.2f%%", percentage)
  115. }
  116. Rest[v.DataType] = ra
  117. }
  118. //fmt.Println(Rest)
  119. MgoB.Save("bidding_statistic", Rest)
  120. log.Println("结束统计")
  121. }