main.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/spf13/viper"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "go.mongodb.org/mongo-driver/mongo/options"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. "math/rand"
  12. "time"
  13. )
  14. var (
  15. MongoStd, MongoBase *mongodb.MongodbSim
  16. GF GlobalConf
  17. // 更新mongo
  18. updatePool = make(chan []map[string]interface{}, 5000)
  19. seed = 188
  20. findThread = 5
  21. updateThread = 4
  22. seoidCh = make(chan int64, 1000)
  23. )
  24. func InitConfig() (err error) {
  25. viper.SetConfigFile("config.toml") // 指定配置文件路径
  26. viper.SetConfigName("config") // 配置文件名称(无扩展名)
  27. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  28. viper.AddConfigPath("./")
  29. err = viper.ReadInConfig() // 查找并读取配置文件
  30. if err != nil { // 处理读取配置文件的错误
  31. return
  32. }
  33. err = viper.Unmarshal(&GF)
  34. return err
  35. }
  36. func InitMgo() {
  37. MongoStd = &mongodb.MongodbSim{
  38. MongodbAddr: GF.MongoStd.Host,
  39. Size: GF.MongoStd.Size,
  40. DbName: GF.MongoStd.DB,
  41. UserName: GF.MongoStd.Username,
  42. Password: GF.MongoStd.Password,
  43. Direct: GF.MongoStd.Direct,
  44. }
  45. MongoStd.InitPool()
  46. MongoBase = &mongodb.MongodbSim{
  47. MongodbAddr: GF.MongoBase.Host,
  48. Size: GF.MongoBase.Size,
  49. DbName: GF.MongoBase.DB,
  50. UserName: GF.MongoBase.Username,
  51. Password: GF.MongoBase.Password,
  52. Direct: GF.MongoBase.Direct,
  53. }
  54. MongoBase.InitPool()
  55. }
  56. func main() {
  57. InitConfig()
  58. InitMgo()
  59. id := int64(GF.Env.Autoid) //起始 autoid
  60. //生成seoid
  61. //31395235740
  62. startSeoId := int64(GF.Env.Nseoid) //起始 nseo_id
  63. rand.Seed(time.Now().UnixNano())
  64. go func() {
  65. for {
  66. startSeoId += int64(rand.Intn(seed) + 2)
  67. seoidCh <- startSeoId
  68. }
  69. }()
  70. go updateMethod()
  71. loop := 0
  72. count := int64(0)
  73. for loop < 10 {
  74. //数据处理线程
  75. dealTh := make(chan bool, 20)
  76. defer func() {
  77. for i := 0; i < 20; i++ {
  78. dealTh <- true
  79. }
  80. }()
  81. sess := MongoBase.GetMgoConn()
  82. defer MongoBase.DestoryMongoConn(sess)
  83. ctx, _ := context.WithTimeout(context.Background(), 99999*time.Hour)
  84. coll := sess.M.C.Database("mixdata").Collection("company_base")
  85. find := options.Find().SetBatchSize(200).SetSort(bson.D{bson.E{"_id", 1}}).SetProjection(bson.M{"_id": 1, "company_id": 1, "establish_date": 1, "create_time": 1})
  86. cur, err := coll.Find(ctx, bson.M{"_id": bson.M{"$gt": id}}, find)
  87. if err != nil {
  88. log.Println("mgo find err", err.Error())
  89. return
  90. }
  91. for tmp := make(map[string]interface{}); cur.Next(ctx); {
  92. count++
  93. if cur != nil {
  94. cur.Decode(&tmp)
  95. id = util.Int64All(tmp["_id"])
  96. dealTh <- true
  97. go func(tmp map[string]interface{}) {
  98. defer func() {
  99. <-dealTh
  100. }()
  101. Data(tmp)
  102. }(tmp)
  103. tmp = make(map[string]interface{})
  104. } else {
  105. cur.Close(ctx)
  106. break
  107. }
  108. if count%10000 == 0 {
  109. go log.Println("current,id", count, id)
  110. }
  111. }
  112. loop++
  113. }
  114. log.Println("over ---- ")
  115. time.Sleep(100000 * time.Hour)
  116. }
  117. func Data(tmp map[string]interface{}) {
  118. seoid := <-seoidCh
  119. ed, _ := tmp["establish_date"].(string)
  120. pre := ""
  121. if len(ed) == 10 {
  122. pre = ed[2:4] + ed[5:7] + ed[8:10]
  123. } else {
  124. cd, _ := tmp["create_time"].(string)
  125. if len(cd) > 9 {
  126. pre = cd[2:4] + cd[5:7] + cd[8:10]
  127. }
  128. }
  129. updatePool <- []map[string]interface{}{
  130. {"_id": tmp["company_id"]},
  131. {"$set": bson.M{
  132. "nseo_id": fmt.Sprintf("%s%d", pre, seoid),
  133. "autoid": tmp["_id"],
  134. }},
  135. }
  136. }
  137. // updateMethod 更新MongoDB
  138. func updateMethod() {
  139. updateSp := make(chan bool, 8)
  140. arru := make([][]map[string]interface{}, 200)
  141. indexu := 0
  142. for {
  143. select {
  144. case v := <-updatePool:
  145. arru[indexu] = v
  146. indexu++
  147. if indexu == 200 {
  148. updateSp <- true
  149. go func(arru [][]map[string]interface{}) {
  150. defer func() {
  151. <-updateSp
  152. }()
  153. MongoStd.UpdateBulk("qyxy_std", arru...)
  154. }(arru)
  155. arru = make([][]map[string]interface{}, 200)
  156. indexu = 0
  157. }
  158. case <-time.After(1000 * time.Millisecond):
  159. if indexu > 0 {
  160. updateSp <- true
  161. go func(arru [][]map[string]interface{}) {
  162. defer func() {
  163. <-updateSp
  164. }()
  165. MongoStd.UpdateBulk("qyxy_std", arru...)
  166. }(arru[:indexu])
  167. arru = make([][]map[string]interface{}, 200)
  168. indexu = 0
  169. }
  170. }
  171. }
  172. }