123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- package main
- import (
- "fmt"
- "log"
- "time"
- "app.yhyue.com/moapp/jybase/common"
- "app.yhyue.com/moapp/jybase/date"
- "app.yhyue.com/moapp/jybase/esv7"
- "app.yhyue.com/moapp/jybase/go-logger/logger"
- . "app.yhyue.com/moapp/jybase/mysql"
- "gorm.io/gorm"
- )
// config is the shape of the service configuration that init loads through
// common.ReadConfig.
type config struct {
	// Elasticsearch holds the arguments init hands to
	// elastic.InitElasticSizeByAuth.
	Elasticsearch struct {
		Address            string
		Size               int
		UserName, PassWord string
	}
	// Mysql holds the arguments init hands to GormMysql. MaxConnLifeTime is
	// declared but not referenced anywhere in the visible code.
	Mysql struct {
		DriverName, Url                             string
		MaxOpenConns, MaxIdleConns, MaxConnLifeTime int
	}
	// DurationMinute is the delay, in minutes, between two runs of the sync
	// pass; init refuses to start when it is zero.
	// BlukSize is the number of pending updates that triggers a bulk flush to
	// Elasticsearch inside run.
	DurationMinute, BlukSize int
}
// timeTask is the checkpoint kept in ./timetask.json: init reads it and run
// writes it back after a completed pass.
type timeTask struct {
	// Time is the lower bound (inclusive) of the updateDate window queried by
	// the next pass; run overwrites it with the timestamp of the pass that
	// just finished.
	Time string `json:"time"`
}
// DocStatistics carries the three doc_statistics columns that run selects
// (docId, downTimes, viewTimes), tagged for both JSON and gorm column
// mapping. The visible code scans into plain locals and does not use this
// type directly.
type DocStatistics struct {
	// DocId is the document identifier.
	DocId string `json:"docId" gorm:"column:docId"`
	// DownTimes is the value of the downTimes column.
	DownTimes int `json:"downTimes" gorm:"column:downTimes"`
	// ViewTimes is the value of the viewTimes column.
	ViewTimes int `json:"viewTimes" gorm:"column:viewTimes"`
}
// Es_Jydoc is the name of the Elasticsearch index that run sends its bulk
// updates to.
const Es_Jydoc = "jydoc"
- var (
- Config *config
- TimeTask *timeTask
- Mysql *gorm.DB
- )
- func init() {
- common.ReadConfig(&Config)
- if Config.DurationMinute == 0 {
- log.Fatalln("config.json中durationMinute配置项异常")
- }
- common.ReadConfig("./timetask.json", &TimeTask)
- elastic.InitElasticSizeByAuth(Config.Elasticsearch.Address, Config.Elasticsearch.Size, Config.Elasticsearch.UserName, Config.Elasticsearch.PassWord)
- log.Println("初始化 elasticsearch")
- Mysql = GormMysql(Config.Mysql.Url, Config.Mysql.DriverName, Config.Mysql.MaxOpenConns, Config.Mysql.MaxIdleConns, nil)
- if Mysql != nil {
- log.Println("初始化 mysql")
- } else {
- log.Fatalf("mysql初始化失败")
- }
- logger.SetConsole(false)
- logger.SetRollingDaily("./logs", "timetask.log")
- }
- func main() {
- run()
- <-chan bool(nil)
- }
- //
- func run() {
- defer common.Catch()
- now := date.NowFormat(date.Date_Full_Layout)
- log.Println("start update to es", TimeTask.Time, now)
- rows, err := Mysql.Raw(`select docId,downTimes,viewTimes from doc_statistics where updateDate>=? and updateDate<?`, TimeTask.Time, now).Rows()
- if err != nil {
- logger.Error(err)
- return
- }
- if rows != nil {
- defer rows.Close()
- }
- array := [][]string{}
- index := 0
- for rows.Next() {
- var docId string
- var downTimes int
- var viewTimes int
- err = rows.Scan(&docId, &downTimes, &viewTimes)
- if err != nil {
- logger.Error(err)
- continue
- }
- log.Println("need update to es", "docId", docId, "downTimes", downTimes, "viewTimes", viewTimes)
- index++
- array = append(array, []string{docId, fmt.Sprintf("ctx._source.downTimes=%d;ctx._source.viewTimes=%d", downTimes, viewTimes)})
- if len(array) == Config.BlukSize {
- logger.Info("update es index", index, elastic.NewBulkUpdate(Es_Jydoc, array...))
- array = [][]string{}
- }
- }
- if len(array) > 0 {
- logger.Info("update es index", index, elastic.NewBulkUpdate(Es_Jydoc, array...))
- array = [][]string{}
- }
- TimeTask.Time = now
- common.WriteSysConfig("./timetask.json", &TimeTask)
- logger.Info("update to es over", index)
- time.AfterFunc(time.Duration(Config.DurationMinute)*time.Minute, run)
- }
|