main.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package main
  2. import (
  3. "fmt"
  4. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  5. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  7. "log"
  8. "sync"
  9. "time"
  10. )
  11. var (
  12. Mgo *mongodb.MongodbSim
  13. Es *elastic.Elastic
  14. EsNew *elastic.Elastic
  15. Es3 *elastic.Elastic
  16. MatchArr = make([]TagMatching, 0) // 存放标签规则
  17. globalRegs = make([]TagMatching, 0) //关键词规则,是标签规则大前提
  18. //更新es
  19. updateEsPool = make(chan []map[string]interface{}, 5000)
  20. updateEsSp = make(chan bool, 5) //保存协程
  21. // 更新mongo
  22. updatePool = make(chan []map[string]interface{}, 5000)
  23. updateSp = make(chan bool, 5)
  24. )
  25. func main() {
  26. go updateEsMethod() // 更新es
  27. go updateMethod()
  28. Init()
  29. //oss.InitOss()
  30. InitRule()
  31. taskRun()
  32. log.Println("over")
  33. c := make(chan bool, 1)
  34. <-c
  35. }
  36. // taskRun 执行
  37. func taskRun() {
  38. defer util.Catch()
  39. sess := Mgo.GetMgoConn()
  40. defer Mgo.DestoryMongoConn(sess)
  41. ch := make(chan bool, 10)
  42. wg := &sync.WaitGroup{}
  43. //查询条件
  44. q := map[string]interface{}{
  45. "_id": map[string]interface{}{
  46. //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  47. //"$lte": mongodb.StringTOBsonId("6612323266cf0db42a75789f"),
  48. "$lte": mongodb.StringTOBsonId("5f00e67e52c1d9fbf8367996"), //测试环境
  49. },
  50. }
  51. count := 0
  52. selected := map[string]interface{}{"title": 1, "detail": 1, "projectname": 1, "purchasing": 1, "buyer": 1, "attach_text": 1}
  53. it := sess.DB("qfw_data").C("bidding").Find(q).Select(selected).Sort("-_id").Iter()
  54. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  55. if count%5000 == 0 {
  56. log.Println("current:", count, tmp["_id"])
  57. }
  58. ch <- true
  59. wg.Add(1)
  60. go func(tmp map[string]interface{}) {
  61. defer func() {
  62. <-ch
  63. wg.Done()
  64. }()
  65. // 存在附件时
  66. //if atta, ok := tmp["attach_text"]; ok && atta != nil {
  67. // text := getFileText(tmp) // 附件信息
  68. // if text != "" {
  69. // tmp["filetext"] = text
  70. // }
  71. //}
  72. gs, _, _ := TaskTags(tmp, globalRegs)
  73. if len(gs) > 0 {
  74. tags, match, add := TaskTags(tmp, MatchArr)
  75. if len(tags) > 0 {
  76. update := map[string]interface{}{
  77. "mobile_tag": tags,
  78. }
  79. log.Println("id--", mongodb.BsonIdToSId(tmp["_id"]), match, add)
  80. //更新MongoDB
  81. updatePool <- []map[string]interface{}{
  82. {"_id": tmp["_id"]},
  83. {"$set": update},
  84. }
  85. //Mgo.UpdateById("bidding", mongodb.BsonIdToSId(tmp["_id"]), map[string]interface{}{"$set": update})
  86. //更新es
  87. updateEsPool <- []map[string]interface{}{
  88. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  89. update,
  90. }
  91. }
  92. }
  93. }(tmp)
  94. tmp = make(map[string]interface{})
  95. }
  96. wg.Wait()
  97. fmt.Println("over ---> ", count)
  98. }
  99. // updateMethod 更新MongoDB
  100. func updateMethod() {
  101. arru := make([][]map[string]interface{}, 50)
  102. indexu := 0
  103. for {
  104. select {
  105. case v := <-updatePool:
  106. arru[indexu] = v
  107. indexu++
  108. if indexu == 50 {
  109. updateSp <- true
  110. go func(arru [][]map[string]interface{}) {
  111. defer func() {
  112. <-updateSp
  113. }()
  114. Mgo.UpdateBulk("bidding", arru...)
  115. }(arru)
  116. arru = make([][]map[string]interface{}, 50)
  117. indexu = 0
  118. }
  119. case <-time.After(1000 * time.Millisecond):
  120. if indexu > 0 {
  121. updateSp <- true
  122. go func(arru [][]map[string]interface{}) {
  123. defer func() {
  124. <-updateSp
  125. }()
  126. Mgo.UpdateBulk("bidding", arru...)
  127. }(arru[:indexu])
  128. arru = make([][]map[string]interface{}, 50)
  129. indexu = 0
  130. }
  131. }
  132. }
  133. }
  134. // updateEsMethod 更新es
  135. func updateEsMethod() {
  136. arru := make([][]map[string]interface{}, 200)
  137. indexu := 0
  138. for {
  139. select {
  140. case v := <-updateEsPool:
  141. arru[indexu] = v
  142. indexu++
  143. if indexu == 200 {
  144. updateEsSp <- true
  145. go func(arru [][]map[string]interface{}) {
  146. defer func() {
  147. <-updateEsSp
  148. }()
  149. Es.UpdateBulk("bidding", arru...)
  150. //EsNew.UpdateBulk("bidding", arru...)
  151. //Es3.UpdateBulk("bidding", arru...)
  152. }(arru)
  153. arru = make([][]map[string]interface{}, 200)
  154. indexu = 0
  155. }
  156. case <-time.After(1000 * time.Millisecond):
  157. if indexu > 0 {
  158. updateEsSp <- true
  159. go func(arru [][]map[string]interface{}) {
  160. defer func() {
  161. <-updateEsSp
  162. }()
  163. Es.UpdateBulk("bidding", arru...)
  164. //Es3.UpdateBulk("bidding", arru...)
  165. //EsNew.UpdateBulk("bidding", arru...)
  166. }(arru[:indexu])
  167. arru = make([][]map[string]interface{}, 200)
  168. indexu = 0
  169. }
  170. }
  171. }
  172. }