package main import ( "data_credible/config" "fmt" "github.com/spf13/cobra" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "sync" "time" ) func main() { loadSite() go saveQueue() rootCmd := &cobra.Command{Use: "my cmd"} rootCmd.AddCommand(taskOld()) rootCmd.AddCommand(taskNew()) if err := rootCmd.Execute(); err != nil { fmt.Println("rootCmd.Execute failed", err.Error()) } c := make(chan bool, 1) <-c } func taskOld() *cobra.Command { cmdClient := &cobra.Command{ Use: "old", Short: "Start dispose task", Run: func(cmd *cobra.Command, args []string) { sess := MongoS.GetMgoConn() defer MongoS.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} query := sess.DB(Dbname).C(Coll).Find(nil).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%5000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(temp map[string]interface{}) { defer func() { <-ch wg.Done() }() task1(temp) }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) }, } cmdClient.Flags().StringVarP(&Dbname, "dbname", "d", "", "库名") cmdClient.Flags().StringVarP(&Coll, "coll", "c", "", "表名") return cmdClient } func taskNew() *cobra.Command { cmdClient := &cobra.Command{ Use: "new", Short: "Start dispose task", Run: func(cmd *cobra.Command, args []string) { sess := MongoBz.GetMgoConn() defer MongoBz.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} query := sess.DB(config.Conf.DB.MongoBz.Dbname).C(config.Conf.DB.MongoBz.Coll).Find(nil).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%5000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(temp map[string]interface{}) { defer func() { <-ch wg.Done() }() if temp["v_taginfo"] != nil { taskInfo(temp) } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) }, } return cmdClient } func loadSite() { f := bson.M{"site": 1, "site_type": 1, "second_type": 1, "platform": 1} info, _ := MongoS.Find("site", nil, nil, f, false, -1, -1) if len(*info) > 0 { for _, m := range *info { SiteMap[util.ObjToString(m["site"])] = fmt.Sprintf("%s-%s-%s", util.ObjToString(m["site_type"]), util.ObjToString(m["second_type"]), util.ObjToString(m["platform"])) } } log.Info("loadSite", zap.Int("SiteMap", len(SiteMap))) info1, _ := MongoS.Find("luaconfig", nil, nil, bson.M{"code": 1, "platform": 1}, false, -1, -1) if len(*info1) > 0 { for _, m := range *info1 { CodeMap[util.ObjToString(m["code"])] = util.ObjToString(m["platform"]) } } log.Info("loadSite", zap.Int("CodeMap", len(CodeMap))) } func saveQueue() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-savePool: arru[indexu] = v indexu++ if indexu == saveSize { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() Mongo.SaveBulk(config.Conf.DB.Mongo.Coll, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1 * time.Second): if indexu > 0 { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() Mongo.SaveBulk(config.Conf.DB.Mongo.Coll, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } }