main.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package main
  2. import (
  3. "data_project_information/config"
  4. "fmt"
  5. "github.com/robfig/cron/v3"
  6. "github.com/spf13/cobra"
  7. "time"
  8. )
  9. func init() {
  10. config.Init("./common.toml")
  11. InitLog()
  12. updatePool = make(chan []map[string]interface{}, 5000)
  13. updateSp = make(chan bool, 5)
  14. saveSize = 200
  15. savePool = make(chan map[string]interface{}, 5000)
  16. saveSp = make(chan bool, 3)
  17. }
  18. func main() {
  19. go SaveFunc()
  20. rootCmd := &cobra.Command{Use: "my cmd"}
  21. rootCmd.AddCommand(project())
  22. rootCmd.AddCommand(projectAdd())
  23. rootCmd.AddCommand(tidb())
  24. if err := rootCmd.Execute(); err != nil {
  25. fmt.Println("rootCmd.Execute failed", err.Error())
  26. }
  27. c := make(chan bool, 1)
  28. <-c
  29. }
  30. func project() *cobra.Command {
  31. cmdClient := &cobra.Command{
  32. Use: "project",
  33. Short: "Start processing project data",
  34. Run: func(cmd *cobra.Command, args []string) {
  35. InitMgo()
  36. InitEs()
  37. go updateFuc()
  38. task()
  39. },
  40. }
  41. return cmdClient
  42. }
  43. func projectAdd() *cobra.Command {
  44. cmdClient := &cobra.Command{
  45. Use: "project-add",
  46. Short: "Start processing project data",
  47. Run: func(cmd *cobra.Command, args []string) {
  48. InitMgo()
  49. InitEs()
  50. go updateFuc()
  51. taskAdd()
  52. crn := cron.New()
  53. _, _ = crn.AddFunc("10 * * * *", func() {
  54. taskAdd()
  55. })
  56. crn.Start()
  57. ch := make(chan bool, 1)
  58. <-ch
  59. },
  60. }
  61. return cmdClient
  62. }
  63. func tidb() *cobra.Command {
  64. cmdClient := &cobra.Command{
  65. Use: "tidb",
  66. Short: "Start processing tidb data",
  67. Run: func(cmd *cobra.Command, args []string) {
  68. InitMgo()
  69. InitMysql()
  70. go SaveFunc()
  71. taskT()
  72. },
  73. }
  74. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  75. return cmdClient
  76. }
  77. func updateFuc() {
  78. arru := make([][]map[string]interface{}, 500)
  79. indexu := 0
  80. for {
  81. select {
  82. case v := <-updatePool:
  83. arru[indexu] = v
  84. indexu++
  85. if indexu == 500 {
  86. updateSp <- true
  87. go func(arru [][]map[string]interface{}) {
  88. defer func() {
  89. <-updateSp
  90. }()
  91. Mgo.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
  92. }(arru)
  93. arru = make([][]map[string]interface{}, 500)
  94. indexu = 0
  95. }
  96. case <-time.After(1000 * time.Millisecond):
  97. if indexu > 0 {
  98. updateSp <- true
  99. go func(arru [][]map[string]interface{}) {
  100. defer func() {
  101. <-updateSp
  102. }()
  103. Mgo.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
  104. }(arru[:indexu])
  105. arru = make([][]map[string]interface{}, 500)
  106. indexu = 0
  107. }
  108. }
  109. }
  110. }
  111. func SaveFunc() {
  112. arru := make([]map[string]interface{}, saveSize)
  113. indexu := 0
  114. for {
  115. select {
  116. case v := <-savePool:
  117. arru[indexu] = v
  118. indexu++
  119. if indexu == saveSize {
  120. saveSp <- true
  121. go func(arru []map[string]interface{}) {
  122. defer func() {
  123. <-saveSp
  124. }()
  125. MysqlTool.InsertBulk("property_project", BaseField, arru...)
  126. }(arru)
  127. arru = make([]map[string]interface{}, saveSize)
  128. indexu = 0
  129. }
  130. case <-time.After(1000 * time.Millisecond):
  131. if indexu > 0 {
  132. saveSp <- true
  133. go func(arru []map[string]interface{}) {
  134. defer func() {
  135. <-saveSp
  136. }()
  137. MysqlTool.InsertBulk("property_project", BaseField, arru...)
  138. }(arru[:indexu])
  139. arru = make([]map[string]interface{}, saveSize)
  140. indexu = 0
  141. }
  142. }
  143. }
  144. }