package main import ( "encoding/json" "fmt" "github.com/spf13/cobra" "github.com/tealeg/xlsx" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "net" "tieta_data/config" "time" ) var ( udpClient udp.UdpClient updatePool chan []map[string]interface{} updateSp chan bool saveSize int ) func init() { config.Init("./common.toml") InitLog() updatePool = make(chan []map[string]interface{}, 5000) updateSp = make(chan bool, 1) saveSize = 200 } func main() { rootCmd := &cobra.Command{Use: "my cmd"} rootCmd.AddCommand(exportB()) rootCmd.AddCommand(exportP()) rootCmd.AddCommand(project()) if err := rootCmd.Execute(); err != nil { fmt.Println("rootCmd.Execute failed", err.Error()) } } func exportB() *cobra.Command { cmdClient := &cobra.Command{ Use: "export-bidding", Short: "Start export task", Run: func(cmd *cobra.Command, args []string) { InitMgo() sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) file := xlsx.NewFile() sheet, err := file.AddSheet("sheet1") if err != nil { panic(err) } row := sheet.AddRow() for _, v := range FieldArr1 { row.AddCell().SetValue(v) } query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(nil).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(&tmp); count++ { if count%1000 == 0 { util.Debug("current ---", count) } func(tmp map[string]interface{}) { row := sheet.AddRow() taskExcelB(tmp, row) }(tmp) } util.Debug("over ---", count) fname := fmt.Sprintf("./数据导出-标文%s.xlsx", util.NowFormat(util.DATEFORMAT)) err = file.Save(fname) if err != nil { panic(err) } }, } return cmdClient } func exportP() *cobra.Command { cmdClient := &cobra.Command{ Use: "export-project", Short: "Start export task", Run: func(cmd *cobra.Command, args []string) { InitMgo() sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) file := xlsx.NewFile() sheet, err := file.AddSheet("sheet1") if err != nil { panic(err) } row := sheet.AddRow() for _, v := range FieldArr { row.AddCell().SetValue(v) } query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Pcoll).Find(nil).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(&tmp); count++ { if count%1000 == 0 { util.Debug("current ---", count) } func(tmp map[string]interface{}) { row := sheet.AddRow() taskExcelP(tmp, row) }(tmp) } util.Debug("over ---", count) fname := fmt.Sprintf("./数据导出-捏合%s.xlsx", util.NowFormat(util.DATEFORMAT)) err = file.Save(fname) if err != nil { panic(err) } }, } return cmdClient } func project() *cobra.Command { cmdClient := &cobra.Command{ Use: "project", Short: "Start project task", Run: func(cmd *cobra.Command, args []string) { InitMgo() go updateAllQueue() loadData() udpClient = udp.UdpClient{Local: config.Conf.Serve.LocPort, BufSize: 1024} udpClient.Listen(processUdpMsg) log.Info("Udp服务监听", zap.String("port:", config.Conf.Serve.LocPort)) ch := make(chan bool, 1) <-ch }, } return cmdClient } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case udp.OP_TYPE_DATA: //上个节点的数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Info("err:", zap.Error(err), zap.Any("mapInfo:", mapInfo)) if err != nil { _ = udpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra) } else if mapInfo != nil { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go udpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra) taskProject(mapInfo) } case udp.OP_NOOP: //下个节点回应 ok := string(data) if ok != "" { log.Info("re", zap.String("ok:", ok)) } } } func updateAllQueue() { arru := make([][]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == saveSize { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MongoTool.UpSertBulk(config.Conf.DB.Mongo.Pcoll, arru...) }(arru) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1 * time.Second): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MongoTool.UpSertBulk(config.Conf.DB.Mongo.Pcoll, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } } } }