main.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package main
  2. import (
  3. "data_project_information/config"
  4. "fmt"
  5. "github.com/spf13/cobra"
  6. "time"
  7. )
  8. func init() {
  9. config.Init("./common.toml")
  10. InitLog()
  11. updatePool = make(chan []map[string]interface{}, 5000)
  12. updateSp = make(chan bool, 5)
  13. saveSize = 200
  14. savePool = make(chan map[string]interface{}, 5000)
  15. saveSp = make(chan bool, 3)
  16. }
  17. func main() {
  18. go SaveFunc()
  19. rootCmd := &cobra.Command{Use: "my cmd"}
  20. rootCmd.AddCommand(project())
  21. rootCmd.AddCommand(tidb())
  22. if err := rootCmd.Execute(); err != nil {
  23. fmt.Println("rootCmd.Execute failed", err.Error())
  24. }
  25. c := make(chan bool, 1)
  26. <-c
  27. }
  28. func project() *cobra.Command {
  29. cmdClient := &cobra.Command{
  30. Use: "project",
  31. Short: "Start processing project data",
  32. Run: func(cmd *cobra.Command, args []string) {
  33. InitMgo()
  34. InitEs()
  35. go updateFuc()
  36. task()
  37. },
  38. }
  39. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  40. return cmdClient
  41. }
  42. func tidb() *cobra.Command {
  43. cmdClient := &cobra.Command{
  44. Use: "tidb",
  45. Short: "Start processing tidb data",
  46. Run: func(cmd *cobra.Command, args []string) {
  47. InitMgo()
  48. InitMysql()
  49. go SaveFunc()
  50. taskT()
  51. },
  52. }
  53. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  54. return cmdClient
  55. }
  56. func updateFuc() {
  57. arru := make([][]map[string]interface{}, 500)
  58. indexu := 0
  59. for {
  60. select {
  61. case v := <-updatePool:
  62. arru[indexu] = v
  63. indexu++
  64. if indexu == 500 {
  65. updateSp <- true
  66. go func(arru [][]map[string]interface{}) {
  67. defer func() {
  68. <-updateSp
  69. }()
  70. Mgo.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
  71. }(arru)
  72. arru = make([][]map[string]interface{}, 500)
  73. indexu = 0
  74. }
  75. case <-time.After(1000 * time.Millisecond):
  76. if indexu > 0 {
  77. updateSp <- true
  78. go func(arru [][]map[string]interface{}) {
  79. defer func() {
  80. <-updateSp
  81. }()
  82. Mgo.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
  83. }(arru[:indexu])
  84. arru = make([][]map[string]interface{}, 500)
  85. indexu = 0
  86. }
  87. }
  88. }
  89. }
  90. func SaveFunc() {
  91. arru := make([]map[string]interface{}, saveSize)
  92. indexu := 0
  93. for {
  94. select {
  95. case v := <-savePool:
  96. arru[indexu] = v
  97. indexu++
  98. if indexu == saveSize {
  99. saveSp <- true
  100. go func(arru []map[string]interface{}) {
  101. defer func() {
  102. <-saveSp
  103. }()
  104. MysqlTool.InsertBulk("property_project", BaseField, arru...)
  105. }(arru)
  106. arru = make([]map[string]interface{}, saveSize)
  107. indexu = 0
  108. }
  109. case <-time.After(1000 * time.Millisecond):
  110. if indexu > 0 {
  111. saveSp <- true
  112. go func(arru []map[string]interface{}) {
  113. defer func() {
  114. <-saveSp
  115. }()
  116. MysqlTool.InsertBulk("property_project", BaseField, arru...)
  117. }(arru[:indexu])
  118. arru = make([]map[string]interface{}, saveSize)
  119. indexu = 0
  120. }
  121. }
  122. }
  123. }