123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- package main
import (
	"fmt"
	"os"
	"sync"
	"time"

	"data_credible/config"

	"github.com/spf13/cobra"
	"go.mongodb.org/mongo-driver/bson"
	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
)
- func main() {
- loadSite()
- go saveQueue()
- rootCmd := &cobra.Command{Use: "my cmd"}
- rootCmd.AddCommand(taskOld())
- rootCmd.AddCommand(taskNew())
- if err := rootCmd.Execute(); err != nil {
- fmt.Println("rootCmd.Execute failed", err.Error())
- }
- c := make(chan bool, 1)
- <-c
- }
- func taskOld() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "old",
- Short: "Start dispose task",
- Run: func(cmd *cobra.Command, args []string) {
- sess := MongoS.GetMgoConn()
- defer MongoS.DestoryMongoConn(sess)
- ch := make(chan bool, config.Conf.Serve.Thread)
- wg := &sync.WaitGroup{}
- query := sess.DB(Dbname).C(Coll).Find(nil).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%5000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(temp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- task1(temp)
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- },
- }
- cmdClient.Flags().StringVarP(&Dbname, "dbname", "d", "", "库名")
- cmdClient.Flags().StringVarP(&Coll, "coll", "c", "", "表名")
- return cmdClient
- }
- func taskNew() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "new",
- Short: "Start dispose task",
- Run: func(cmd *cobra.Command, args []string) {
- sess := MongoBz.GetMgoConn()
- defer MongoBz.DestoryMongoConn(sess)
- ch := make(chan bool, config.Conf.Serve.Thread)
- wg := &sync.WaitGroup{}
- query := sess.DB(config.Conf.DB.MongoBz.Dbname).C(config.Conf.DB.MongoBz.Coll).Find(nil).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%5000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(temp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if temp["v_taginfo"] != nil {
- taskInfo(temp)
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- },
- }
- return cmdClient
- }
- func loadSite() {
- f := bson.M{"site": 1, "site_type": 1, "second_type": 1, "platform": 1}
- info, _ := MongoS.Find("site", nil, nil, f, false, -1, -1)
- if len(*info) > 0 {
- for _, m := range *info {
- SiteMap[util.ObjToString(m["site"])] = fmt.Sprintf("%s-%s-%s", util.ObjToString(m["site_type"]),
- util.ObjToString(m["second_type"]), util.ObjToString(m["platform"]))
- }
- }
- log.Info("loadSite", zap.Int("SiteMap", len(SiteMap)))
- info1, _ := MongoS.Find("luaconfig", nil, nil, bson.M{"code": 1, "platform": 1}, false, -1, -1)
- if len(*info1) > 0 {
- for _, m := range *info1 {
- CodeMap[util.ObjToString(m["code"])] = util.ObjToString(m["platform"])
- }
- }
- log.Info("loadSite", zap.Int("CodeMap", len(CodeMap)))
- }
- func saveQueue() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-savePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- Mongo.SaveBulk(config.Conf.DB.Mongo.Coll, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1 * time.Second):
- if indexu > 0 {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- Mongo.SaveBulk(config.Conf.DB.Mongo.Coll, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|