main.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/wcc4869/common_utils/log"
  5. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  8. "sync"
  9. "time"
  10. )
  11. var (
  12. Mgo *mongodb.MongodbSim
  13. saveSize = 50
  14. Es *elastic.Elastic
  15. EsNew *elastic.Elastic
  16. // 更新mongo
  17. updatePool = make(chan []map[string]interface{}, 5000)
  18. updateSp = make(chan bool, 5)
  19. //更新es
  20. updateEsPool = make(chan []map[string]interface{}, 5000)
  21. updateEsSp = make(chan bool, 2) //保存协程
  22. )
  23. func main() {
  24. //mongodb
  25. Mgo = &mongodb.MongodbSim{
  26. MongodbAddr: "172.17.189.140:27080",
  27. //MongodbAddr: "127.0.0.1:27083",
  28. DbName: "qfw",
  29. Size: 10,
  30. UserName: "SJZY_RWbid_ES",
  31. Password: "SJZY@B4i4D5e6S",
  32. //Direct: true,
  33. }
  34. Mgo.InitPool()
  35. //es
  36. Es = &elastic.Elastic{
  37. //S_esurl: "http://127.0.0.1:19805",
  38. S_esurl: "http://172.17.4.184:19805",
  39. I_size: 5,
  40. Username: "es_all",
  41. Password: "TopJkO2E_d1x",
  42. }
  43. Es.InitElasticSize()
  44. //es 新集群
  45. EsNew = &elastic.Elastic{
  46. //S_esurl: "http://127.0.0.1:19905",
  47. S_esurl: "http://172.17.4.184:19905",
  48. I_size: 5,
  49. Username: "jybid",
  50. Password: "Top2023_JEB01i@31",
  51. }
  52. EsNew.InitElasticSize()
  53. go updateMethod() //更新mongodb
  54. go updateEsMethod() //更新es
  55. taskRun()
  56. fmt.Println(111)
  57. c := make(chan bool, 1)
  58. <-c
  59. }
  60. // taskRun 更新es 省市区三个字段
  61. func taskRun() {
  62. defer util.Catch()
  63. sess := Mgo.GetMgoConn()
  64. defer Mgo.DestoryMongoConn(sess)
  65. pool := make(chan bool, 10) //处理协程
  66. wg := &sync.WaitGroup{}
  67. //查询条件
  68. q := map[string]interface{}{
  69. //"_id": map[string]interface{}{
  70. // "$gt": mongodb.StringTOBsonId("652423800000000000000000"),
  71. // "$lte": mongodb.StringTOBsonId("6543c7800000000000000000"),
  72. //},
  73. "comeintime": map[string]interface{}{
  74. "$gt": 1669824000,
  75. //"$lte": 1669864950,
  76. "$lte": 1702265941,
  77. },
  78. "site": "国家能源e购",
  79. }
  80. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  81. it := sess.DB("qfw").C("bidding").Find(&q).Select(&selected).Iter()
  82. fmt.Println("开始")
  83. count := 0
  84. realNum := 0
  85. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  86. if count%10000 == 0 {
  87. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  88. }
  89. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  90. tmp = make(map[string]interface{})
  91. continue
  92. }
  93. // 针对存量数据,重复数据不进索引
  94. if util.IntAll(tmp["extracttype"]) == -1 {
  95. continue
  96. }
  97. //针对产权数据,暂时不入es 索引库
  98. if util.IntAll(tmp["infoformat"]) == 3 {
  99. continue
  100. }
  101. //只有 紧急直接零星采购公告 栏目的数据,需要改成 结果-成交
  102. channel := util.ObjToString(tmp["channel"])
  103. if channel != "紧急直接零星采购公告" {
  104. continue
  105. }
  106. realNum++
  107. fmt.Println(mongodb.BsonIdToSId(tmp["_id"]))
  108. pool <- true
  109. wg.Add(1)
  110. go func(tmp map[string]interface{}) {
  111. defer func() {
  112. <-pool
  113. wg.Done()
  114. }()
  115. //1.更新MongoDB
  116. update := map[string]interface{}{
  117. "toptype": "结果",
  118. "subtype": "成交",
  119. }
  120. if len(update) > 0 {
  121. //更新MongoDB
  122. updatePool <- []map[string]interface{}{
  123. {"_id": tmp["_id"]},
  124. {"$set": update},
  125. }
  126. }
  127. //2.es 更新字段
  128. esUpdate := map[string]interface{}{
  129. "toptype": "结果",
  130. "subtype": "成交",
  131. "id": mongodb.BsonIdToSId(tmp["_id"]),
  132. }
  133. if len(esUpdate) > 0 {
  134. // 更新es
  135. updateEsPool <- []map[string]interface{}{
  136. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  137. esUpdate,
  138. }
  139. }
  140. }(tmp)
  141. tmp = make(map[string]interface{})
  142. }
  143. wg.Wait()
  144. log.Info("Run Over...Count1:", log.Int("count", count), log.Int("realNum", realNum))
  145. fmt.Println("结束")
  146. }
  147. // updateMethod 更新MongoDB
  148. func updateMethod() {
  149. arru := make([][]map[string]interface{}, saveSize)
  150. indexu := 0
  151. for {
  152. select {
  153. case v := <-updatePool:
  154. arru[indexu] = v
  155. indexu++
  156. if indexu == saveSize {
  157. updateSp <- true
  158. go func(arru [][]map[string]interface{}) {
  159. defer func() {
  160. <-updateSp
  161. }()
  162. Mgo.UpdateBulk("bidding", arru...)
  163. }(arru)
  164. arru = make([][]map[string]interface{}, saveSize)
  165. indexu = 0
  166. }
  167. case <-time.After(1000 * time.Millisecond):
  168. if indexu > 0 {
  169. updateSp <- true
  170. go func(arru [][]map[string]interface{}) {
  171. defer func() {
  172. <-updateSp
  173. }()
  174. Mgo.UpdateBulk("bidding", arru...)
  175. }(arru[:indexu])
  176. arru = make([][]map[string]interface{}, saveSize)
  177. indexu = 0
  178. }
  179. }
  180. }
  181. }
  182. // updateEsMethod 更新es
  183. func updateEsMethod() {
  184. arru := make([][]map[string]interface{}, 200)
  185. indexu := 0
  186. for {
  187. select {
  188. case v := <-updateEsPool:
  189. arru[indexu] = v
  190. indexu++
  191. if indexu == 200 {
  192. updateEsSp <- true
  193. go func(arru [][]map[string]interface{}) {
  194. defer func() {
  195. <-updateEsSp
  196. }()
  197. Es.UpdateBulk("bidding", arru...)
  198. EsNew.UpdateBulk("bidding", arru...)
  199. }(arru)
  200. arru = make([][]map[string]interface{}, 200)
  201. indexu = 0
  202. }
  203. case <-time.After(1000 * time.Millisecond):
  204. if indexu > 0 {
  205. updateEsSp <- true
  206. go func(arru [][]map[string]interface{}) {
  207. defer func() {
  208. <-updateEsSp
  209. }()
  210. Es.UpdateBulk("bidding", arru...)
  211. EsNew.UpdateBulk("bidding", arru...)
  212. }(arru[:indexu])
  213. arru = make([][]map[string]interface{}, 200)
  214. indexu = 0
  215. }
  216. }
  217. }
  218. }