main.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package main
  import (
  	"context"
  	"fmt"
  	"log"
  	"math/rand"
  	"sync"
  	"time"

  	"github.com/spf13/viper"
  	"go.mongodb.org/mongo-driver/bson"
  	"go.mongodb.org/mongo-driver/mongo/options"
  	"jygit.jydev.jianyu360.cn/data_processing/common_utils"
  	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  )
  14. var (
  15. MongoStd, MongoBase *mongodb.MongodbSim
  16. GF GlobalConf
  17. // 更新mongo
  18. updatePool = make(chan []map[string]interface{}, 5000)
  19. seed = 188
  20. findThread = 5
  21. updateThread = 4
  22. seoidCh = make(chan int64, 1000)
  23. )
  24. func InitConfig() (err error) {
  25. viper.SetConfigFile("config.toml") // 指定配置文件路径
  26. viper.SetConfigName("config") // 配置文件名称(无扩展名)
  27. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  28. viper.AddConfigPath("./")
  29. err = viper.ReadInConfig() // 查找并读取配置文件
  30. if err != nil { // 处理读取配置文件的错误
  31. return
  32. }
  33. err = viper.Unmarshal(&GF)
  34. return err
  35. }
  36. func InitMgo() {
  37. MongoStd = &mongodb.MongodbSim{
  38. MongodbAddr: GF.MongoStd.Host,
  39. Size: GF.MongoStd.Size,
  40. DbName: GF.MongoStd.DB,
  41. UserName: GF.MongoStd.Username,
  42. Password: GF.MongoStd.Password,
  43. Direct: GF.MongoStd.Direct,
  44. }
  45. MongoStd.InitPool()
  46. MongoBase = &mongodb.MongodbSim{
  47. MongodbAddr: GF.MongoBase.Host,
  48. Size: GF.MongoBase.Size,
  49. DbName: GF.MongoBase.DB,
  50. UserName: GF.MongoBase.Username,
  51. Password: GF.MongoBase.Password,
  52. Direct: GF.MongoBase.Direct,
  53. }
  54. MongoBase.InitPool()
  55. }
  56. func main() {
  57. InitConfig()
  58. InitMgo()
  59. updateStd()
  60. }
  61. func main2() {
  62. InitConfig()
  63. InitMgo()
  64. id := int64(GF.Env.Autoid) //起始 autoid
  65. //生成seoid
  66. //31395235740
  67. startSeoId := int64(GF.Env.Seoid) //起始 nseo_id
  68. rand.Seed(time.Now().UnixNano())
  69. go func() {
  70. for {
  71. startSeoId += int64(rand.Intn(seed) + 2)
  72. seoidCh <- startSeoId
  73. }
  74. }()
  75. go updateMethod()
  76. loop := 0
  77. count := int64(0)
  78. for loop < 10 {
  79. //数据处理线程
  80. dealTh := make(chan bool, 20)
  81. defer func() {
  82. for i := 0; i < 20; i++ {
  83. dealTh <- true
  84. }
  85. }()
  86. sess := MongoBase.GetMgoConn()
  87. defer MongoBase.DestoryMongoConn(sess)
  88. ctx, _ := context.WithTimeout(context.Background(), 99999*time.Hour)
  89. coll := sess.M.C.Database("mixdata").Collection("company_base")
  90. find := options.Find().SetBatchSize(200).SetSort(bson.D{bson.E{"_id", 1}}).SetProjection(bson.M{"_id": 1, "company_id": 1, "establish_date": 1, "create_time": 1})
  91. cur, err := coll.Find(ctx, bson.M{"_id": bson.M{"$gt": id}}, find)
  92. if err != nil {
  93. log.Println("mgo find err", err.Error())
  94. return
  95. }
  96. for tmp := make(map[string]interface{}); cur.Next(ctx); {
  97. count++
  98. if cur != nil {
  99. cur.Decode(&tmp)
  100. id = util.Int64All(tmp["_id"])
  101. dealTh <- true
  102. go func(tmp map[string]interface{}) {
  103. defer func() {
  104. <-dealTh
  105. }()
  106. Data(tmp)
  107. }(tmp)
  108. tmp = make(map[string]interface{})
  109. } else {
  110. cur.Close(ctx)
  111. break
  112. }
  113. if count%10000 == 0 {
  114. go log.Println("current,id", count, id)
  115. }
  116. }
  117. loop++
  118. }
  119. log.Println("over ---- ")
  120. time.Sleep(100000 * time.Hour)
  121. }
  122. func Data(tmp map[string]interface{}) {
  123. seoid := <-seoidCh
  124. ed, _ := tmp["establish_date"].(string)
  125. pre := ""
  126. if len(ed) == 10 {
  127. pre = ed[2:4] + ed[5:7] + ed[8:10]
  128. } else {
  129. cd, _ := tmp["create_time"].(string)
  130. if len(cd) > 9 {
  131. pre = cd[2:4] + cd[5:7] + cd[8:10]
  132. }
  133. }
  134. updatePool <- []map[string]interface{}{
  135. {"_id": tmp["company_id"]},
  136. {"$set": bson.M{
  137. "nseo_id": fmt.Sprintf("%s%d", pre, seoid),
  138. "autoid": tmp["_id"],
  139. }},
  140. }
  141. }
  142. // updateMethod 更新MongoDB
  143. func updateMethod() {
  144. updateSp := make(chan bool, 8)
  145. arru := make([][]map[string]interface{}, 200)
  146. indexu := 0
  147. for {
  148. select {
  149. case v := <-updatePool:
  150. arru[indexu] = v
  151. indexu++
  152. if indexu == 200 {
  153. updateSp <- true
  154. go func(arru [][]map[string]interface{}) {
  155. defer func() {
  156. <-updateSp
  157. }()
  158. MongoStd.UpdateBulk("qyxy_std", arru...)
  159. }(arru)
  160. arru = make([][]map[string]interface{}, 200)
  161. indexu = 0
  162. }
  163. case <-time.After(1000 * time.Millisecond):
  164. if indexu > 0 {
  165. updateSp <- true
  166. go func(arru [][]map[string]interface{}) {
  167. defer func() {
  168. <-updateSp
  169. }()
  170. MongoStd.UpdateBulk("qyxy_std", arru...)
  171. }(arru[:indexu])
  172. arru = make([][]map[string]interface{}, 200)
  173. indexu = 0
  174. }
  175. }
  176. }
  177. }
  178. //updateStd 根据qyxy_std 表,更新 nseo_id
  179. func updateStd() {
  180. id := int64(GF.Env.Autoid) //起始 autoid
  181. //生成seoid
  182. //31395235740
  183. startSeoId := int64(GF.Env.Seoid) //起始 nseo_id
  184. rand.Seed(time.Now().UnixNano())
  185. count := int64(0)
  186. //var wg sync.WaitGroup
  187. sess := MongoStd.GetMgoConn()
  188. defer MongoStd.DestoryMongoConn(sess)
  189. ctx, _ := context.WithTimeout(context.Background(), 99999*time.Hour)
  190. coll := sess.M.C.Database("mixdata").Collection("qyxy_std")
  191. find := options.Find().SetBatchSize(200).SetSort(bson.D{bson.E{"autoid", 1}}).SetProjection(bson.M{"_id": 1, "autoid": 1, "establish_date": 1, "create_time_msql": 1})
  192. cur, err := coll.Find(ctx, bson.M{"autoid": bson.M{"$gt": id}}, find)
  193. if err != nil {
  194. log.Println("mgo find err", err.Error())
  195. return
  196. }
  197. for tmp := make(map[string]interface{}); cur.Next(ctx); {
  198. count++
  199. if cur != nil {
  200. cur.Decode(&tmp)
  201. id = util.Int64All(tmp["autoid"])
  202. ed, _ := tmp["establish_date"].(string)
  203. pre := ""
  204. if len(ed) == 10 {
  205. pre = ed[2:4] + ed[5:7] + ed[8:10]
  206. } else {
  207. cd, _ := tmp["create_time_msql"].(string)
  208. if len(cd) > 9 {
  209. pre = cd[2:4] + cd[5:7] + cd[8:10]
  210. }
  211. }
  212. startSeoId += int64(rand.Intn(seed) + 2)
  213. nseo_id := fmt.Sprintf("%s%d", pre, startSeoId)
  214. update := make(map[string]interface{})
  215. update["$set"] = bson.M{
  216. "nseo_id": nseo_id,
  217. "autoid": id,
  218. }
  219. where := map[string]interface{}{
  220. "_id": tmp["_id"],
  221. }
  222. go MongoStd.Update("qyxy_std", where, update, true, false)
  223. if count%10000 == 0 {
  224. log.Println("current,id", count, id, startSeoId, nseo_id)
  225. }
  226. tmp = make(map[string]interface{})
  227. } else {
  228. cur.Close(ctx)
  229. break
  230. }
  231. }
  232. log.Println("over ----;autoid: ", id, "seo_id:", startSeoId)
  233. time.Sleep(20 * time.Minute)
  234. }