main.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package main
  2. import (
  3. "data_project_information/config"
  4. "fmt"
  5. "time"
  6. "github.com/robfig/cron/v3"
  7. "github.com/spf13/cobra"
  8. )
  9. func init() {
  10. config.Init("./common.toml")
  11. InitLog()
  12. updatePool = make(chan []map[string]interface{}, 5000)
  13. updateSp = make(chan bool, 5)
  14. saveSize = 200
  15. savePool = make(chan map[string]interface{}, 5000)
  16. saveSp = make(chan bool, 3)
  17. updateEsPool = make(chan []map[string]interface{}, 5000)
  18. updateEsSp = make(chan bool, 5)
  19. }
  20. func main() {
  21. //
  22. //InitMgo()
  23. //InitEs()
  24. //go updateFuc()
  25. //go updateEsMethod()
  26. //taskAdd()
  27. //return
  28. go SaveFunc()
  29. rootCmd := &cobra.Command{Use: "my cmd"}
  30. rootCmd.AddCommand(project())
  31. rootCmd.AddCommand(projectAdd())
  32. rootCmd.AddCommand(tidb())
  33. if err := rootCmd.Execute(); err != nil {
  34. fmt.Println("rootCmd.Execute failed", err.Error())
  35. }
  36. c := make(chan bool, 1)
  37. <-c
  38. }
  39. func project() *cobra.Command {
  40. cmdClient := &cobra.Command{
  41. Use: "project",
  42. Short: "Start processing project data",
  43. Run: func(cmd *cobra.Command, args []string) {
  44. InitMgo()
  45. InitEs()
  46. go updateFuc()
  47. task()
  48. },
  49. }
  50. return cmdClient
  51. }
  52. func projectAdd() *cobra.Command {
  53. cmdClient := &cobra.Command{
  54. Use: "project-add",
  55. Short: "Start processing project data",
  56. Run: func(cmd *cobra.Command, args []string) {
  57. InitMgo()
  58. InitEs()
  59. go updateFuc()
  60. go updateEsMethod()
  61. taskAdd()
  62. crn := cron.New()
  63. _, _ = crn.AddFunc("10 * * * *", func() {
  64. taskAdd()
  65. })
  66. crn.Start()
  67. ch := make(chan bool, 1)
  68. <-ch
  69. },
  70. }
  71. return cmdClient
  72. }
  73. func tidb() *cobra.Command {
  74. cmdClient := &cobra.Command{
  75. Use: "tidb",
  76. Short: "Start processing tidb data",
  77. Run: func(cmd *cobra.Command, args []string) {
  78. InitMgo()
  79. InitMysql()
  80. go SaveFunc()
  81. taskT()
  82. },
  83. }
  84. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  85. return cmdClient
  86. }
  87. func updateFuc() {
  88. arru := make([][]map[string]interface{}, 500)
  89. indexu := 0
  90. for {
  91. select {
  92. case v := <-updatePool:
  93. arru[indexu] = v
  94. indexu++
  95. if indexu == 500 {
  96. updateSp <- true
  97. go func(arru [][]map[string]interface{}) {
  98. defer func() {
  99. <-updateSp
  100. }()
  101. Mgo.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
  102. }(arru)
  103. arru = make([][]map[string]interface{}, 500)
  104. indexu = 0
  105. }
  106. case <-time.After(1000 * time.Millisecond):
  107. if indexu > 0 {
  108. updateSp <- true
  109. go func(arru [][]map[string]interface{}) {
  110. defer func() {
  111. <-updateSp
  112. }()
  113. Mgo.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
  114. }(arru[:indexu])
  115. arru = make([][]map[string]interface{}, 500)
  116. indexu = 0
  117. }
  118. }
  119. }
  120. }
  121. func SaveFunc() {
  122. arru := make([]map[string]interface{}, saveSize)
  123. indexu := 0
  124. for {
  125. select {
  126. case v := <-savePool:
  127. arru[indexu] = v
  128. indexu++
  129. if indexu == saveSize {
  130. saveSp <- true
  131. go func(arru []map[string]interface{}) {
  132. defer func() {
  133. <-saveSp
  134. }()
  135. MysqlTool.InsertBulk("property_project", BaseField, arru...)
  136. }(arru)
  137. arru = make([]map[string]interface{}, saveSize)
  138. indexu = 0
  139. }
  140. case <-time.After(1000 * time.Millisecond):
  141. if indexu > 0 {
  142. saveSp <- true
  143. go func(arru []map[string]interface{}) {
  144. defer func() {
  145. <-saveSp
  146. }()
  147. MysqlTool.InsertBulk("property_project", BaseField, arru...)
  148. }(arru[:indexu])
  149. arru = make([]map[string]interface{}, saveSize)
  150. indexu = 0
  151. }
  152. }
  153. }
  154. }
  155. // 批量修改es
  156. func updateEsMethod() {
  157. arru := make([][]map[string]interface{}, 200)
  158. indexu := 0
  159. for {
  160. select {
  161. case v := <-updateEsPool:
  162. arru[indexu] = v
  163. indexu++
  164. if indexu == 200 {
  165. updateEsSp <- true
  166. go func(arru [][]map[string]interface{}) {
  167. defer func() {
  168. <-updateEsSp
  169. }()
  170. Es1.UpdateBulk("projectset", arru...)
  171. }(arru)
  172. arru = make([][]map[string]interface{}, 200)
  173. indexu = 0
  174. }
  175. case <-time.After(1000 * time.Millisecond):
  176. if indexu > 0 {
  177. updateEsSp <- true
  178. go func(arru [][]map[string]interface{}) {
  179. defer func() {
  180. <-updateEsSp
  181. }()
  182. Es1.UpdateBulk("projectset", arru...)
  183. }(arru[:indexu])
  184. arru = make([][]map[string]interface{}, 200)
  185. indexu = 0
  186. }
  187. }
  188. }
  189. }