move.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. //数据迁移
  2. package mover
  3. import (
  4. "sync"
  5. "time"
  6. util "app.yhyue.com/moapp/jybase/common"
  7. . "app.yhyue.com/moapp/jybase/mongodb"
  8. . "bp.jydev.jianyu360.cn/BaseService/pushpkg/db"
  9. . "bp.jydev.jianyu360.cn/BaseService/pushpkg/p"
  10. "app.yhyue.com/moapp/jybase/logger"
  11. )
  12. type Mover interface {
  13. MaxPushSize(temp map[string]interface{}) int
  14. MergeList(o, n interface{}, maxPushSize int) interface{}
  15. ToList(list interface{}) interface{}
  16. Unique(temp map[string]interface{}) string
  17. Query(unique string) map[string]interface{}
  18. SortListSplit(list interface{}, f func(v interface{}))
  19. }
  20. type MoveUser struct {
  21. info map[string]interface{}
  22. ids []interface{}
  23. maxPushSize int
  24. }
  25. type Move struct {
  26. Mgo_Log *MongodbSim
  27. Pushspace_temp string
  28. Mgo_Log_DbName string
  29. Pushspace string
  30. MoveBatch int
  31. MergePoolSize int
  32. SortFields []string
  33. TestQuery map[string]interface{}
  34. Mover Mover
  35. }
  36. func (m *Move) Execute() int {
  37. defer util.Catch()
  38. nowUnix := time.Now().Unix()
  39. sess := m.Mgo_Log.GetMgoConn()
  40. defer m.Mgo_Log.DestoryMongoConn(sess)
  41. query := map[string]interface{}{
  42. "timestamp": map[string]interface{}{
  43. "$lt": nowUnix,
  44. },
  45. }
  46. if m.TestQuery != nil {
  47. for k, v := range m.TestQuery {
  48. query[k] = v
  49. }
  50. }
  51. logger.Info("迁移数据query", query)
  52. it := sess.DB(m.Mgo_Log_DbName).C(m.Pushspace_temp).Find(query).Sort(m.SortFields...).Iter()
  53. moveUsers := map[string]*MoveUser{}
  54. index, number, length := 0, 0, 0
  55. for temp := make(map[string]interface{}); it.Next(&temp); {
  56. index++
  57. unique := m.Mover.Unique(temp)
  58. maxPushSize := util.IntAll(temp["maxpushsize"])
  59. if maxPushSize <= 0 {
  60. maxPushSize = m.Mover.MaxPushSize(temp)
  61. }
  62. moveUser := moveUsers[unique]
  63. if moveUser != nil {
  64. temp["list"] = m.Mover.MergeList(moveUser.info["list"], temp["list"], maxPushSize)
  65. moveUser.info = temp
  66. moveUser.maxPushSize = maxPushSize
  67. moveUser.ids = append(moveUser.ids, temp["_id"])
  68. } else {
  69. temp["list"] = m.Mover.ToList(temp["list"])
  70. moveUser = &MoveUser{
  71. info: temp,
  72. ids: []interface{}{temp["_id"]},
  73. maxPushSize: maxPushSize,
  74. }
  75. }
  76. moveUsers[unique] = moveUser
  77. length++
  78. if length == m.MoveBatch {
  79. m.merge(&number, nowUnix, moveUsers)
  80. length = 0
  81. moveUsers = map[string]*MoveUser{}
  82. }
  83. temp = make(map[string]interface{})
  84. if index%500 == 0 {
  85. logger.Info("迁移数据:", index)
  86. }
  87. }
  88. if length > 0 {
  89. m.merge(&number, nowUnix, moveUsers)
  90. length = 0
  91. moveUsers = map[string]*MoveUser{}
  92. }
  93. return index
  94. }
  95. func (m *Move) merge(number *int, nowUnix int64, moveUsers map[string]*MoveUser) {
  96. *number++
  97. logger.Info("第", *number, "次开始合并数据")
  98. index := 0
  99. saveArray := []map[string]interface{}{}
  100. newDelArray, oldDelArray := []interface{}{}, []interface{}{}
  101. //
  102. mergeLock := &sync.Mutex{}
  103. mergeWait := &sync.WaitGroup{}
  104. mergePool := make(chan bool, m.MergePoolSize)
  105. for k, v := range moveUsers {
  106. mergePool <- true
  107. mergeWait.Add(1)
  108. go func(unique string, moveUser *MoveUser) {
  109. defer util.Catch()
  110. defer func() {
  111. <-mergePool
  112. mergeWait.Done()
  113. }()
  114. sess := m.Mgo_Log.GetMgoConn()
  115. defer m.Mgo_Log.DestoryMongoConn(sess)
  116. var datas []map[string]interface{}
  117. sess.DB(m.Mgo_Log_DbName).C(m.Pushspace).Find(m.Mover.Query(unique)).Select(map[string]interface{}{"_id": 1, "list": 1}).All(&datas)
  118. mergeLock.Lock()
  119. defer mergeLock.Unlock()
  120. var newList interface{}
  121. if datas == nil { //批量新增
  122. newList = moveUser.info["list"]
  123. } else {
  124. array := []interface{}{}
  125. for _, v := range datas {
  126. oldList, _ := v["list"].([]interface{})
  127. array = append(array, oldList...)
  128. newDelArray = append(newDelArray, v["_id"])
  129. if len(newDelArray) >= Mgo_BulkSize {
  130. MyMgo.DelBulk(m.Mgo_Log, sess, m.Mgo_Log_DbName, m.Pushspace, &newDelArray)
  131. }
  132. }
  133. newList = m.Mover.MergeList(m.Mover.ToList(array), moveUser.info["list"], moveUser.maxPushSize)
  134. }
  135. oldDelArray = append(oldDelArray, moveUser.ids...)
  136. if len(oldDelArray) >= Mgo_BulkSize {
  137. MyMgo.DelBulk(m.Mgo_Log, sess, m.Mgo_Log_DbName, m.Pushspace_temp, &oldDelArray)
  138. }
  139. m.Mover.SortListSplit(newList, func(v interface{}) {
  140. info := map[string]interface{}{
  141. "list": v,
  142. }
  143. for ik, iv := range moveUser.info {
  144. if ik == "_id" || ik == "list" {
  145. continue
  146. }
  147. info[ik] = iv
  148. }
  149. saveArray = append(saveArray, info)
  150. if len(saveArray) == Mgo_BulkSize {
  151. MyMgo.SaveBulk(m.Mgo_Log, sess, m.Mgo_Log_DbName, m.Pushspace, &saveArray)
  152. }
  153. })
  154. }(k, v)
  155. index++
  156. if index%500 == 0 {
  157. logger.Info("第", *number, "次合并数据:", index)
  158. }
  159. }
  160. mergeWait.Wait()
  161. if len(saveArray) > 0 {
  162. MyMgo.SaveBulk(m.Mgo_Log, nil, m.Mgo_Log_DbName, m.Pushspace, &saveArray)
  163. }
  164. if len(newDelArray) > 0 {
  165. MyMgo.DelBulk(m.Mgo_Log, nil, m.Mgo_Log_DbName, m.Pushspace, &newDelArray)
  166. }
  167. if len(oldDelArray) > 0 {
  168. MyMgo.DelBulk(m.Mgo_Log, nil, m.Mgo_Log_DbName, m.Pushspace_temp, &oldDelArray)
  169. }
  170. logger.Info("第", *number, "次合并数据结束。。。", index)
  171. }