123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- //数据迁移
- package mover
- import (
- "sync"
- "time"
- util "app.yhyue.com/moapp/jybase/common"
- . "app.yhyue.com/moapp/jybase/mongodb"
- . "bp.jydev.jianyu360.cn/BaseService/pushpkg/db"
- . "bp.jydev.jianyu360.cn/BaseService/pushpkg/p"
- "app.yhyue.com/moapp/jybase/logger"
- )
// Mover supplies the collection-specific steps of a migration. Move drives
// the overall flow and calls back into a Mover for everything that depends on
// the shape of the records being migrated.
type Mover interface {
	// Unique returns the key under which the given temporary record is
	// grouped; records sharing a key are merged into one entry.
	Unique(temp map[string]interface{}) string
	// Query builds the filter used to look up the documents already stored
	// in the target collection for the given unique key.
	Query(unique string) map[string]interface{}
	// MaxPushSize returns the list size limit for a record; it is consulted
	// when the record's own "maxpushsize" value is missing or not positive.
	MaxPushSize(temp map[string]interface{}) int
	// ToList converts a raw "list" value into the representation that
	// MergeList and SortListSplit operate on.
	ToList(list interface{}) interface{}
	// MergeList combines the previously accumulated list o with the newer
	// list n, given the maxPushSize limit, and returns the merged list.
	MergeList(o interface{}, n interface{}, maxPushSize int) interface{}
	// SortListSplit processes list and calls f once for every piece that
	// should be stored as the "list" of its own document.
	SortListSplit(list interface{}, f func(v interface{}))
}
// MoveUser accumulates the temporary records that share one unique key while
// a batch is being read.
type MoveUser struct {
	// info is the most recently read record for the key; its "list" entry
	// holds the list merged so far.
	info map[string]interface{}
	// ids collects the "_id" of every temporary record folded into info.
	ids []interface{}
	// maxPushSize is the list size limit taken from the latest record.
	maxPushSize int
}
- type Move struct {
- Mgo_Log *MongodbSim
- Pushspace_temp string
- Mgo_Log_DbName string
- Pushspace string
- MoveBatch int
- MergePoolSize int
- SortFields []string
- TestQuery map[string]interface{}
- Mover Mover
- }
- func (m *Move) Execute() int {
- defer util.Catch()
- nowUnix := time.Now().Unix()
- sess := m.Mgo_Log.GetMgoConn()
- defer m.Mgo_Log.DestoryMongoConn(sess)
- query := map[string]interface{}{
- "timestamp": map[string]interface{}{
- "$lt": nowUnix,
- },
- }
- if m.TestQuery != nil {
- for k, v := range m.TestQuery {
- query[k] = v
- }
- }
- logger.Info("迁移数据query", query)
- it := sess.DB(m.Mgo_Log_DbName).C(m.Pushspace_temp).Find(query).Sort(m.SortFields...).Iter()
- moveUsers := map[string]*MoveUser{}
- index, number, length := 0, 0, 0
- for temp := make(map[string]interface{}); it.Next(&temp); {
- index++
- unique := m.Mover.Unique(temp)
- maxPushSize := util.IntAll(temp["maxpushsize"])
- if maxPushSize <= 0 {
- maxPushSize = m.Mover.MaxPushSize(temp)
- }
- moveUser := moveUsers[unique]
- if moveUser != nil {
- temp["list"] = m.Mover.MergeList(moveUser.info["list"], temp["list"], maxPushSize)
- moveUser.info = temp
- moveUser.maxPushSize = maxPushSize
- moveUser.ids = append(moveUser.ids, temp["_id"])
- } else {
- temp["list"] = m.Mover.ToList(temp["list"])
- moveUser = &MoveUser{
- info: temp,
- ids: []interface{}{temp["_id"]},
- maxPushSize: maxPushSize,
- }
- }
- moveUsers[unique] = moveUser
- length++
- if length == m.MoveBatch {
- m.merge(&number, nowUnix, moveUsers)
- length = 0
- moveUsers = map[string]*MoveUser{}
- }
- temp = make(map[string]interface{})
- if index%500 == 0 {
- logger.Info("迁移数据:", index)
- }
- }
- if length > 0 {
- m.merge(&number, nowUnix, moveUsers)
- length = 0
- moveUsers = map[string]*MoveUser{}
- }
- return index
- }
- func (m *Move) merge(number *int, nowUnix int64, moveUsers map[string]*MoveUser) {
- *number++
- logger.Info("第", *number, "次开始合并数据")
- index := 0
- saveArray := []map[string]interface{}{}
- newDelArray, oldDelArray := []interface{}{}, []interface{}{}
- //
- mergeLock := &sync.Mutex{}
- mergeWait := &sync.WaitGroup{}
- mergePool := make(chan bool, m.MergePoolSize)
- for k, v := range moveUsers {
- mergePool <- true
- mergeWait.Add(1)
- go func(unique string, moveUser *MoveUser) {
- defer util.Catch()
- defer func() {
- <-mergePool
- mergeWait.Done()
- }()
- sess := m.Mgo_Log.GetMgoConn()
- defer m.Mgo_Log.DestoryMongoConn(sess)
- var datas []map[string]interface{}
- sess.DB(m.Mgo_Log_DbName).C(m.Pushspace).Find(m.Mover.Query(unique)).Select(map[string]interface{}{"_id": 1, "list": 1}).All(&datas)
- mergeLock.Lock()
- defer mergeLock.Unlock()
- var newList interface{}
- if datas == nil { //批量新增
- newList = moveUser.info["list"]
- } else {
- array := []interface{}{}
- for _, v := range datas {
- oldList, _ := v["list"].([]interface{})
- array = append(array, oldList...)
- newDelArray = append(newDelArray, v["_id"])
- if len(newDelArray) >= Mgo_BulkSize {
- MyMgo.DelBulk(m.Mgo_Log, sess, m.Mgo_Log_DbName, m.Pushspace, &newDelArray)
- }
- }
- newList = m.Mover.MergeList(m.Mover.ToList(array), moveUser.info["list"], moveUser.maxPushSize)
- }
- oldDelArray = append(oldDelArray, moveUser.ids...)
- if len(oldDelArray) >= Mgo_BulkSize {
- MyMgo.DelBulk(m.Mgo_Log, sess, m.Mgo_Log_DbName, m.Pushspace_temp, &oldDelArray)
- }
- m.Mover.SortListSplit(newList, func(v interface{}) {
- info := map[string]interface{}{
- "list": v,
- }
- for ik, iv := range moveUser.info {
- if ik == "_id" || ik == "list" {
- continue
- }
- info[ik] = iv
- }
- saveArray = append(saveArray, info)
- if len(saveArray) == Mgo_BulkSize {
- MyMgo.SaveBulk(m.Mgo_Log, sess, m.Mgo_Log_DbName, m.Pushspace, &saveArray)
- }
- })
- }(k, v)
- index++
- if index%500 == 0 {
- logger.Info("第", *number, "次合并数据:", index)
- }
- }
- mergeWait.Wait()
- if len(saveArray) > 0 {
- MyMgo.SaveBulk(m.Mgo_Log, nil, m.Mgo_Log_DbName, m.Pushspace, &saveArray)
- }
- if len(newDelArray) > 0 {
- MyMgo.DelBulk(m.Mgo_Log, nil, m.Mgo_Log_DbName, m.Pushspace, &newDelArray)
- }
- if len(oldDelArray) > 0 {
- MyMgo.DelBulk(m.Mgo_Log, nil, m.Mgo_Log_DbName, m.Pushspace_temp, &oldDelArray)
- }
- logger.Info("第", *number, "次合并数据结束。。。", index)
- }
|