main.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package main
  2. import (
  3. "mongodb"
  4. "qfw/util"
  5. "strings"
  6. "sync"
  7. "time"
  8. )
  9. var (
  10. Sysconfig map[string]interface{}
  11. Mgo, Mgo1 *mongodb.MongodbSim
  12. Dbname, Dbcoll string
  13. collSave, qyxyColl string
  14. savePool chan map[string]interface{}
  15. saveSp chan bool
  16. updatePool chan []map[string]interface{}
  17. updateSp chan bool
  18. filePath string
  19. tagArr []string
  20. operators []string
  21. TagMatchRule = map[string][]TagMatching{}
  22. )
  23. func init() {
  24. util.ReadConfig(&Sysconfig)
  25. collSave = util.ObjToString(Sysconfig["dbcoll"])
  26. qyxyColl = util.ObjToString(Sysconfig["qyxyColl"])
  27. Mgo = &mongodb.MongodbSim{
  28. MongodbAddr: Sysconfig["mgodb"].(string),
  29. Size: util.IntAllDef(Sysconfig["dbsize"], 5),
  30. DbName: Sysconfig["dbname"].(string),
  31. //UserName: Sysconfig["uname"].(string),
  32. //Password: Sysconfig["upwd"].(string),
  33. }
  34. Mgo.InitPool()
  35. checkDb, _ := Sysconfig["checkDb"].(map[string]interface{})
  36. Dbname = checkDb["dbname"].(string)
  37. Dbcoll = checkDb["dbcoll"].(string)
  38. Mgo1 = &mongodb.MongodbSim{
  39. MongodbAddr: checkDb["addr"].(string),
  40. Size: util.IntAll(checkDb["dbsize"]),
  41. DbName: checkDb["dbname"].(string),
  42. }
  43. Mgo1.InitPool()
  44. filePath = util.ObjToString(Sysconfig["tagFile"])
  45. savePool = make(chan map[string]interface{}, 5000)
  46. saveSp = make(chan bool, 5)
  47. updatePool = make(chan []map[string]interface{}, 5000)
  48. updateSp = make(chan bool, 5)
  49. operators = strings.Split(util.ObjToString(Sysconfig["operators"]), ",")
  50. initExcel(filePath)
  51. util.Debug(len(TagMatchRule))
  52. }
  53. func main() {
  54. //go saveMethod()
  55. go updateMethod()
  56. sess := Mgo1.GetMgoConn()
  57. defer Mgo1.DestoryMongoConn(sess)
  58. ch := make(chan bool, 3)
  59. wg := &sync.WaitGroup{}
  60. //q := map[string]interface{}{"id": "61547b9f1a75b8f4469b7f90"}
  61. query := sess.DB(Dbname).C(Dbcoll).Find(nil).Select(nil).Iter()
  62. count := 0
  63. for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
  64. if count%500 == 0 {
  65. util.Debug("current ---", count)
  66. }
  67. ch <- true
  68. wg.Add(1)
  69. go func(tmp map[string]interface{}) {
  70. defer func() {
  71. <-ch
  72. wg.Done()
  73. }()
  74. taskinfo(tmp)
  75. }(tmp)
  76. tmp = make(map[string]interface{})
  77. }
  78. wg.Wait()
  79. util.Debug("over ---", count)
  80. c := make(chan bool, 1)
  81. <-c
  82. }
  83. func saveMethod() {
  84. arru := make([]map[string]interface{}, 200)
  85. indexu := 0
  86. for {
  87. select {
  88. case v := <-savePool:
  89. arru[indexu] = v
  90. indexu++
  91. if indexu == 200 {
  92. saveSp <- true
  93. go func(arru []map[string]interface{}) {
  94. defer func() {
  95. <-saveSp
  96. }()
  97. Mgo.SaveBulk(collSave, arru...)
  98. }(arru)
  99. arru = make([]map[string]interface{}, 200)
  100. indexu = 0
  101. }
  102. case <-time.After(1000 * time.Millisecond):
  103. if indexu > 0 {
  104. saveSp <- true
  105. go func(arru []map[string]interface{}) {
  106. defer func() {
  107. <-saveSp
  108. }()
  109. Mgo.SaveBulk(collSave, arru...)
  110. }(arru[:indexu])
  111. arru = make([]map[string]interface{}, 200)
  112. indexu = 0
  113. }
  114. }
  115. }
  116. }
  117. func updateMethod() {
  118. arru := make([][]map[string]interface{}, 200)
  119. indexu := 0
  120. for {
  121. select {
  122. case v := <-updatePool:
  123. arru[indexu] = v
  124. indexu++
  125. if indexu == 200 {
  126. updateSp <- true
  127. go func(arru [][]map[string]interface{}) {
  128. defer func() {
  129. <-updateSp
  130. }()
  131. Mgo.UpSertBulk(collSave, arru...)
  132. }(arru)
  133. arru = make([][]map[string]interface{}, 200)
  134. indexu = 0
  135. }
  136. case <-time.After(1000 * time.Millisecond):
  137. if indexu > 0 {
  138. updateSp <- true
  139. go func(arru [][]map[string]interface{}) {
  140. defer func() {
  141. <-updateSp
  142. }()
  143. Mgo.UpSertBulk(collSave, arru...)
  144. }(arru[:indexu])
  145. arru = make([][]map[string]interface{}, 200)
  146. indexu = 0
  147. }
  148. }
  149. }
  150. }