main.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package main
  2. import (
  3. "fmt"
  4. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  5. es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  7. "log"
  8. "time"
  9. )
  10. var (
  11. //更新es
  12. updateEsPool = make(chan []map[string]interface{}, 5000)
  13. updateEsSp = make(chan bool, 3) //保存协程
  14. Es *es.Elastic
  15. esIndex = "qyxy"
  16. Mgo *mongodb.MongodbSim
  17. MgoQY *mongodb.MongodbSim
  18. )
  19. func initEs() {
  20. Es = &es.Elastic{
  21. //S_esurl: "http://127.0.0.1:19908",
  22. S_esurl: "http://172.17.4.184:19908",
  23. I_size: 10,
  24. Username: "jybid",
  25. Password: "Top2023_JEB01i@31",
  26. }
  27. Es.InitElasticSize()
  28. }
  29. // initMgo initMgo
  30. func initMgo() {
  31. //181 凭安库
  32. MgoQY = &mongodb.MongodbSim{
  33. MongodbAddr: "172.17.4.181:27001",
  34. //MongodbAddr: "127.0.0.1:27001",
  35. DbName: "mixdata",
  36. Size: 10,
  37. UserName: "",
  38. Password: "",
  39. //Direct: true,
  40. }
  41. MgoQY.InitPool()
  42. //qyxy_std
  43. Mgo = &mongodb.MongodbSim{
  44. MongodbAddr: "172.17.189.140:27080",
  45. //MongodbAddr: "127.0.0.1:27083",
  46. Size: 10,
  47. DbName: "mixdata",
  48. UserName: "SJZY_RWbid_ES",
  49. Password: "SJZY@B4i4D5e6S",
  50. //Direct: true,
  51. }
  52. Mgo.InitPool()
  53. }
  54. func main() {
  55. go updateEsMethod()
  56. initEs()
  57. initMgo()
  58. dealCapitalData()
  59. select {}
  60. }
  61. // dealCapitalData 处理数据 注册资金
  62. func dealCapitalData() {
  63. defer util.Catch()
  64. sess := MgoQY.GetMgoConn()
  65. defer MgoQY.DestoryMongoConn(sess)
  66. it := sess.DB("mixdata").C("company_base").Find(nil).Select(nil).Sort("_id").Iter()
  67. fmt.Println("taskRun 开始")
  68. count := 0
  69. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  70. if count%1000 == 0 {
  71. log.Println("current:", count, tmp["company_name"])
  72. }
  73. where := map[string]interface{}{
  74. "company_name": tmp["company_name"],
  75. }
  76. update := make(map[string]interface{})
  77. std, _ := Mgo.FindOne("qyxy_std", where)
  78. companyType := util.ObjToString(tmp["company_type"])
  79. if companyType == "事业单位" {
  80. special, _ := MgoQY.FindOne("special_enterprise", where)
  81. //1.事业单位数据,注册资金错误
  82. if util.ObjToString((*special)["capital"]) != "" {
  83. text := util.ObjToString((*special)["capital"])
  84. capital := ObjToMoney(text)
  85. capital = capital / 10000
  86. update["capital"] = capital
  87. }
  88. } else {
  89. //2.企业的capital =nil,需要更新为0
  90. if util.ObjToString(tmp["capital"]) == "" {
  91. update["capital"] = float64(0)
  92. }
  93. }
  94. if _, ok := (*std)["capital"]; ok {
  95. if len(update) > 0 {
  96. //Mgo.Update(GF.Mongo.Coll, where, map[string]interface{}{"$set": update}, true, false)
  97. Mgo.Update("qyxy_std", where, map[string]interface{}{"$set": update}, true, false)
  98. ////更新es
  99. //err := Es.UpdateDocument(esIndex, util.ObjToString(tmp["company_id"]), update)
  100. //if err != nil {
  101. // log.Println("err", err, update, where)
  102. //}
  103. updateEsPool <- []map[string]interface{}{
  104. {"_id": util.ObjToString(tmp["company_id"])},
  105. update,
  106. }
  107. }
  108. }
  109. }
  110. log.Println("数据迭代结束")
  111. }
  112. // updateEsMethod 更新es
  113. func updateEsMethod() {
  114. arru := make([][]map[string]interface{}, 200) //200条一组更新es
  115. indexu := 0
  116. for {
  117. select {
  118. case v := <-updateEsPool:
  119. arru[indexu] = v
  120. indexu++
  121. if indexu == 200 {
  122. updateEsSp <- true
  123. go func(arru [][]map[string]interface{}) {
  124. defer func() {
  125. <-updateEsSp
  126. }()
  127. Es.UpdateBulk(esIndex, arru...)
  128. }(arru)
  129. arru = make([][]map[string]interface{}, 200)
  130. indexu = 0
  131. }
  132. case <-time.After(1000 * time.Millisecond):
  133. if indexu > 0 {
  134. updateEsSp <- true
  135. go func(arru [][]map[string]interface{}) {
  136. defer func() {
  137. <-updateEsSp
  138. }()
  139. Es.UpdateBulk(esIndex, arru...)
  140. }(arru[:indexu])
  141. arru = make([][]map[string]interface{}, 200)
  142. indexu = 0
  143. }
  144. }
  145. }
  146. }