main.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/spf13/cobra"
  6. "github.com/tealeg/xlsx"
  7. "go.uber.org/zap"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  11. "net"
  12. "tieta_data/config"
  13. "time"
  14. )
  15. var (
  16. udpClient udp.UdpClient
  17. updatePool chan []map[string]interface{}
  18. updateSp chan bool
  19. saveSize int
  20. )
  21. func init() {
  22. config.Init("./common.toml")
  23. InitLog()
  24. updatePool = make(chan []map[string]interface{}, 5000)
  25. updateSp = make(chan bool, 1)
  26. saveSize = 200
  27. }
  28. func main() {
  29. rootCmd := &cobra.Command{Use: "my cmd"}
  30. rootCmd.AddCommand(exportB())
  31. rootCmd.AddCommand(exportP())
  32. rootCmd.AddCommand(project())
  33. if err := rootCmd.Execute(); err != nil {
  34. fmt.Println("rootCmd.Execute failed", err.Error())
  35. }
  36. }
  37. func exportB() *cobra.Command {
  38. cmdClient := &cobra.Command{
  39. Use: "export-bidding",
  40. Short: "Start export task",
  41. Run: func(cmd *cobra.Command, args []string) {
  42. InitMgo()
  43. sess := MongoTool.GetMgoConn()
  44. defer MongoTool.DestoryMongoConn(sess)
  45. file := xlsx.NewFile()
  46. sheet, err := file.AddSheet("sheet1")
  47. if err != nil {
  48. panic(err)
  49. }
  50. row := sheet.AddRow()
  51. for _, v := range FieldArr1 {
  52. row.AddCell().SetValue(v)
  53. }
  54. query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(nil).Select(nil).Iter()
  55. count := 0
  56. for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
  57. if count%1000 == 0 {
  58. util.Debug("current ---", count)
  59. }
  60. func(tmp map[string]interface{}) {
  61. row := sheet.AddRow()
  62. taskExcelB(tmp, row)
  63. }(tmp)
  64. }
  65. util.Debug("over ---", count)
  66. fname := fmt.Sprintf("./数据导出-标文%s.xlsx", util.NowFormat(util.DATEFORMAT))
  67. err = file.Save(fname)
  68. if err != nil {
  69. panic(err)
  70. }
  71. },
  72. }
  73. return cmdClient
  74. }
  75. func exportP() *cobra.Command {
  76. cmdClient := &cobra.Command{
  77. Use: "export-project",
  78. Short: "Start export task",
  79. Run: func(cmd *cobra.Command, args []string) {
  80. InitMgo()
  81. sess := MongoTool.GetMgoConn()
  82. defer MongoTool.DestoryMongoConn(sess)
  83. file := xlsx.NewFile()
  84. sheet, err := file.AddSheet("sheet1")
  85. if err != nil {
  86. panic(err)
  87. }
  88. row := sheet.AddRow()
  89. for _, v := range FieldArr {
  90. row.AddCell().SetValue(v)
  91. }
  92. query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Pcoll).Find(nil).Select(nil).Iter()
  93. count := 0
  94. for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
  95. if count%1000 == 0 {
  96. util.Debug("current ---", count)
  97. }
  98. func(tmp map[string]interface{}) {
  99. row := sheet.AddRow()
  100. taskExcelP(tmp, row)
  101. }(tmp)
  102. }
  103. util.Debug("over ---", count)
  104. fname := fmt.Sprintf("./数据导出-捏合%s.xlsx", util.NowFormat(util.DATEFORMAT))
  105. err = file.Save(fname)
  106. if err != nil {
  107. panic(err)
  108. }
  109. },
  110. }
  111. return cmdClient
  112. }
  113. func project() *cobra.Command {
  114. cmdClient := &cobra.Command{
  115. Use: "project",
  116. Short: "Start project task",
  117. Run: func(cmd *cobra.Command, args []string) {
  118. InitMgo()
  119. go updateAllQueue()
  120. loadData()
  121. udpClient = udp.UdpClient{Local: config.Conf.Serve.LocPort, BufSize: 1024}
  122. udpClient.Listen(processUdpMsg)
  123. log.Info("Udp服务监听", zap.String("port:", config.Conf.Serve.LocPort))
  124. ch := make(chan bool, 1)
  125. <-ch
  126. },
  127. }
  128. return cmdClient
  129. }
  130. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  131. switch act {
  132. case udp.OP_TYPE_DATA: //上个节点的数据
  133. var mapInfo map[string]interface{}
  134. err := json.Unmarshal(data, &mapInfo)
  135. log.Info("err:", zap.Error(err), zap.Any("mapInfo:", mapInfo))
  136. if err != nil {
  137. _ = udpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  138. } else if mapInfo != nil {
  139. key, _ := mapInfo["key"].(string)
  140. if key == "" {
  141. key = "udpok"
  142. }
  143. go udpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  144. taskProject(mapInfo)
  145. }
  146. case udp.OP_NOOP: //下个节点回应
  147. ok := string(data)
  148. if ok != "" {
  149. log.Info("re", zap.String("ok:", ok))
  150. }
  151. }
  152. }
  153. func updateAllQueue() {
  154. arru := make([][]map[string]interface{}, saveSize)
  155. indexu := 0
  156. for {
  157. select {
  158. case v := <-updatePool:
  159. arru[indexu] = v
  160. indexu++
  161. if indexu == saveSize {
  162. updateSp <- true
  163. go func(arru [][]map[string]interface{}) {
  164. defer func() {
  165. <-updateSp
  166. }()
  167. MongoTool.UpSertBulk(config.Conf.DB.Mongo.Pcoll, arru...)
  168. }(arru)
  169. arru = make([][]map[string]interface{}, saveSize)
  170. indexu = 0
  171. }
  172. case <-time.After(1 * time.Second):
  173. if indexu > 0 {
  174. updateSp <- true
  175. go func(arru [][]map[string]interface{}) {
  176. defer func() {
  177. <-updateSp
  178. }()
  179. MongoTool.UpSertBulk(config.Conf.DB.Mongo.Pcoll, arru...)
  180. }(arru[:indexu])
  181. arru = make([][]map[string]interface{}, saveSize)
  182. indexu = 0
  183. }
  184. }
  185. }
  186. }