main.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package main
  2. import (
  3. "fmt"
  4. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  5. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  7. "log"
  8. "mobile_tag/oss"
  9. "sync"
  10. "time"
  11. )
  12. var (
  13. Mgo *mongodb.MongodbSim
  14. Es *elastic.Elastic
  15. EsNew *elastic.Elastic
  16. Es3 *elastic.Elastic
  17. MatchArr = make([]TagMatching, 0) // 存放标签规则
  18. globalRegs = make([]TagMatching, 0) //关键词规则,是标签规则大前提
  19. //更新es
  20. updateEsPool = make(chan []map[string]interface{}, 5000)
  21. updateEsSp = make(chan bool, 5) //保存协程
  22. // 更新mongo
  23. updatePool = make(chan []map[string]interface{}, 5000)
  24. updateSp = make(chan bool, 5)
  25. )
  26. func main() {
  27. testOss()
  28. return
  29. go updateEsMethod() // 更新es
  30. go updateMethod()
  31. Init()
  32. //oss.InitOss()
  33. InitRule()
  34. taskRun()
  35. log.Println("over")
  36. c := make(chan bool, 1)
  37. <-c
  38. }
  39. func testOss() {
  40. oss.InitOss()
  41. detail := oss.OssGetObject("5a862f0640d2d9bbe88e3cec", "jy-datadetail")
  42. log.Println("detail:", detail)
  43. }
  44. // taskRun 执行
  45. func taskRun() {
  46. defer util.Catch()
  47. sess := Mgo.GetMgoConn()
  48. defer Mgo.DestoryMongoConn(sess)
  49. ch := make(chan bool, 15)
  50. wg := &sync.WaitGroup{}
  51. //查询条件
  52. q := map[string]interface{}{
  53. "_id": map[string]interface{}{
  54. "$gt": mongodb.StringTOBsonId("65b7ebfc66cf0db42a996a65"),
  55. "$lte": mongodb.StringTOBsonId("6612323266cf0db42a75789f"),
  56. //"$lte": mongodb.StringTOBsonId("5f00e67e52c1d9fbf8367996"), //测试环境
  57. },
  58. }
  59. count := 0
  60. selected := map[string]interface{}{"title": 1, "detail": 1, "projectname": 1, "purchasing": 1, "buyer": 1, "attach_text": 1}
  61. it := sess.DB("qfw").C("bidding").Find(q).Select(selected).Iter()
  62. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  63. if count%5000 == 0 {
  64. log.Println("current:", count, tmp["_id"])
  65. }
  66. ch <- true
  67. wg.Add(1)
  68. go func(tmp map[string]interface{}) {
  69. defer func() {
  70. <-ch
  71. wg.Done()
  72. }()
  73. // 存在附件时
  74. if atta, ok := tmp["attach_text"]; ok && atta != nil {
  75. id := mongodb.BsonIdToSId(tmp["_id"])
  76. if id == "" {
  77. return
  78. }
  79. err, doc := Es.GetById("bidding", id)
  80. if err != nil {
  81. return
  82. } else {
  83. tmp["filetext"] = doc["filetext"]
  84. }
  85. }
  86. gs, _, _ := TaskTags(tmp, globalRegs)
  87. if len(gs) > 0 {
  88. tags, match, add := TaskTags(tmp, MatchArr)
  89. if len(tags) > 0 {
  90. update := map[string]interface{}{
  91. "mobile_tag": tags,
  92. }
  93. log.Println("id--", mongodb.BsonIdToSId(tmp["_id"]), match, add)
  94. //更新MongoDB
  95. updatePool <- []map[string]interface{}{
  96. {"_id": tmp["_id"]},
  97. {"$set": update},
  98. }
  99. //Mgo.UpdateById("bidding", mongodb.BsonIdToSId(tmp["_id"]), map[string]interface{}{"$set": update})
  100. //更新es
  101. updateEsPool <- []map[string]interface{}{
  102. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  103. update,
  104. }
  105. }
  106. }
  107. }(tmp)
  108. tmp = make(map[string]interface{})
  109. }
  110. wg.Wait()
  111. fmt.Println("over ---> ", count)
  112. }
  113. // updateMethod 更新MongoDB
  114. func updateMethod() {
  115. arru := make([][]map[string]interface{}, 50)
  116. indexu := 0
  117. for {
  118. select {
  119. case v := <-updatePool:
  120. arru[indexu] = v
  121. indexu++
  122. if indexu == 50 {
  123. updateSp <- true
  124. go func(arru [][]map[string]interface{}) {
  125. defer func() {
  126. <-updateSp
  127. }()
  128. Mgo.UpdateBulk("bidding", arru...)
  129. }(arru)
  130. arru = make([][]map[string]interface{}, 50)
  131. indexu = 0
  132. }
  133. case <-time.After(1000 * time.Millisecond):
  134. if indexu > 0 {
  135. updateSp <- true
  136. go func(arru [][]map[string]interface{}) {
  137. defer func() {
  138. <-updateSp
  139. }()
  140. Mgo.UpdateBulk("bidding", arru...)
  141. }(arru[:indexu])
  142. arru = make([][]map[string]interface{}, 50)
  143. indexu = 0
  144. }
  145. }
  146. }
  147. }
  148. // updateEsMethod 更新es
  149. func updateEsMethod() {
  150. arru := make([][]map[string]interface{}, 200)
  151. indexu := 0
  152. for {
  153. select {
  154. case v := <-updateEsPool:
  155. arru[indexu] = v
  156. indexu++
  157. if indexu == 200 {
  158. updateEsSp <- true
  159. go func(arru [][]map[string]interface{}) {
  160. defer func() {
  161. <-updateEsSp
  162. }()
  163. Es.UpdateBulk("bidding", arru...)
  164. EsNew.UpdateBulk("bidding", arru...)
  165. }(arru)
  166. arru = make([][]map[string]interface{}, 200)
  167. indexu = 0
  168. }
  169. case <-time.After(1000 * time.Millisecond):
  170. if indexu > 0 {
  171. updateEsSp <- true
  172. go func(arru [][]map[string]interface{}) {
  173. defer func() {
  174. <-updateEsSp
  175. }()
  176. Es.UpdateBulk("bidding", arru...)
  177. EsNew.UpdateBulk("bidding", arru...)
  178. }(arru[:indexu])
  179. arru = make([][]map[string]interface{}, 200)
  180. indexu = 0
  181. }
  182. }
  183. }
  184. }