main.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package main
  2. import (
  3. "fmt"
  4. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  5. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  7. "log"
  8. "sync"
  9. "time"
  10. )
  11. var (
  12. Mgo *mongodb.MongodbSim
  13. Es *elastic.Elastic
  14. EsNew *elastic.Elastic
  15. Es3 *elastic.Elastic
  16. MatchArr = make([]TagMatching, 0) // 存放标签规则
  17. globalRegs = make([]TagMatching, 0) //关键词规则,是标签规则大前提
  18. //更新es
  19. updateEsPool = make(chan []map[string]interface{}, 5000)
  20. updateEsSp = make(chan bool, 5) //保存协程
  21. // 更新mongo
  22. updatePool = make(chan []map[string]interface{}, 5000)
  23. updateSp = make(chan bool, 5)
  24. )
  25. func main() {
  26. go updateEsMethod() // 更新es
  27. go updateMethod()
  28. Init()
  29. //oss.InitOss()
  30. InitRule()
  31. taskRun()
  32. log.Println("over")
  33. c := make(chan bool, 1)
  34. <-c
  35. }
  36. // taskRun 执行
  37. func taskRun() {
  38. defer util.Catch()
  39. sess := Mgo.GetMgoConn()
  40. defer Mgo.DestoryMongoConn(sess)
  41. ch := make(chan bool, 15)
  42. wg := &sync.WaitGroup{}
  43. //查询条件
  44. q := map[string]interface{}{
  45. "_id": map[string]interface{}{
  46. "$gt": mongodb.StringTOBsonId("65b7ebfc66cf0db42a996a65"),
  47. "$lte": mongodb.StringTOBsonId("6612323266cf0db42a75789f"),
  48. //"$lte": mongodb.StringTOBsonId("5f00e67e52c1d9fbf8367996"), //测试环境
  49. },
  50. }
  51. count := 0
  52. selected := map[string]interface{}{"title": 1, "detail": 1, "projectname": 1, "purchasing": 1, "buyer": 1, "attach_text": 1}
  53. it := sess.DB("qfw").C("bidding").Find(q).Select(selected).Iter()
  54. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  55. if count%5000 == 0 {
  56. log.Println("current:", count, tmp["_id"])
  57. }
  58. ch <- true
  59. wg.Add(1)
  60. go func(tmp map[string]interface{}) {
  61. defer func() {
  62. <-ch
  63. wg.Done()
  64. }()
  65. // 存在附件时
  66. if atta, ok := tmp["attach_text"]; ok && atta != nil {
  67. id := mongodb.BsonIdToSId(tmp["_id"])
  68. if id == "" {
  69. return
  70. }
  71. err, doc := Es.GetById("bidding", id)
  72. if err != nil {
  73. return
  74. } else {
  75. tmp["filetext"] = doc["filetext"]
  76. }
  77. }
  78. gs, _, _ := TaskTags(tmp, globalRegs)
  79. if len(gs) > 0 {
  80. tags, match, add := TaskTags(tmp, MatchArr)
  81. if len(tags) > 0 {
  82. update := map[string]interface{}{
  83. "mobile_tag": tags,
  84. }
  85. log.Println("id--", mongodb.BsonIdToSId(tmp["_id"]), match, add)
  86. //更新MongoDB
  87. updatePool <- []map[string]interface{}{
  88. {"_id": tmp["_id"]},
  89. {"$set": update},
  90. }
  91. //Mgo.UpdateById("bidding", mongodb.BsonIdToSId(tmp["_id"]), map[string]interface{}{"$set": update})
  92. //更新es
  93. updateEsPool <- []map[string]interface{}{
  94. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  95. update,
  96. }
  97. }
  98. }
  99. }(tmp)
  100. tmp = make(map[string]interface{})
  101. }
  102. wg.Wait()
  103. fmt.Println("over ---> ", count)
  104. }
  105. // updateMethod 更新MongoDB
  106. func updateMethod() {
  107. arru := make([][]map[string]interface{}, 50)
  108. indexu := 0
  109. for {
  110. select {
  111. case v := <-updatePool:
  112. arru[indexu] = v
  113. indexu++
  114. if indexu == 50 {
  115. updateSp <- true
  116. go func(arru [][]map[string]interface{}) {
  117. defer func() {
  118. <-updateSp
  119. }()
  120. Mgo.UpdateBulk("bidding", arru...)
  121. }(arru)
  122. arru = make([][]map[string]interface{}, 50)
  123. indexu = 0
  124. }
  125. case <-time.After(1000 * time.Millisecond):
  126. if indexu > 0 {
  127. updateSp <- true
  128. go func(arru [][]map[string]interface{}) {
  129. defer func() {
  130. <-updateSp
  131. }()
  132. Mgo.UpdateBulk("bidding", arru...)
  133. }(arru[:indexu])
  134. arru = make([][]map[string]interface{}, 50)
  135. indexu = 0
  136. }
  137. }
  138. }
  139. }
  140. // updateEsMethod 更新es
  141. func updateEsMethod() {
  142. arru := make([][]map[string]interface{}, 200)
  143. indexu := 0
  144. for {
  145. select {
  146. case v := <-updateEsPool:
  147. arru[indexu] = v
  148. indexu++
  149. if indexu == 200 {
  150. updateEsSp <- true
  151. go func(arru [][]map[string]interface{}) {
  152. defer func() {
  153. <-updateEsSp
  154. }()
  155. Es.UpdateBulk("bidding", arru...)
  156. EsNew.UpdateBulk("bidding", arru...)
  157. }(arru)
  158. arru = make([][]map[string]interface{}, 200)
  159. indexu = 0
  160. }
  161. case <-time.After(1000 * time.Millisecond):
  162. if indexu > 0 {
  163. updateEsSp <- true
  164. go func(arru [][]map[string]interface{}) {
  165. defer func() {
  166. <-updateEsSp
  167. }()
  168. Es.UpdateBulk("bidding", arru...)
  169. EsNew.UpdateBulk("bidding", arru...)
  170. }(arru[:indexu])
  171. arru = make([][]map[string]interface{}, 200)
  172. indexu = 0
  173. }
  174. }
  175. }
  176. }