main.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "time"
  6. "app.yhyue.com/moapp/jybase/common"
  7. "app.yhyue.com/moapp/jybase/date"
  8. "app.yhyue.com/moapp/jybase/esv7"
  9. "app.yhyue.com/moapp/jybase/go-logger/logger"
  10. . "app.yhyue.com/moapp/jybase/mysql"
  11. "gorm.io/gorm"
  12. )
  13. type config struct {
  14. Elasticsearch struct {
  15. Address string
  16. Size int
  17. UserName string
  18. PassWord string
  19. }
  20. Mysql struct {
  21. DriverName string
  22. Url string
  23. MaxOpenConns int
  24. MaxIdleConns int
  25. MaxConnLifeTime int
  26. }
  27. DurationMinute int
  28. BlukSize int
  29. }
  30. type timeTask struct {
  31. Time string `json:"time"`
  32. }
  33. type DocStatistics struct {
  34. DocId string `json:"docId" gorm:"column:docId"`
  35. DownTimes int `json:"downTimes" gorm:"column:downTimes"`
  36. ViewTimes int `json:"viewTimes" gorm:"column:viewTimes"`
  37. }
  38. const (
  39. Es_Jydoc = "jydoc"
  40. )
  41. var (
  42. Config *config
  43. TimeTask *timeTask
  44. Mysql *gorm.DB
  45. )
  46. func init() {
  47. common.ReadConfig(&Config)
  48. if Config.DurationMinute == 0 {
  49. log.Fatalln("config.json中durationMinute配置项异常")
  50. }
  51. common.ReadConfig("./timetask.json", &TimeTask)
  52. elastic.InitElasticSizeByAuth(Config.Elasticsearch.Address, Config.Elasticsearch.Size, Config.Elasticsearch.UserName, Config.Elasticsearch.PassWord)
  53. log.Println("初始化 elasticsearch")
  54. Mysql = GormMysql(Config.Mysql.Url, Config.Mysql.DriverName, Config.Mysql.MaxOpenConns, Config.Mysql.MaxIdleConns, nil)
  55. if Mysql != nil {
  56. log.Println("初始化 mysql")
  57. } else {
  58. log.Fatalf("mysql初始化失败")
  59. }
  60. logger.SetConsole(false)
  61. logger.SetRollingDaily("./logs", "timetask.log")
  62. }
  63. func main() {
  64. run()
  65. <-chan bool(nil)
  66. }
  67. //
  68. func run() {
  69. defer common.Catch()
  70. now := date.NowFormat(date.Date_Full_Layout)
  71. log.Println("start update to es", TimeTask.Time, now)
  72. rows, err := Mysql.Raw(`select docId,downTimes,viewTimes from doc_statistics where updateDate>=? and updateDate<?`, TimeTask.Time, now).Rows()
  73. if err != nil {
  74. logger.Error(err)
  75. return
  76. }
  77. if rows != nil {
  78. defer rows.Close()
  79. }
  80. array := [][]string{}
  81. index := 0
  82. for rows.Next() {
  83. var docId string
  84. var downTimes int
  85. var viewTimes int
  86. err = rows.Scan(&docId, &downTimes, &viewTimes)
  87. if err != nil {
  88. logger.Error(err)
  89. continue
  90. }
  91. log.Println("need update to es", "docId", docId, "downTimes", downTimes, "viewTimes", viewTimes)
  92. index++
  93. array = append(array, []string{docId, fmt.Sprintf("ctx._source.downTimes=%d;ctx._source.viewTimes=%d", downTimes, viewTimes)})
  94. if len(array) == Config.BlukSize {
  95. logger.Info("update es index", index, elastic.NewBulkUpdate(Es_Jydoc, array...))
  96. array = [][]string{}
  97. }
  98. }
  99. if len(array) > 0 {
  100. logger.Info("update es index", index, elastic.NewBulkUpdate(Es_Jydoc, array...))
  101. array = [][]string{}
  102. }
  103. TimeTask.Time = now
  104. common.WriteSysConfig("./timetask.json", &TimeTask)
  105. logger.Info("update to es over", index)
  106. time.AfterFunc(time.Duration(Config.DurationMinute)*time.Minute, run)
  107. }