123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- package main
- import (
- "encoding/json"
- "fmt"
- "github.com/spf13/cobra"
- "github.com/tealeg/xlsx"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "net"
- "tieta_data/config"
- "time"
- )
- var (
- udpClient udp.UdpClient
- updatePool chan []map[string]interface{}
- updateSp chan bool
- saveSize int
- )
- func init() {
- config.Init("./common.toml")
- InitLog()
- updatePool = make(chan []map[string]interface{}, 5000)
- updateSp = make(chan bool, 1)
- saveSize = 200
- }
- func main() {
- rootCmd := &cobra.Command{Use: "my cmd"}
- rootCmd.AddCommand(exportB())
- rootCmd.AddCommand(exportP())
- rootCmd.AddCommand(project())
- if err := rootCmd.Execute(); err != nil {
- fmt.Println("rootCmd.Execute failed", err.Error())
- }
- }
- func exportB() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "export-bidding",
- Short: "Start export task",
- Run: func(cmd *cobra.Command, args []string) {
- InitMgo()
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- file := xlsx.NewFile()
- sheet, err := file.AddSheet("sheet1")
- if err != nil {
- panic(err)
- }
- row := sheet.AddRow()
- for _, v := range FieldArr1 {
- row.AddCell().SetValue(v)
- }
- query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(nil).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
- if count%1000 == 0 {
- util.Debug("current ---", count)
- }
- func(tmp map[string]interface{}) {
- row := sheet.AddRow()
- taskExcelB(tmp, row)
- }(tmp)
- }
- util.Debug("over ---", count)
- fname := fmt.Sprintf("./数据导出-标文%s.xlsx", util.NowFormat(util.DATEFORMAT))
- err = file.Save(fname)
- if err != nil {
- panic(err)
- }
- },
- }
- return cmdClient
- }
- func exportP() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "export-project",
- Short: "Start export task",
- Run: func(cmd *cobra.Command, args []string) {
- InitMgo()
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- file := xlsx.NewFile()
- sheet, err := file.AddSheet("sheet1")
- if err != nil {
- panic(err)
- }
- row := sheet.AddRow()
- for _, v := range FieldArr {
- row.AddCell().SetValue(v)
- }
- query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Pcoll).Find(nil).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
- if count%1000 == 0 {
- util.Debug("current ---", count)
- }
- func(tmp map[string]interface{}) {
- row := sheet.AddRow()
- taskExcelP(tmp, row)
- }(tmp)
- }
- util.Debug("over ---", count)
- fname := fmt.Sprintf("./数据导出-捏合%s.xlsx", util.NowFormat(util.DATEFORMAT))
- err = file.Save(fname)
- if err != nil {
- panic(err)
- }
- },
- }
- return cmdClient
- }
- func project() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "project",
- Short: "Start project task",
- Run: func(cmd *cobra.Command, args []string) {
- InitMgo()
- go updateAllQueue()
- loadData()
- udpClient = udp.UdpClient{Local: config.Conf.Serve.LocPort, BufSize: 1024}
- udpClient.Listen(processUdpMsg)
- log.Info("Udp服务监听", zap.String("port:", config.Conf.Serve.LocPort))
- ch := make(chan bool, 1)
- <-ch
- },
- }
- return cmdClient
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA: //上个节点的数据
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Info("err:", zap.Error(err), zap.Any("mapInfo:", mapInfo))
- if err != nil {
- _ = udpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
- } else if mapInfo != nil {
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go udpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- taskProject(mapInfo)
- }
- case udp.OP_NOOP: //下个节点回应
- ok := string(data)
- if ok != "" {
- log.Info("re", zap.String("ok:", ok))
- }
- }
- }
- func updateAllQueue() {
- arru := make([][]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool.UpSertBulk(config.Conf.DB.Mongo.Pcoll, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1 * time.Second):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool.UpSertBulk(config.Conf.DB.Mongo.Pcoll, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|