|
@@ -14,10 +14,13 @@ import (
|
|
mgo "gopkg.in/mgo.v2"
|
|
mgo "gopkg.in/mgo.v2"
|
|
)
|
|
)
|
|
|
|
|
|
-type moveJob struct {
|
|
|
|
|
|
+type MoveUser struct {
|
|
info map[string]interface{}
|
|
info map[string]interface{}
|
|
ids []interface{}
|
|
ids []interface{}
|
|
isVipUser bool
|
|
isVipUser bool
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+type MoveJob struct {
|
|
moveLock *sync.Mutex
|
|
moveLock *sync.Mutex
|
|
moveWait *sync.WaitGroup
|
|
moveWait *sync.WaitGroup
|
|
movePool chan bool
|
|
movePool chan bool
|
|
@@ -26,7 +29,7 @@ type moveJob struct {
|
|
mergePool chan bool
|
|
mergePool chan bool
|
|
}
|
|
}
|
|
|
|
|
|
-func (m *moveJob) Execute() {
|
|
|
|
|
|
+func (m *MoveJob) Execute() {
|
|
defer util.Catch()
|
|
defer util.Catch()
|
|
Jobs.Push.lock.Lock()
|
|
Jobs.Push.lock.Lock()
|
|
defer Jobs.Push.lock.Unlock()
|
|
defer Jobs.Push.lock.Unlock()
|
|
@@ -39,7 +42,7 @@ func (m *moveJob) Execute() {
|
|
"$lt": nowUnix,
|
|
"$lt": nowUnix,
|
|
},
|
|
},
|
|
}).Sort("userid").Iter()
|
|
}).Sort("userid").Iter()
|
|
- moves := map[string]*moveJob{}
|
|
|
|
|
|
+ moveUsers := map[string]*MoveUser{}
|
|
logger.Info("开始遍历pushspace_temp")
|
|
logger.Info("开始遍历pushspace_temp")
|
|
index, number, length := 0, 0, 0
|
|
index, number, length := 0, 0, 0
|
|
for data := make(map[string]interface{}); it.Next(&data); {
|
|
for data := make(map[string]interface{}); it.Next(&data); {
|
|
@@ -59,9 +62,9 @@ func (m *moveJob) Execute() {
|
|
maxPushSize = Config.VipMaxPushSize
|
|
maxPushSize = Config.VipMaxPushSize
|
|
}
|
|
}
|
|
m.moveLock.Lock()
|
|
m.moveLock.Lock()
|
|
- move := moves[userId]
|
|
|
|
- if move != nil {
|
|
|
|
- list, _ := move.info["list"].(SortList)
|
|
|
|
|
|
+ moveUser := moveUsers[userId]
|
|
|
|
+ if moveUser != nil {
|
|
|
|
+ list, _ := moveUser.info["list"].(SortList)
|
|
idMap := map[string]bool{}
|
|
idMap := map[string]bool{}
|
|
for _, v := range list {
|
|
for _, v := range list {
|
|
idMap[util.ObjToString((*v.Info)["_id"])] = true
|
|
idMap[util.ObjToString((*v.Info)["_id"])] = true
|
|
@@ -78,23 +81,23 @@ func (m *moveJob) Execute() {
|
|
list = list[:maxPushSize]
|
|
list = list[:maxPushSize]
|
|
}
|
|
}
|
|
temp["list"] = list
|
|
temp["list"] = list
|
|
- move.info = temp
|
|
|
|
- move.isVipUser = isVipUser
|
|
|
|
- move.ids = append(move.ids, temp["_id"])
|
|
|
|
|
|
+ moveUser.info = temp
|
|
|
|
+ moveUser.isVipUser = isVipUser
|
|
|
|
+ moveUser.ids = append(moveUser.ids, temp["_id"])
|
|
} else {
|
|
} else {
|
|
temp["list"] = putil.ToSortList(temp["list"])
|
|
temp["list"] = putil.ToSortList(temp["list"])
|
|
- move = &moveJob{
|
|
|
|
|
|
+ moveUser = &MoveUser{
|
|
info: temp,
|
|
info: temp,
|
|
ids: []interface{}{temp["_id"]},
|
|
ids: []interface{}{temp["_id"]},
|
|
isVipUser: isVipUser,
|
|
isVipUser: isVipUser,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- moves[userId] = move
|
|
|
|
|
|
+ moveUsers[userId] = moveUser
|
|
length++
|
|
length++
|
|
if length == Config.MoveBatch {
|
|
if length == Config.MoveBatch {
|
|
- m.merge(&number, nowUnix, moves)
|
|
|
|
|
|
+ m.merge(&number, nowUnix, moveUsers)
|
|
length = 0
|
|
length = 0
|
|
- moves = map[string]*moveJob{}
|
|
|
|
|
|
+ moveUsers = map[string]*MoveUser{}
|
|
}
|
|
}
|
|
}(data)
|
|
}(data)
|
|
data = make(map[string]interface{})
|
|
data = make(map[string]interface{})
|
|
@@ -104,13 +107,13 @@ func (m *moveJob) Execute() {
|
|
}
|
|
}
|
|
m.moveWait.Wait()
|
|
m.moveWait.Wait()
|
|
if length > 0 {
|
|
if length > 0 {
|
|
- m.merge(&number, nowUnix, moves)
|
|
|
|
|
|
+ m.merge(&number, nowUnix, moveUsers)
|
|
length = 0
|
|
length = 0
|
|
- moves = map[string]*moveJob{}
|
|
|
|
|
|
+ moveUsers = map[string]*MoveUser{}
|
|
}
|
|
}
|
|
logger.Info("迁移数据结束。。。", index)
|
|
logger.Info("迁移数据结束。。。", index)
|
|
}
|
|
}
|
|
-func (m *moveJob) merge(number *int, nowUnix int64, moves map[string]*moveJob) {
|
|
|
|
|
|
+func (m *MoveJob) merge(number *int, nowUnix int64, moveUsers map[string]*MoveUser) {
|
|
*number++
|
|
*number++
|
|
logger.Info("第", *number, "次开始合并数据")
|
|
logger.Info("第", *number, "次开始合并数据")
|
|
index := 0
|
|
index := 0
|
|
@@ -119,10 +122,10 @@ func (m *moveJob) merge(number *int, nowUnix int64, moves map[string]*moveJob) {
|
|
updateArray_query := []map[string]interface{}{}
|
|
updateArray_query := []map[string]interface{}{}
|
|
updateArray_set := []map[string]interface{}{}
|
|
updateArray_set := []map[string]interface{}{}
|
|
updateArray_delete := []interface{}{}
|
|
updateArray_delete := []interface{}{}
|
|
- for u, m := range moves {
|
|
|
|
|
|
+ for k, v := range moveUsers {
|
|
m.mergePool <- true
|
|
m.mergePool <- true
|
|
m.mergeWait.Add(1)
|
|
m.mergeWait.Add(1)
|
|
- go func(userId string, move *moveJob) {
|
|
|
|
|
|
+ go func(userId string, moveUser *MoveUser) {
|
|
defer func() {
|
|
defer func() {
|
|
<-m.mergePool
|
|
<-m.mergePool
|
|
m.mergeWait.Done()
|
|
m.mergeWait.Done()
|
|
@@ -130,29 +133,25 @@ func (m *moveJob) merge(number *int, nowUnix int64, moves map[string]*moveJob) {
|
|
sess := mongodb.GetMgoConn()
|
|
sess := mongodb.GetMgoConn()
|
|
defer mongodb.DestoryMongoConn(sess)
|
|
defer mongodb.DestoryMongoConn(sess)
|
|
var data map[string]interface{}
|
|
var data map[string]interface{}
|
|
- err := sess.DB(putil.DbName).C("pushspace").Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"list": 1}).One(&data)
|
|
|
|
- if err != nil {
|
|
|
|
- logger.Error(userId, "获取用户pushspace表中出数据出错", err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+ sess.DB(putil.DbName).C("pushspace").Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"list": 1}).One(&data)
|
|
if data == nil { //批量新增
|
|
if data == nil { //批量新增
|
|
m.mergeLock.Lock()
|
|
m.mergeLock.Lock()
|
|
- saveArray = append(saveArray, move.info)
|
|
|
|
- saveArray_delete = append(saveArray_delete, move.ids...)
|
|
|
|
|
|
+ saveArray = append(saveArray, moveUser.info)
|
|
|
|
+ saveArray_delete = append(saveArray_delete, moveUser.ids...)
|
|
if len(saveArray) == putil.BulkSize {
|
|
if len(saveArray) == putil.BulkSize {
|
|
- m.saveBulk(sess, &saveArray, &saveArray_delete)
|
|
|
|
|
|
+ m.saveBulk(&saveArray, &saveArray_delete)
|
|
}
|
|
}
|
|
m.mergeLock.Unlock()
|
|
m.mergeLock.Unlock()
|
|
} else { //批量更新
|
|
} else { //批量更新
|
|
setMap := map[string]interface{}{}
|
|
setMap := map[string]interface{}{}
|
|
for _, field := range MoveFields {
|
|
for _, field := range MoveFields {
|
|
- if move.info[field] == nil {
|
|
|
|
|
|
+ if moveUser.info[field] == nil {
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
- setMap[field] = move.info[field]
|
|
|
|
|
|
+ setMap[field] = moveUser.info[field]
|
|
}
|
|
}
|
|
//
|
|
//
|
|
- newListOrig, _ := move.info["list"].(SortList)
|
|
|
|
|
|
+ newListOrig, _ := moveUser.info["list"].(SortList)
|
|
if newListOrig == nil || len(newListOrig) == 0 {
|
|
if newListOrig == nil || len(newListOrig) == 0 {
|
|
return
|
|
return
|
|
}
|
|
}
|
|
@@ -175,7 +174,7 @@ func (m *moveJob) merge(number *int, nowUnix int64, moves map[string]*moveJob) {
|
|
}
|
|
}
|
|
rLength := len(oldList)
|
|
rLength := len(oldList)
|
|
maxPushSize := Config.MaxPushSize
|
|
maxPushSize := Config.MaxPushSize
|
|
- if move.isVipUser {
|
|
|
|
|
|
+ if moveUser.isVipUser {
|
|
maxPushSize = Config.VipMaxPushSize
|
|
maxPushSize = Config.VipMaxPushSize
|
|
}
|
|
}
|
|
upSet := map[string]interface{}{}
|
|
upSet := map[string]interface{}{}
|
|
@@ -192,32 +191,32 @@ func (m *moveJob) merge(number *int, nowUnix int64, moves map[string]*moveJob) {
|
|
}
|
|
}
|
|
upSet["$set"] = setMap
|
|
upSet["$set"] = setMap
|
|
m.mergeLock.Lock()
|
|
m.mergeLock.Lock()
|
|
- updateArray_delete = append(updateArray_delete, move.ids...)
|
|
|
|
|
|
+ updateArray_delete = append(updateArray_delete, moveUser.ids...)
|
|
updateArray_query = append(updateArray_query, map[string]interface{}{"_id": data["_id"]})
|
|
updateArray_query = append(updateArray_query, map[string]interface{}{"_id": data["_id"]})
|
|
updateArray_set = append(updateArray_set, upSet)
|
|
updateArray_set = append(updateArray_set, upSet)
|
|
if len(updateArray_query) == putil.BulkSize {
|
|
if len(updateArray_query) == putil.BulkSize {
|
|
- m.updateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
|
|
|
|
|
|
+ m.updateBulk(&updateArray_query, &updateArray_set, &updateArray_delete)
|
|
}
|
|
}
|
|
m.mergeLock.Unlock()
|
|
m.mergeLock.Unlock()
|
|
}
|
|
}
|
|
- }(u, m)
|
|
|
|
|
|
+ }(k, v)
|
|
index++
|
|
index++
|
|
if index%500 == 0 {
|
|
if index%500 == 0 {
|
|
logger.Info("第", *number, "次合并数据:", index)
|
|
logger.Info("第", *number, "次合并数据:", index)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
m.mergeWait.Wait()
|
|
m.mergeWait.Wait()
|
|
- sess := mongodb.GetMgoConn()
|
|
|
|
- defer mongodb.DestoryMongoConn(sess)
|
|
|
|
if len(saveArray) > 0 {
|
|
if len(saveArray) > 0 {
|
|
- m.saveBulk(sess, &saveArray, &saveArray_delete)
|
|
|
|
|
|
+ m.saveBulk(&saveArray, &saveArray_delete)
|
|
}
|
|
}
|
|
if len(updateArray_query) > 0 {
|
|
if len(updateArray_query) > 0 {
|
|
- m.updateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
|
|
|
|
|
|
+ m.updateBulk(&updateArray_query, &updateArray_set, &updateArray_delete)
|
|
}
|
|
}
|
|
logger.Info("第", *number, "次合并数据结束。。。", index)
|
|
logger.Info("第", *number, "次合并数据结束。。。", index)
|
|
}
|
|
}
|
|
-func (m *moveJob) saveBulk(sess *mgo.Session, saves *[]map[string]interface{}, deletes *[]interface{}) {
|
|
|
|
|
|
+func (m *MoveJob) saveBulk(saves *[]map[string]interface{}, deletes *[]interface{}) {
|
|
|
|
+ sess := mongodb.GetMgoConn()
|
|
|
|
+ defer mongodb.DestoryMongoConn(sess)
|
|
coll := sess.DB(putil.DbName).C("pushspace")
|
|
coll := sess.DB(putil.DbName).C("pushspace")
|
|
bulk := coll.Bulk()
|
|
bulk := coll.Bulk()
|
|
for _, v := range *saves {
|
|
for _, v := range *saves {
|
|
@@ -231,7 +230,9 @@ func (m *moveJob) saveBulk(sess *mgo.Session, saves *[]map[string]interface{}, d
|
|
}
|
|
}
|
|
*saves = []map[string]interface{}{}
|
|
*saves = []map[string]interface{}{}
|
|
}
|
|
}
|
|
-func (m *moveJob) updateBulk(sess *mgo.Session, array_q, array_s *[]map[string]interface{}, array_d *[]interface{}) {
|
|
|
|
|
|
+func (m *MoveJob) updateBulk(array_q, array_s *[]map[string]interface{}, array_d *[]interface{}) {
|
|
|
|
+ sess := mongodb.GetMgoConn()
|
|
|
|
+ defer mongodb.DestoryMongoConn(sess)
|
|
coll := sess.DB(putil.DbName).C("pushspace")
|
|
coll := sess.DB(putil.DbName).C("pushspace")
|
|
bulk := coll.Bulk()
|
|
bulk := coll.Bulk()
|
|
for k, v := range *array_q {
|
|
for k, v := range *array_q {
|
|
@@ -246,7 +247,7 @@ func (m *moveJob) updateBulk(sess *mgo.Session, array_q, array_s *[]map[string]i
|
|
*array_q = []map[string]interface{}{}
|
|
*array_q = []map[string]interface{}{}
|
|
*array_s = []map[string]interface{}{}
|
|
*array_s = []map[string]interface{}{}
|
|
}
|
|
}
|
|
-func (m *moveJob) delBulk(sess *mgo.Session, array *[]interface{}) {
|
|
|
|
|
|
+func (m *MoveJob) delBulk(sess *mgo.Session, array *[]interface{}) {
|
|
coll := sess.DB(putil.DbName).C("pushspace_temp")
|
|
coll := sess.DB(putil.DbName).C("pushspace_temp")
|
|
count := 0
|
|
count := 0
|
|
bulk := coll.Bulk()
|
|
bulk := coll.Bulk()
|