main.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package main
  2. import (
  3. "log"
  4. "mongodb"
  5. _ "net/http/pprof"
  6. "qfw/util"
  7. "strings"
  8. "time"
  9. )
  10. var (
  11. Sysconfig map[string]interface{} //配置文件
  12. mgo *mongodb.MongodbSim //mongodb操作对象
  13. biddingIndexFields []string
  14. biddingIndexFieldsMap = map[string]string{}
  15. projectinfoFields []string
  16. projectinfoFieldsMap = map[string]string{}
  17. multiIndex []string
  18. purchasinglistFields []string
  19. winnerorderlistFields []string
  20. purchasinglistFieldsMap = map[string]string{}
  21. winnerorderlistFieldsMap = map[string]string{}
  22. BulkSize = 400
  23. detailLength = 50000
  24. fileLength = 50000
  25. savesize = 500
  26. Es *Elastic
  27. EsBulkSize int
  28. saveProjectEsPool chan map[string]interface{}
  29. saveProjectSp chan bool
  30. bidding, project map[string]interface{}
  31. )
  32. var UpdataMgoCache = make(chan []map[string]interface{}, 1000)
  33. var SP = make(chan bool, 5)
  34. func init() {
  35. util.ReadConfig(&Sysconfig)
  36. detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000)
  37. fileLength = util.IntAllDef(Sysconfig["filelength"], 50000)
  38. bidding, _ = Sysconfig["bidding"].(map[string]interface{})
  39. project, _ = Sysconfig["project"].(map[string]interface{})
  40. mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
  41. mgo = &mongodb.MongodbSim{ //mongodb为binding连接
  42. MongodbAddr: mconf["addr"].(string),
  43. Size: util.IntAllDef(mconf["pool"], 5),
  44. DbName: mconf["db"].(string),
  45. UserName: Sysconfig["uname"].(string),
  46. Password: Sysconfig["upwd"].(string),
  47. }
  48. mgo.InitPool()
  49. //初始化es
  50. econf := Sysconfig["elastic"].(map[string]interface{})
  51. Es = &Elastic{
  52. S_esurl: util.ObjToString(econf["addr"]), //http://172.17.4.184:19800
  53. I_size: util.IntAll(econf["size"]),
  54. Username: util.ObjToString(econf["user"]),
  55. Password: util.ObjToString(econf["password"]),
  56. }
  57. Es.InitElasticSize()
  58. //
  59. if bidding["indexfields"] != nil {
  60. biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
  61. }
  62. if bidding["projectinfo"] != nil {
  63. pf := util.ObjToString(bidding["projectinfo"])
  64. if pf != "" {
  65. projectinfoFields = strings.Split(pf, ",")
  66. }
  67. }
  68. if bidding["purchasinglist"] != nil {
  69. pcl := util.ObjToString(bidding["purchasinglist"])
  70. if pcl != "" {
  71. purchasinglistFields = strings.Split(pcl, ",")
  72. }
  73. }
  74. if bidding["winnerorder"] != nil {
  75. winnerorder := util.ObjToString(bidding["winnerorder"])
  76. if winnerorder != "" {
  77. winnerorderlistFields = strings.Split(winnerorder, ",")
  78. }
  79. }
  80. //
  81. if bidding["indexfieldsmap"] != nil {
  82. for k, v := range bidding["indexfieldsmap"].(map[string]interface{}) {
  83. biddingIndexFieldsMap[k] = util.ObjToString(v)
  84. }
  85. log.Println(biddingIndexFieldsMap)
  86. }
  87. if bidding["projectinfomap"] != nil {
  88. for k, v := range bidding["projectinfomap"].(map[string]interface{}) {
  89. projectinfoFieldsMap[k] = util.ObjToString(v)
  90. }
  91. log.Println(projectinfoFieldsMap)
  92. }
  93. if bidding["purchasinglistmap"] != nil {
  94. for k, v := range bidding["purchasinglistmap"].(map[string]interface{}) {
  95. purchasinglistFieldsMap[k] = util.ObjToString(v)
  96. }
  97. log.Println(purchasinglistFieldsMap)
  98. }
  99. if bidding["winnerordermap"] != nil {
  100. for k, v := range bidding["winnerordermap"].(map[string]interface{}) {
  101. winnerorderlistFieldsMap[k] = util.ObjToString(v)
  102. }
  103. log.Println(winnerorderlistFieldsMap)
  104. }
  105. log.Println(projectinfoFields)
  106. log.Println(purchasinglistFields)
  107. EsBulkSize = 200
  108. saveProjectEsPool = make(chan map[string]interface{}, 5000)
  109. saveProjectSp = make(chan bool, 5)
  110. //初始化oss
  111. InitOss()
  112. }
  113. func main() {
  114. go SaveProjectEs()
  115. //biddingTask()
  116. projectTask()
  117. ch := make(chan bool, 1)
  118. <-ch
  119. }
  120. func SaveProjectEs() {
  121. arru := make([]map[string]interface{}, EsBulkSize)
  122. indexu := 0
  123. for {
  124. select {
  125. case v := <-saveProjectEsPool:
  126. arru[indexu] = v
  127. indexu++
  128. if indexu == EsBulkSize {
  129. saveProjectSp <- true
  130. go func(arru []map[string]interface{}) {
  131. defer func() {
  132. <-saveProjectSp
  133. }()
  134. Es.BulkSave("oprd_projectset_v1", arru)
  135. }(arru)
  136. arru = make([]map[string]interface{}, EsBulkSize)
  137. indexu = 0
  138. }
  139. case <-time.After(1000 * time.Millisecond):
  140. if indexu > 0 {
  141. saveProjectSp <- true
  142. go func(arru []map[string]interface{}) {
  143. defer func() {
  144. <-saveProjectSp
  145. }()
  146. Es.BulkSave("oprd_projectset_v1", arru)
  147. }(arru[:indexu])
  148. arru = make([]map[string]interface{}, EsBulkSize)
  149. indexu = 0
  150. }
  151. }
  152. }
  153. }