main.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package main
  2. import (
  3. "go.mongodb.org/mongo-driver/bson/primitive"
  4. "log"
  5. "qfw/util"
  6. "regexp"
  7. )
  8. var (
  9. Sysconfig map[string]interface{}
  10. MongoTool *MongodbSim
  11. Dbname string
  12. MgoColl1, MgoColl2 string
  13. ChangeMap []map[string]interface{}
  14. queryClose chan bool
  15. )
  16. func init() {
  17. util.ReadConfig(&Sysconfig)
  18. Dbname = Sysconfig["mongodbName"].(string)
  19. MongoTool = &MongodbSim{
  20. MongodbAddr: Sysconfig["mongodbServers"].(string),
  21. Size: util.IntAll(Sysconfig["mongodbPoolSize"]),
  22. DbName: Dbname,
  23. }
  24. MongoTool.InitPool()
  25. MgoColl1 = Sysconfig["mongoColl1"].(string)
  26. MgoColl2 = Sysconfig["mongoColl2"].(string)
  27. queryClose = make(chan bool)
  28. ChangeMap = util.ObjArrToMapArr(Sysconfig["changeType"].([]interface{}))
  29. initChangeMap()
  30. }
  31. func main() {
  32. count, taskcount := 0, 0
  33. lastid := ""
  34. sess := MongoTool.GetMgoConn()
  35. defer MongoTool.DestoryMongoConn(sess)
  36. pool := make(chan bool, 1)
  37. infoPool := make(chan map[string]interface{}, 2000)
  38. over := make(chan bool)
  39. go func() {
  40. M:
  41. for {
  42. select {
  43. case tmp := <-infoPool:
  44. if taskcount > 1234 {
  45. break
  46. }
  47. pool <- true
  48. go func(tmp map[string]interface{}) {
  49. defer func() {
  50. <-pool
  51. }()
  52. if tmp["changes"] != nil {
  53. infoList := []interface{}(tmp["changes"].(primitive.A))
  54. for _, item := range infoList {
  55. item1 := item.(map[string]interface{})
  56. setMark(item1)
  57. }
  58. //tmp["_id"] = util.StringTOBsonId(util.ObjToString(tmp["company_id"]))
  59. MongoTool.Save(MgoColl2, tmp)
  60. taskcount ++
  61. }
  62. }(tmp)
  63. case <-over:
  64. break M
  65. }
  66. }
  67. }()
  68. fields := map[string]interface{}{"changes": 1, "company_id": 1, "company_name": 1}
  69. query := sess.DB(Dbname).C(MgoColl1).Find(nil).Select(fields).Iter()
  70. L:
  71. for {
  72. select {
  73. case <-queryClose:
  74. log.Println("receive interrupt sign")
  75. log.Println("close iter..", lastid, query.Cursor.Close(nil))
  76. break L
  77. default:
  78. tmp := make(map[string]interface{})
  79. if query.Next(&tmp) {
  80. lastid = tmp["company_id"].(string)
  81. if count%10000 == 0 {
  82. util.Debug("current", count, lastid)
  83. }
  84. if tmp["changes"] != nil && len([]interface{}(tmp["changes"].(primitive.A))) > 0 {
  85. infoPool <- tmp
  86. count++
  87. }
  88. } else {
  89. break L
  90. }
  91. }
  92. }
  93. }
  94. func initChangeMap() {
  95. for _, v := range ChangeMap{
  96. list := v["change_keyword"].([]interface {})
  97. var regList []string
  98. if len(list) > 0 {
  99. for _, v1 := range list{
  100. reg := ".*" + util.ObjToString(v1) + ".*"
  101. regList = append(regList, reg)
  102. }
  103. v["change_key_reg"] = regList
  104. }else {
  105. v["change_key_reg"] = []string{".*"}
  106. }
  107. }
  108. }
  109. func setMark(tmp map[string]interface{}) {
  110. for _, v := range ChangeMap{
  111. str := util.ObjToString(tmp["change_field"])
  112. regArr := v["change_key_reg"].([]string)
  113. for _, v1 := range regArr{
  114. matched, _ := regexp.MatchString(v1, str)
  115. if matched {
  116. tmp["change_name_new"] = v["change_name"]
  117. return
  118. }
  119. }
  120. }
  121. }