package main import ( "buyer_data/config" "fmt" "github.com/spf13/cobra" "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis" "time" ) func main() { rootCmd := &cobra.Command{Use: "my cmd"} rootCmd.AddCommand(buyerEnt()) // buyer_enterprise rootCmd.AddCommand(buyerErr()) // buyer_err rootCmd.AddCommand(buyerTidb()) // dws_f_ent_baseinfo rootCmd.AddCommand(deduplication()) // 去重 rootCmd.AddCommand(buyerPy()) // buyer_detail_1019 rootCmd.AddCommand(buyerTask()) // reliability不存在的数据 if err := rootCmd.Execute(); err != nil { fmt.Println("rootCmd.Execute failed", err.Error()) } c := make(chan bool, 1) <-c } func buyerEnt() *cobra.Command { cmdClient := &cobra.Command{ Use: "buyer-mongo", Short: "Start dispose buyer data", Run: func(cmd *cobra.Command, args []string) { go SaveMethod() taskInfo1() }, } return cmdClient } func buyerErr() *cobra.Command { cmdClient := &cobra.Command{ Use: "buyer-err", Short: "Start dispose buyer data", Run: func(cmd *cobra.Command, args []string) { go SaveMethod() taskInfo2() }, } return cmdClient } func buyerTidb() *cobra.Command { cmdClient := &cobra.Command{ Use: "buyer-tidb", Short: "Start dispose buyer-tidb data", Run: func(cmd *cobra.Command, args []string) { InitMysql() go SaveMethod() go updateMethod() taskMysql() }, } return cmdClient } func deduplication() *cobra.Command { cmdClient := &cobra.Command{ Use: "buyer-dep", Short: "Start deduplication data", Run: func(cmd *cobra.Command, args []string) { redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Code, config.Conf.DB.Redis.Addr), 8) taskInfo5() }, } return cmdClient } func buyerPy() *cobra.Command { cmdClient := &cobra.Command{ Use: "buyer-py", Short: "Start dispose buyer-python data", Run: func(cmd *cobra.Command, args []string) { go SaveMethod() go updateMethod() taskInfo4() }, } return cmdClient } func buyerTask() *cobra.Command { cmdClient := &cobra.Command{ Use: "buyer-err", Short: "Start dispose buyer data", Run: func(cmd *cobra.Command, args []string) { go updateMethod() taskInfo3() }, } return cmdClient } func SaveMethod() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-savePool: arru[indexu] = v indexu++ if indexu == saveSize { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() MongoTool.SaveBulk(config.Conf.DB.Mongo.SaveColl, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() MongoTool.SaveBulk(config.Conf.DB.Mongo.SaveColl, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func updateMethod() { arru := make([][]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == saveSize { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MongoTool.UpSertBulk(config.Conf.DB.Mongo.SaveColl, arru...) }(arru) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1 * time.Second): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MongoTool.UpSertBulk(config.Conf.DB.Mongo.SaveColl, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } } } }