main.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "time"
  6. "app.yhyue.com/moapp/jybase/common"
  7. "app.yhyue.com/moapp/jybase/date"
  8. "app.yhyue.com/moapp/jybase/esv7"
  9. . "app.yhyue.com/moapp/jybase/mysql"
  10. "gorm.io/gorm"
  11. )
  // config mirrors the application settings that init loads through
  // common.ReadConfig.
  type config struct {
  	// Elasticsearch carries the arguments handed to
  	// elastic.InitElasticSizeByAuth, in the order Address, Size, UserName,
  	// PassWord.
  	Elasticsearch struct {
  		Address            string
  		Size               int // presumably the client pool size; confirm against the esv7 package
  		UserName, PassWord string
  	}
  	// Mysql carries the connection settings handed to GormMysql.
  	Mysql struct {
  		DriverName, Url            string
  		MaxOpenConns, MaxIdleConns int
  		// MaxConnLifeTime is declared here but is not read anywhere in the
  		// visible code.
  		MaxConnLifeTime int
  	}
  	// DurationMinute is the pause, in minutes, between two sync runs. init
  	// refuses to start when it is zero.
  	DurationMinute int
  	// BlukSize is the number of documents collected before a bulk update is
  	// sent to Elasticsearch. The misspelling is kept because the field name
  	// is what the configuration file is matched against.
  	BlukSize int
  }
  // timeTask is the sync checkpoint kept in ./timetask.json.
  type timeTask struct {
  	// Time is the inclusive lower bound of the updateDate window queried by
  	// the next run; each completed run overwrites it with that run's start
  	// time string.
  	Time string `json:"time"`
  }
  // DocStatistics describes one record with the docId, downTimes and viewTimes
  // columns, the same columns run selects from doc_statistics. The type itself
  // is not referenced by the rest of the visible code.
  type DocStatistics struct {
  	// DocId identifies the document.
  	DocId string `json:"docId" gorm:"column:docId"`
  	// DownTimes is the value of the downTimes column.
  	DownTimes int `json:"downTimes" gorm:"column:downTimes"`
  	// ViewTimes is the value of the viewTimes column.
  	ViewTimes int `json:"viewTimes" gorm:"column:viewTimes"`
  }
  // Es_Jydoc is the Elasticsearch index name passed to elastic.NewBulkUpdate.
  const Es_Jydoc = "jydoc"
  40. var (
  41. Config *config
  42. TimeTask *timeTask
  43. Mysql *gorm.DB
  44. )
  45. func init() {
  46. common.ReadConfig(&Config)
  47. if Config.DurationMinute == 0 {
  48. log.Fatalln("config.json中durationMinute配置项异常")
  49. }
  50. common.ReadConfig("./timetask.json", &TimeTask)
  51. elastic.InitElasticSizeByAuth(Config.Elasticsearch.Address, Config.Elasticsearch.Size, Config.Elasticsearch.UserName, Config.Elasticsearch.PassWord)
  52. log.Printf("初始化 elasticsearch")
  53. Mysql = GormMysql(Config.Mysql.Url, Config.Mysql.DriverName, Config.Mysql.MaxOpenConns, Config.Mysql.MaxIdleConns, nil)
  54. if Mysql != nil {
  55. log.Printf("初始化 mysql")
  56. } else {
  57. log.Fatalf("mysql初始化失败")
  58. }
  59. }
  60. func main() {
  61. run()
  62. <-chan bool(nil)
  63. }
  64. //
  65. func run() {
  66. defer common.Catch()
  67. now := date.NowFormat(date.Date_Full_Layout)
  68. log.Println("start update to es", TimeTask.Time, now)
  69. rows, err := Mysql.Raw(`select docId,downTimes,viewTimes from doc_statistics where updateDate>=? and updateDate<?`, TimeTask.Time, now).Rows()
  70. if err != nil {
  71. log.Println(err)
  72. return
  73. }
  74. if rows != nil {
  75. defer rows.Close()
  76. }
  77. array := [][]string{}
  78. index := 0
  79. for rows.Next() {
  80. var docId string
  81. var downTimes int
  82. var viewTimes int
  83. err = rows.Scan(&docId, &downTimes, &viewTimes)
  84. if err != nil {
  85. log.Println(err)
  86. break
  87. }
  88. log.Println("need update to es", "docId", docId, "downTimes", downTimes, "viewTimes", viewTimes)
  89. index++
  90. array = append(array, []string{docId, fmt.Sprintf("ctx._source.downTimes=%d;ctx._source.viewTimes=%d", downTimes, viewTimes)})
  91. if len(array) == Config.BlukSize {
  92. log.Println("update es index", index, elastic.NewBulkUpdate(Es_Jydoc, array...))
  93. array = [][]string{}
  94. }
  95. }
  96. if len(array) > 0 {
  97. log.Println("update es index", index, elastic.NewBulkUpdate(Es_Jydoc, array...))
  98. array = [][]string{}
  99. }
  100. TimeTask.Time = now
  101. common.WriteSysConfig("./timetask.json", &TimeTask)
  102. log.Println("update to es over", index)
  103. time.AfterFunc(time.Duration(Config.DurationMinute)*time.Minute, run)
  104. }