package main

import (
	"fmt"
	"os"
	"time"

	"github.com/robfig/cron/v3"
	"github.com/spf13/cobra"

	"data_project_information/config"
)

// batchFlushInterval is how long a batching loop waits without receiving a
// new item before it flushes a partially filled batch.
const batchFlushInterval = 1000 * time.Millisecond

// init loads the configuration from ./common.toml, calls InitLog, and creates
// the buffered channels used by the batching loops in this file: one data
// pool and one concurrency-limiting semaphore channel per sink.
func init() {
	config.Init("./common.toml")
	InitLog()

	updatePool = make(chan []map[string]interface{}, 5000)
	updateSp = make(chan bool, 5)

	saveSize = 200
	savePool = make(chan map[string]interface{}, 5000)
	saveSp = make(chan bool, 3)

	updateEsPool = make(chan []map[string]interface{}, 5000)
	updateEsSp = make(chan bool, 5)
}

// main registers the sub-commands and runs the selected one.
//
// If command execution fails (for example an unknown sub-command or a bad
// flag) the process exits with status 1. Otherwise main blocks forever so the
// background batching goroutines keep running after the command's Run
// function has returned.
func main() {
	//
	//InitMgo()
	//InitEs()
	//go updateFuc()
	//go updateEsMethod()
	//taskAdd()
	//return

	// NOTE(review): the "tidb" command starts SaveFunc as well, so in that
	// mode two SaveFunc loops drain savePool — confirm this is intended.
	go SaveFunc()

	rootCmd := &cobra.Command{Use: "my cmd"}
	rootCmd.AddCommand(project())
	rootCmd.AddCommand(projectAdd())
	rootCmd.AddCommand(tidb())

	if err := rootCmd.Execute(); err != nil {
		fmt.Println("rootCmd.Execute failed", err.Error())
		// Nothing useful is running after a failed Execute; exit instead of
		// hanging forever on the block below.
		os.Exit(1)
	}

	// Keep the process alive for the background goroutines.
	select {}
}

// project builds the "project" sub-command. Its Run function initializes the
// Mongo and ES clients, starts the Mongo batch updater, and then runs task().
func project() *cobra.Command {
	return &cobra.Command{
		Use:   "project",
		Short: "Start processing project data",
		Run: func(cmd *cobra.Command, args []string) {
			InitMgo()
			InitEs()
			go updateFuc()
			task()
		},
	}
}

// projectAdd builds the "project-add" sub-command. Its Run function
// initializes the Mongo and ES clients, starts the Mongo and ES batch
// updaters, runs taskAdd() once immediately, then schedules taskAdd() with
// the cron spec "10 * * * *" and blocks forever.
func projectAdd() *cobra.Command {
	return &cobra.Command{
		Use:   "project-add",
		Short: "Start processing project data",
		Run: func(cmd *cobra.Command, args []string) {
			InitMgo()
			InitEs()
			go updateFuc()
			go updateEsMethod()

			taskAdd()

			crn := cron.New()
			if _, err := crn.AddFunc("10 * * * *", func() { taskAdd() }); err != nil {
				// Without a registered schedule only the initial run above
				// happens; surface the reason instead of discarding it.
				fmt.Println("cron AddFunc failed", err.Error())
			}
			crn.Start()

			// Block so the cron scheduler and batch updaters keep running.
			select {}
		},
	}
}

// tidb builds the "tidb" sub-command. Its Run function initializes the Mongo
// and MySQL clients, starts the batch saver, and then runs taskT().
func tidb() *cobra.Command {
	cmdClient := &cobra.Command{
		Use:   "tidb",
		Short: "Start processing tidb data",
		Run: func(cmd *cobra.Command, args []string) {
			InitMgo()
			InitMysql()
			go SaveFunc()
			taskT()
		},
	}
	//cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
	return cmdClient
}

// resetFlushTimer re-arms t to fire after d. It stops the timer first and
// discards a tick that may already be pending, so a stale tick cannot cause
// an early flush on the next loop iteration.
func resetFlushTimer(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
	t.Reset(d)
}

// runUpdateBatcher collects items from pool into batches of up to size and
// hands each batch to flush on its own goroutine. A batch is flushed when it
// reaches size items, or when no item has arrived for batchFlushInterval and
// the batch is non-empty. sem bounds the number of concurrently running
// flushes: a slot is taken before the goroutine starts (blocking this loop
// when all slots are busy) and released when flush returns. It never returns.
func runUpdateBatcher(
	pool <-chan []map[string]interface{},
	sem chan bool,
	size int,
	flush func(batch [][]map[string]interface{}),
) {
	batch := make([][]map[string]interface{}, 0, size)

	// dispatch hands the current batch to a flush goroutine and starts a
	// fresh one, so the in-flight batch is never written to again.
	dispatch := func() {
		sem <- true
		go func(b [][]map[string]interface{}) {
			defer func() { <-sem }()
			flush(b)
		}(batch)
		batch = make([][]map[string]interface{}, 0, size)
	}

	timer := time.NewTimer(batchFlushInterval)
	for {
		select {
		case v := <-pool:
			batch = append(batch, v)
			if len(batch) == size {
				dispatch()
			}
		case <-timer.C:
			if len(batch) > 0 {
				dispatch()
			}
		}
		// The idle window restarts after every event, matching a fresh
		// one-second wait per loop iteration.
		resetFlushTimer(timer, batchFlushInterval)
	}
}

// updateFuc drains updatePool and bulk-updates the configured Mongo
// collection in batches of up to 500, with concurrency bounded by updateSp.
// It never returns and is meant to be run as a goroutine.
func updateFuc() {
	runUpdateBatcher(updatePool, updateSp, 500, func(batch [][]map[string]interface{}) {
		Mgo.UpdateBulk(config.Conf.DB.Mongo.Coll, batch...)
	})
}

// SaveFunc drains savePool and bulk-inserts rows into the "property_project"
// table in batches of up to saveSize. A batch is flushed when full, or when
// no row has arrived for batchFlushInterval and the batch is non-empty.
// Concurrent inserts are bounded by saveSp. It never returns and is meant to
// be run as a goroutine.
func SaveFunc() {
	batch := make([]map[string]interface{}, 0, saveSize)

	// dispatch hands the current batch to an insert goroutine and starts a
	// fresh one, so the in-flight batch is never written to again.
	dispatch := func() {
		saveSp <- true
		go func(rows []map[string]interface{}) {
			defer func() { <-saveSp }()
			MysqlTool.InsertBulk("property_project", BaseField, rows...)
		}(batch)
		batch = make([]map[string]interface{}, 0, saveSize)
	}

	timer := time.NewTimer(batchFlushInterval)
	for {
		select {
		case v := <-savePool:
			batch = append(batch, v)
			if len(batch) == saveSize {
				dispatch()
			}
		case <-timer.C:
			if len(batch) > 0 {
				dispatch()
			}
		}
		resetFlushTimer(timer, batchFlushInterval)
	}
}

// updateEsMethod batch-updates ES: it drains updateEsPool and bulk-updates
// the "projectset" index in batches of up to 200, with concurrency bounded by
// updateEsSp. It never returns and is meant to be run as a goroutine.
func updateEsMethod() {
	runUpdateBatcher(updateEsPool, updateEsSp, 200, func(batch [][]map[string]interface{}) {
		Es1.UpdateBulk("projectset", batch...)
	})
}