main.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "time"
  6. "app.yhyue.com/moapp/jybase/common"
  7. "app.yhyue.com/moapp/jybase/date"
  8. "app.yhyue.com/moapp/jybase/esv7"
  9. . "app.yhyue.com/moapp/jybase/mysql"
  10. "gorm.io/gorm"
  11. )
  12. type config struct {
  13. Elasticsearch struct {
  14. Address string
  15. Size int
  16. UserName string
  17. PassWord string
  18. }
  19. Mysql struct {
  20. DriverName string
  21. Url string
  22. MaxOpenConns int
  23. MaxIdleConns int
  24. MaxConnLifeTime int
  25. }
  26. DurationMinute int
  27. BlukSize int
  28. }
  29. type timeTask struct {
  30. Time string `json:"time"`
  31. }
  32. type DocStatistics struct {
  33. DocId string `json:"docId" gorm:"column:docId"`
  34. DownTimes int `json:"downTimes" gorm:"column:downTimes"`
  35. ViewTimes int `json:"viewTimes" gorm:"column:viewTimes"`
  36. }
  37. const (
  38. Es_Jydoc = "jydoc"
  39. )
  40. var (
  41. Config *config
  42. TimeTask *timeTask
  43. Mysql *gorm.DB
  44. )
  45. func init() {
  46. common.ReadConfig(&Config)
  47. if Config.DurationMinute == 0 {
  48. log.Fatalln("config.json中durationMinute配置项异常")
  49. }
  50. common.ReadConfig("./timetask.json", &TimeTask)
  51. elastic.InitElasticSizeByAuth(Config.Elasticsearch.Address, Config.Elasticsearch.Size, Config.Elasticsearch.UserName, Config.Elasticsearch.PassWord)
  52. log.Printf("初始化 elasticsearch")
  53. Mysql = GormMysql(Config.Mysql.Url, Config.Mysql.DriverName, Config.Mysql.MaxOpenConns, Config.Mysql.MaxIdleConns, nil)
  54. if Mysql != nil {
  55. log.Printf("初始化 mysql")
  56. } else {
  57. log.Fatalf("mysql初始化失败")
  58. }
  59. }
  60. func main() {
  61. run()
  62. <-chan bool(nil)
  63. }
  64. //
  65. func run() {
  66. now := date.NowFormat(date.Date_Full_Layout)
  67. log.Println("start update to es", TimeTask.Time, now)
  68. rows, err := Mysql.Raw(`select docId,downTimes,viewTimes from doc_statistics where updateDate>=? and updateDate<?`, TimeTask.Time, now).Rows()
  69. if err != nil {
  70. log.Println(err)
  71. return
  72. }
  73. if rows != nil {
  74. defer rows.Close()
  75. }
  76. array := [][]string{}
  77. index := 0
  78. for rows.Next() {
  79. var docId string
  80. var downTimes int
  81. var viewTimes int
  82. err = rows.Scan(&docId, &downTimes, &viewTimes)
  83. if err != nil {
  84. log.Println(err)
  85. break
  86. }
  87. log.Println("need update to es", "docId", docId, "downTimes", downTimes, "viewTimes", viewTimes)
  88. index++
  89. array = append(array, []string{docId, fmt.Sprintf("ctx._source.downTimes=%d;ctx._source.viewTimes=%d", downTimes, viewTimes)})
  90. if len(array) == Config.BlukSize {
  91. log.Println("update es index", index, elastic.NewBulkUpdate(Es_Jydoc, array...))
  92. array = [][]string{}
  93. }
  94. }
  95. if len(array) > 0 {
  96. log.Println("update es index", index, elastic.NewBulkUpdate(Es_Jydoc, array...))
  97. array = [][]string{}
  98. }
  99. TimeTask.Time = now
  100. common.WriteSysConfig("./timetask.json", &TimeTask)
  101. log.Println("update to es over", index)
  102. time.AfterFunc(time.Duration(Config.DurationMinute)*time.Minute, run)
  103. }