//数据迁移 package mover import ( "sync" "time" util "app.yhyue.com/moapp/jybase/common" . "app.yhyue.com/moapp/jybase/mongodb" . "bp.jydev.jianyu360.cn/BaseService/pushpkg/db" . "bp.jydev.jianyu360.cn/BaseService/pushpkg/p" "app.yhyue.com/moapp/jybase/logger" ) type Mover interface { MaxPushSize(temp map[string]interface{}) int MergeList(o, n interface{}, maxPushSize int) interface{} ToList(list interface{}) interface{} Unique(temp map[string]interface{}) string Query(unique string) map[string]interface{} SortListSplit(list interface{}, f func(v interface{})) } type MoveUser struct { info map[string]interface{} ids []interface{} maxPushSize int } type Move struct { Mgo_Log *MongodbSim Pushspace_temp string Mgo_Log_DbName string Pushspace string MoveBatch int MergePoolSize int SortFields []string TestQuery map[string]interface{} Mover Mover } func (m *Move) Execute() int { defer util.Catch() nowUnix := time.Now().Unix() sess := m.Mgo_Log.GetMgoConn() defer m.Mgo_Log.DestoryMongoConn(sess) query := map[string]interface{}{ "timestamp": map[string]interface{}{ "$lt": nowUnix, }, } if m.TestQuery != nil { for k, v := range m.TestQuery { query[k] = v } } logger.Info("迁移数据query", query) it := sess.DB(m.Mgo_Log_DbName).C(m.Pushspace_temp).Find(query).Sort(m.SortFields...).Iter() moveUsers := map[string]*MoveUser{} index, number, length := 0, 0, 0 for temp := make(map[string]interface{}); it.Next(&temp); { index++ unique := m.Mover.Unique(temp) maxPushSize := util.IntAll(temp["maxpushsize"]) if maxPushSize <= 0 { maxPushSize = m.Mover.MaxPushSize(temp) } moveUser := moveUsers[unique] if moveUser != nil { temp["list"] = m.Mover.MergeList(moveUser.info["list"], temp["list"], maxPushSize) moveUser.info = temp moveUser.maxPushSize = maxPushSize moveUser.ids = append(moveUser.ids, temp["_id"]) } else { temp["list"] = m.Mover.ToList(temp["list"]) moveUser = &MoveUser{ info: temp, ids: []interface{}{temp["_id"]}, maxPushSize: maxPushSize, } } moveUsers[unique] = moveUser length++ if length == m.MoveBatch { m.merge(&number, nowUnix, moveUsers) length = 0 moveUsers = map[string]*MoveUser{} } temp = make(map[string]interface{}) if index%500 == 0 { logger.Info("迁移数据:", index) } } if length > 0 { m.merge(&number, nowUnix, moveUsers) length = 0 moveUsers = map[string]*MoveUser{} } return index } func (m *Move) merge(number *int, nowUnix int64, moveUsers map[string]*MoveUser) { *number++ logger.Info("第", *number, "次开始合并数据") index := 0 saveArray := []map[string]interface{}{} newDelArray, oldDelArray := []interface{}{}, []interface{}{} // mergeLock := &sync.Mutex{} mergeWait := &sync.WaitGroup{} mergePool := make(chan bool, m.MergePoolSize) for k, v := range moveUsers { mergePool <- true mergeWait.Add(1) go func(unique string, moveUser *MoveUser) { defer util.Catch() defer func() { <-mergePool mergeWait.Done() }() sess := m.Mgo_Log.GetMgoConn() defer m.Mgo_Log.DestoryMongoConn(sess) var datas []map[string]interface{} sess.DB(m.Mgo_Log_DbName).C(m.Pushspace).Find(m.Mover.Query(unique)).Select(map[string]interface{}{"_id": 1, "list": 1}).All(&datas) mergeLock.Lock() defer mergeLock.Unlock() var newList interface{} if datas == nil { //批量新增 newList = moveUser.info["list"] } else { array := []interface{}{} for _, v := range datas { oldList, _ := v["list"].([]interface{}) array = append(array, oldList...) newDelArray = append(newDelArray, v["_id"]) if len(newDelArray) >= Mgo_BulkSize { MyMgo.DelBulk(m.Mgo_Log, sess, m.Mgo_Log_DbName, m.Pushspace, &newDelArray) } } newList = m.Mover.MergeList(m.Mover.ToList(array), moveUser.info["list"], moveUser.maxPushSize) } oldDelArray = append(oldDelArray, moveUser.ids...) if len(oldDelArray) >= Mgo_BulkSize { MyMgo.DelBulk(m.Mgo_Log, sess, m.Mgo_Log_DbName, m.Pushspace_temp, &oldDelArray) } m.Mover.SortListSplit(newList, func(v interface{}) { info := map[string]interface{}{ "list": v, } for ik, iv := range moveUser.info { if ik == "_id" || ik == "list" { continue } info[ik] = iv } saveArray = append(saveArray, info) if len(saveArray) == Mgo_BulkSize { MyMgo.SaveBulk(m.Mgo_Log, sess, m.Mgo_Log_DbName, m.Pushspace, &saveArray) } }) }(k, v) index++ if index%500 == 0 { logger.Info("第", *number, "次合并数据:", index) } } mergeWait.Wait() if len(saveArray) > 0 { MyMgo.SaveBulk(m.Mgo_Log, nil, m.Mgo_Log_DbName, m.Pushspace, &saveArray) } if len(newDelArray) > 0 { MyMgo.DelBulk(m.Mgo_Log, nil, m.Mgo_Log_DbName, m.Pushspace, &newDelArray) } if len(oldDelArray) > 0 { MyMgo.DelBulk(m.Mgo_Log, nil, m.Mgo_Log_DbName, m.Pushspace_temp, &oldDelArray) } logger.Info("第", *number, "次合并数据结束。。。", index) }