main.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package main
  2. import (
  3. "fmt"
  4. "go.mongodb.org/mongo-driver/bson"
  5. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  8. "log"
  9. "strings"
  10. "time"
  11. )
  12. var (
  13. //更新es
  14. updateEsPool = make(chan []map[string]interface{}, 5000)
  15. updateEsSp = make(chan bool, 3) //保存协程
  16. Es *es.Elastic
  17. esIndex = "qyxy"
  18. Mgo *mongodb.MongodbSim
  19. MgoQY *mongodb.MongodbSim
  20. )
  21. func initEs() {
  22. Es = &es.Elastic{
  23. //S_esurl: "http://127.0.0.1:19908",
  24. S_esurl: "http://172.17.4.184:19908",
  25. I_size: 10,
  26. Username: "jybid",
  27. Password: "Top2023_JEB01i@31",
  28. }
  29. Es.InitElasticSize()
  30. }
  31. // initMgo initMgo
  32. func initMgo() {
  33. //181 凭安库
  34. //MgoQY = &mongodb.MongodbSim{
  35. // MongodbAddr: "172.17.4.181:27001",
  36. // //MongodbAddr: "127.0.0.1:27001",
  37. // DbName: "mixdata",
  38. // Size: 10,
  39. // UserName: "",
  40. // Password: "",
  41. // //Direct: true,
  42. //}
  43. //MgoQY.InitPool()
  44. //qyxy_std
  45. Mgo = &mongodb.MongodbSim{
  46. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  47. //MongodbAddr: "127.0.0.1:27083",
  48. Size: 10,
  49. DbName: "mixdata",
  50. UserName: "SJZY_RWbid_ES",
  51. Password: "SJZY@B4i4D5e6S",
  52. //Direct: true,
  53. }
  54. Mgo.InitPool()
  55. }
  56. func main() {
  57. go updateEsMethod()
  58. initEs()
  59. initMgo()
  60. //dealCapitalData()
  61. dealCompanyTypeInt()
  62. select {}
  63. }
  64. // dealCapitalData 处理数据 注册资金
  65. func dealCapitalData() {
  66. defer util.Catch()
  67. sess := MgoQY.GetMgoConn()
  68. defer MgoQY.DestoryMongoConn(sess)
  69. it := sess.DB("mixdata").C("company_base").Find(nil).Select(nil).Sort("_id").Iter()
  70. fmt.Println("taskRun 开始")
  71. count := 0
  72. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  73. if count%1000 == 0 {
  74. log.Println("current:", count, tmp["company_name"])
  75. }
  76. where := map[string]interface{}{
  77. "company_name": tmp["company_name"],
  78. }
  79. update := make(map[string]interface{})
  80. std, _ := Mgo.FindOne("qyxy_std", where)
  81. companyType := util.ObjToString(tmp["company_type"])
  82. if companyType == "事业单位" {
  83. special, _ := MgoQY.FindOne("special_enterprise", where)
  84. //1.事业单位数据,注册资金错误
  85. if util.ObjToString((*special)["capital"]) != "" {
  86. text := util.ObjToString((*special)["capital"])
  87. capital := ObjToMoney(text)
  88. capital = capital / 10000
  89. update["capital"] = capital
  90. }
  91. } else {
  92. //2.企业的capital =nil,需要更新为0
  93. if util.ObjToString(tmp["capital"]) == "" {
  94. update["capital"] = float64(0)
  95. }
  96. }
  97. if _, ok := (*std)["capital"]; ok {
  98. if len(update) > 0 {
  99. //Mgo.Update(GF.Mongo.Coll, where, map[string]interface{}{"$set": update}, true, false)
  100. Mgo.Update("qyxy_std", where, map[string]interface{}{"$set": update}, true, false)
  101. ////更新es
  102. //err := Es.UpdateDocument(esIndex, util.ObjToString(tmp["company_id"]), update)
  103. //if err != nil {
  104. // log.Println("err", err, update, where)
  105. //}
  106. updateEsPool <- []map[string]interface{}{
  107. {"_id": util.ObjToString(tmp["company_id"])},
  108. update,
  109. }
  110. }
  111. }
  112. }
  113. log.Println("数据迭代结束")
  114. }
  115. // dealCompanyTypeInt 修复company_type_int
  116. func dealCompanyTypeInt() {
  117. defer util.Catch()
  118. sess := Mgo.GetMgoConn()
  119. defer Mgo.DestoryMongoConn(sess)
  120. // 构建查询条件
  121. where := bson.M{
  122. "$or": []bson.M{
  123. {"company_type": "其他"}, // company_type 等于 "其他"
  124. {"company_type": bson.M{"$exists": false}}, // company_type 不存在
  125. },
  126. }
  127. it := sess.DB("mixdata").C("qyxy_std").Find(where).Select(nil).Iter()
  128. fmt.Println("taskRun 开始")
  129. count := 0
  130. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  131. if count%1000 == 0 {
  132. log.Println("current:", count, tmp["company_name"], tmp["_id"])
  133. }
  134. company_name := util.ObjToString(tmp["company_name"])
  135. if strings.HasSuffix(company_name, "公司") || strings.HasSuffix(company_name, "集团") {
  136. update := make(map[string]interface{})
  137. update["company_type_int"] = 12
  138. updateEsPool <- []map[string]interface{}{
  139. {"_id": util.ObjToString(tmp["_id"])},
  140. update,
  141. }
  142. }
  143. }
  144. log.Println("数据处理完毕,总数是", count)
  145. }
  146. // updateEsMethod 更新es
  147. func updateEsMethod() {
  148. arru := make([][]map[string]interface{}, 200) //200条一组更新es
  149. indexu := 0
  150. for {
  151. select {
  152. case v := <-updateEsPool:
  153. arru[indexu] = v
  154. indexu++
  155. if indexu == 200 {
  156. updateEsSp <- true
  157. go func(arru [][]map[string]interface{}) {
  158. defer func() {
  159. <-updateEsSp
  160. }()
  161. Es.UpdateBulk(esIndex, arru...)
  162. }(arru)
  163. arru = make([][]map[string]interface{}, 200)
  164. indexu = 0
  165. }
  166. case <-time.After(1000 * time.Millisecond):
  167. if indexu > 0 {
  168. updateEsSp <- true
  169. go func(arru [][]map[string]interface{}) {
  170. defer func() {
  171. <-updateEsSp
  172. }()
  173. Es.UpdateBulk(esIndex, arru...)
  174. }(arru[:indexu])
  175. arru = make([][]map[string]interface{}, 200)
  176. indexu = 0
  177. }
  178. }
  179. }
  180. }