|
@@ -0,0 +1,110 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "fmt"
|
|
|
+ "log"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "app.yhyue.com/moapp/jybase/common"
|
|
|
+ "app.yhyue.com/moapp/jybase/date"
|
|
|
+ "app.yhyue.com/moapp/jybase/esv7"
|
|
|
+ . "app.yhyue.com/moapp/jybase/mysql"
|
|
|
+ "gorm.io/gorm"
|
|
|
+)
|
|
|
+
|
|
|
+type config struct {
|
|
|
+ Elasticsearch struct {
|
|
|
+ Address string
|
|
|
+ Size int
|
|
|
+ UserName string
|
|
|
+ PassWord string
|
|
|
+ }
|
|
|
+ Mysql struct {
|
|
|
+ DriverName string
|
|
|
+ Url string
|
|
|
+ MaxOpenConns int
|
|
|
+ MaxIdleConns int
|
|
|
+ MaxConnLifeTime int
|
|
|
+ }
|
|
|
+ DurationMinute int
|
|
|
+ BlukSize int
|
|
|
+}
|
|
|
+type timeTask struct {
|
|
|
+ Time string `json:"time"`
|
|
|
+}
|
|
|
+type DocStatistics struct {
|
|
|
+ DocId string `json:"docId" gorm:"column:docId"`
|
|
|
+ DownTimes int `json:"downTimes" gorm:"column:downTimes"`
|
|
|
+ ViewTimes int `json:"viewTimes" gorm:"column:viewTimes"`
|
|
|
+}
|
|
|
+
|
|
|
+const (
|
|
|
+ Es_Jydoc = "jydoc"
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ Config *config
|
|
|
+ TimeTask *timeTask
|
|
|
+ Mysql *gorm.DB
|
|
|
+)
|
|
|
+
|
|
|
+func init() {
|
|
|
+ common.ReadConfig(&Config)
|
|
|
+ if Config.DurationMinute == 0 {
|
|
|
+ log.Fatalln("config.json中durationMinute配置项异常")
|
|
|
+ }
|
|
|
+ common.ReadConfig("./timetask.json", &TimeTask)
|
|
|
+ elastic.InitElasticSizeByAuth(Config.Elasticsearch.Address, Config.Elasticsearch.Size, Config.Elasticsearch.UserName, Config.Elasticsearch.PassWord)
|
|
|
+ log.Printf("初始化 elasticsearch")
|
|
|
+ Mysql = GormMysql(Config.Mysql.Url, Config.Mysql.DriverName, Config.Mysql.MaxOpenConns, Config.Mysql.MaxIdleConns, nil)
|
|
|
+ if Mysql != nil {
|
|
|
+ log.Printf("初始化 mysql")
|
|
|
+ } else {
|
|
|
+ log.Fatalf("mysql初始化失败")
|
|
|
+ }
|
|
|
+}
|
|
|
+func main() {
|
|
|
+ run()
|
|
|
+ <-chan bool(nil)
|
|
|
+}
|
|
|
+
|
|
|
+//
|
|
|
+func run() {
|
|
|
+ now := date.NowFormat(date.Date_Full_Layout)
|
|
|
+ log.Println("start update to es", TimeTask.Time, now)
|
|
|
+ rows, err := Mysql.Raw(`select docId,downTimes,viewTimes from doc_statistics where updateDate>=? and updateDate<?`, TimeTask.Time, now).Rows()
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if rows != nil {
|
|
|
+ defer rows.Close()
|
|
|
+ }
|
|
|
+ array := [][]string{}
|
|
|
+ index := 0
|
|
|
+ for rows.Next() {
|
|
|
+ var docId string
|
|
|
+ var downTimes int
|
|
|
+ var viewTimes int
|
|
|
+ err = rows.Scan(&docId, &downTimes, &viewTimes)
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ log.Println("need update to es", "docId", docId, "downTimes", downTimes, "viewTimes", viewTimes)
|
|
|
+ index++
|
|
|
+ array = append(array, []string{docId, fmt.Sprintf("ctx._source.downTimes=%d;ctx._source.viewTimes=%d", downTimes, viewTimes)})
|
|
|
+ if len(array) == Config.BlukSize {
|
|
|
+ log.Println("update es index", index, elastic.NewBulkUpdate(Es_Jydoc, array...))
|
|
|
+ array = [][]string{}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(array) > 0 {
|
|
|
+ log.Println("update es index", index, elastic.NewBulkUpdate(Es_Jydoc, array...))
|
|
|
+ array = [][]string{}
|
|
|
+ }
|
|
|
+ TimeTask.Time = now
|
|
|
+ common.WriteSysConfig("./timetask.json", &TimeTask)
|
|
|
+ log.Println("update to es over", index)
|
|
|
+ time.AfterFunc(time.Duration(Config.DurationMinute)*time.Minute, run)
|
|
|
+}
|