|
@@ -26,12 +26,18 @@ type Move struct {
|
|
|
}
|
|
|
|
|
|
type pushJob struct {
|
|
|
- taskType int
|
|
|
- pool chan bool
|
|
|
- wait *sync.WaitGroup
|
|
|
- lock *sync.Mutex
|
|
|
- lastId string
|
|
|
- users *[]map[string]interface{}
|
|
|
+ taskType int
|
|
|
+ pool chan bool
|
|
|
+ wait *sync.WaitGroup
|
|
|
+ lock *sync.Mutex
|
|
|
+ moveLock *sync.Mutex
|
|
|
+ moveWait *sync.WaitGroup
|
|
|
+ movePool chan bool
|
|
|
+ mergeLock *sync.Mutex
|
|
|
+ mergeWait *sync.WaitGroup
|
|
|
+ mergePool chan bool
|
|
|
+ lastId string
|
|
|
+ users *[]map[string]interface{}
|
|
|
}
|
|
|
|
|
|
//taskType 1--实时推送 2--实时推送+一天三次的8点推送 3--一天三次推送 4--九点推送
|
|
@@ -46,6 +52,7 @@ func (p *pushJob) Execute(taskType int, isMoveDatas bool) {
|
|
|
p.Push()
|
|
|
}
|
|
|
func (p *pushJob) Move() {
|
|
|
+ defer util.Catch()
|
|
|
logger.Info("推送任务", p.taskType, "开始迁移数据。。。")
|
|
|
nowUnix := time.Now().Unix()
|
|
|
sess := mongodb.GetMgoConn()
|
|
@@ -58,53 +65,65 @@ func (p *pushJob) Move() {
|
|
|
moves := map[string]*Move{}
|
|
|
logger.Info("推送任务", p.taskType, "开始遍历pushspace_temp")
|
|
|
index, number, length := 0, 0, 0
|
|
|
- for temp := make(map[string]interface{}); it.Next(&temp); {
|
|
|
- userId := util.ObjToString(temp["userid"])
|
|
|
- move := moves[userId]
|
|
|
- if move != nil {
|
|
|
- list, _ := move.Info["list"].(SortList)
|
|
|
- idMap := map[string]bool{}
|
|
|
- for _, v := range list {
|
|
|
- idMap[util.ObjToString((*v.Info)["_id"])] = true
|
|
|
- }
|
|
|
- newList := putil.ToSortList(temp["list"])
|
|
|
- for _, v := range newList {
|
|
|
- if idMap[util.ObjToString((*v.Info)["_id"])] {
|
|
|
- continue
|
|
|
+ for data := make(map[string]interface{}); it.Next(&data); {
|
|
|
+ p.movePool <- true
|
|
|
+ p.moveWait.Add(1)
|
|
|
+ index++
|
|
|
+ go func(temp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-p.movePool
|
|
|
+ p.moveWait.Done()
|
|
|
+ p.moveLock.Unlock()
|
|
|
+ }()
|
|
|
+ userId := util.ObjToString(temp["userid"])
|
|
|
+ p.moveLock.Lock()
|
|
|
+ move := moves[userId]
|
|
|
+ if move != nil {
|
|
|
+ list, _ := move.Info["list"].(SortList)
|
|
|
+ idMap := map[string]bool{}
|
|
|
+ for _, v := range list {
|
|
|
+ idMap[util.ObjToString((*v.Info)["_id"])] = true
|
|
|
+ }
|
|
|
+ newList := putil.ToSortList(temp["list"])
|
|
|
+ for _, v := range newList {
|
|
|
+ if idMap[util.ObjToString((*v.Info)["_id"])] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ list = append(list, v)
|
|
|
+ }
|
|
|
+ sort.Sort(list)
|
|
|
+ if len(list) > SysConfig.MaxPushSize {
|
|
|
+ list = list[:SysConfig.MaxPushSize]
|
|
|
+ }
|
|
|
+ temp["list"] = list
|
|
|
+ move.Info = temp
|
|
|
+ move.Ids = append(move.Ids, temp["_id"])
|
|
|
+ } else {
|
|
|
+ temp["list"] = putil.ToSortList(temp["list"])
|
|
|
+ move = &Move{
|
|
|
+ Info: temp,
|
|
|
+ Ids: []interface{}{temp["_id"]},
|
|
|
}
|
|
|
- list = append(list, v)
|
|
|
- }
|
|
|
- sort.Sort(list)
|
|
|
- if len(list) > SysConfig.MaxPushSize {
|
|
|
- list = list[:SysConfig.MaxPushSize]
|
|
|
}
|
|
|
- temp["list"] = list
|
|
|
- move.Info = temp
|
|
|
- move.Ids = append(move.Ids, temp["_id"])
|
|
|
- } else {
|
|
|
- temp["list"] = putil.ToSortList(temp["list"])
|
|
|
- move = &Move{
|
|
|
- Info: temp,
|
|
|
- Ids: []interface{}{temp["_id"]},
|
|
|
+ moves[userId] = move
|
|
|
+ length++
|
|
|
+ if length == SysConfig.MoveBatch {
|
|
|
+ p.Merge(&number, nowUnix, moves)
|
|
|
+ length = 0
|
|
|
+ moves = map[string]*Move{}
|
|
|
}
|
|
|
- }
|
|
|
- moves[userId] = move
|
|
|
- temp = make(map[string]interface{})
|
|
|
- index++
|
|
|
- length++
|
|
|
+ }(data)
|
|
|
+ data = make(map[string]interface{})
|
|
|
if index%500 == 0 {
|
|
|
logger.Info("推送任务", p.taskType, "pushspace_temp加载到内存:", index)
|
|
|
}
|
|
|
- if length == SysConfig.MoveBatch {
|
|
|
- length = 0
|
|
|
- p.Merge(&number, nowUnix, moves)
|
|
|
- moves = map[string]*Move{}
|
|
|
- }
|
|
|
}
|
|
|
+ p.moveWait.Wait()
|
|
|
if length > 0 {
|
|
|
p.Merge(&number, nowUnix, moves)
|
|
|
+ length = 0
|
|
|
+ moves = map[string]*Move{}
|
|
|
}
|
|
|
- moves = nil
|
|
|
logger.Info("推送任务", p.taskType, "迁移数据结束。。。", index)
|
|
|
}
|
|
|
func (p *pushJob) Merge(number *int, nowUnix int64, moves map[string]*Move) {
|
|
@@ -116,81 +135,96 @@ func (p *pushJob) Merge(number *int, nowUnix int64, moves map[string]*Move) {
|
|
|
updateArray_query := []map[string]interface{}{}
|
|
|
updateArray_set := []map[string]interface{}{}
|
|
|
updateArray_delete := []interface{}{}
|
|
|
- sess := mongodb.GetMgoConn()
|
|
|
- defer mongodb.DestoryMongoConn(sess)
|
|
|
- for userId, move := range moves {
|
|
|
- var data map[string]interface{}
|
|
|
- sess.DB(DbName).C("pushspace").Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"list": 1, "templist": 1}).One(&data)
|
|
|
- if data == nil { //批量新增
|
|
|
- saveArray = append(saveArray, move.Info)
|
|
|
- saveArray_delete = append(saveArray_delete, move.Ids...)
|
|
|
- if len(saveArray) == BulkSize {
|
|
|
- p.SaveBulk(sess, &saveArray, &saveArray_delete)
|
|
|
- }
|
|
|
- } else { //批量更新
|
|
|
- setMap := map[string]interface{}{}
|
|
|
- for _, field := range MoveFields {
|
|
|
- if move.Info[field] == nil {
|
|
|
- continue
|
|
|
+ for u, m := range moves {
|
|
|
+ p.mergePool <- true
|
|
|
+ p.mergeWait.Add(1)
|
|
|
+ go func(userId string, move *Move) {
|
|
|
+ defer func() {
|
|
|
+ <-p.mergePool
|
|
|
+ p.mergeWait.Done()
|
|
|
+ }()
|
|
|
+ sess := mongodb.GetMgoConn()
|
|
|
+ defer mongodb.DestoryMongoConn(sess)
|
|
|
+ var data map[string]interface{}
|
|
|
+ sess.DB(DbName).C("pushspace").Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"list": 1, "templist": 1}).One(&data)
|
|
|
+ if data == nil { //批量新增
|
|
|
+ p.mergeLock.Lock()
|
|
|
+ saveArray = append(saveArray, move.Info)
|
|
|
+ saveArray_delete = append(saveArray_delete, move.Ids...)
|
|
|
+ if len(saveArray) == BulkSize {
|
|
|
+ p.SaveBulk(sess, &saveArray, &saveArray_delete)
|
|
|
}
|
|
|
- setMap[field] = move.Info[field]
|
|
|
- }
|
|
|
- //
|
|
|
- newListOrig, _ := move.Info["list"].(SortList)
|
|
|
- if newListOrig == nil || len(newListOrig) == 0 {
|
|
|
- continue
|
|
|
- }
|
|
|
- pushAll := make(map[string]interface{})
|
|
|
- for _, v := range []string{"", "temp"} {
|
|
|
- oldList := putil.ToSortList(data[v+"list"])
|
|
|
- if v == "temp" && oldList == nil {
|
|
|
- continue
|
|
|
+ p.mergeLock.Unlock()
|
|
|
+ } else { //批量更新
|
|
|
+ setMap := map[string]interface{}{}
|
|
|
+ for _, field := range MoveFields {
|
|
|
+ if move.Info[field] == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ setMap[field] = move.Info[field]
|
|
|
}
|
|
|
- idMap := map[string]bool{}
|
|
|
- for _, vv := range oldList {
|
|
|
- idMap[util.ObjToString((*vv.Info)["_id"])] = true
|
|
|
+ //
|
|
|
+ newListOrig, _ := move.Info["list"].(SortList)
|
|
|
+ if newListOrig == nil || len(newListOrig) == 0 {
|
|
|
+ return
|
|
|
}
|
|
|
- newList := make(SortList, 0)
|
|
|
- //去重
|
|
|
- for _, vv := range newListOrig {
|
|
|
- if idMap[util.ObjToString((*vv.Info)["_id"])] {
|
|
|
+ pushAll := make(map[string]interface{})
|
|
|
+ for _, v := range []string{"", "temp"} {
|
|
|
+ oldList := putil.ToSortList(data[v+"list"])
|
|
|
+ if v == "temp" && oldList == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ idMap := map[string]bool{}
|
|
|
+ for _, vv := range oldList {
|
|
|
+ idMap[util.ObjToString((*vv.Info)["_id"])] = true
|
|
|
+ }
|
|
|
+ newList := make(SortList, 0)
|
|
|
+ //去重
|
|
|
+ for _, vv := range newListOrig {
|
|
|
+ if idMap[util.ObjToString((*vv.Info)["_id"])] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ newList = append(newList, vv)
|
|
|
+ }
|
|
|
+ pLength := len(newList)
|
|
|
+ if pLength == 0 {
|
|
|
continue
|
|
|
}
|
|
|
- newList = append(newList, vv)
|
|
|
+ rLength := len(oldList)
|
|
|
+ if rLength+pLength > SysConfig.MaxPushSize {
|
|
|
+ newList = append(newList, oldList...)
|
|
|
+ sort.Sort(newList)
|
|
|
+ setMap[v+"list"] = newList[:SysConfig.MaxPushSize]
|
|
|
+ setMap[v+"size"] = SysConfig.MaxPushSize
|
|
|
+ } else { //追加
|
|
|
+ setMap[v+"size"] = rLength + pLength
|
|
|
+ pushAll[v+"list"] = newList
|
|
|
+ }
|
|
|
}
|
|
|
- pLength := len(newList)
|
|
|
- if pLength == 0 {
|
|
|
- continue
|
|
|
+ upSet := map[string]interface{}{
|
|
|
+ "$set": setMap,
|
|
|
}
|
|
|
- rLength := len(oldList)
|
|
|
- if rLength+pLength > SysConfig.MaxPushSize {
|
|
|
- newList = append(newList, oldList...)
|
|
|
- sort.Sort(newList)
|
|
|
- setMap[v+"list"] = newList[:SysConfig.MaxPushSize]
|
|
|
- setMap[v+"size"] = SysConfig.MaxPushSize
|
|
|
- } else { //追加
|
|
|
- setMap[v+"size"] = rLength + pLength
|
|
|
- pushAll[v+"list"] = newList
|
|
|
+ if len(pushAll) > 0 {
|
|
|
+ upSet["$pushAll"] = pushAll
|
|
|
}
|
|
|
+ p.mergeLock.Lock()
|
|
|
+ updateArray_delete = append(updateArray_delete, move.Ids...)
|
|
|
+ updateArray_query = append(updateArray_query, map[string]interface{}{"_id": data["_id"]})
|
|
|
+ updateArray_set = append(updateArray_set, upSet)
|
|
|
+ if len(updateArray_query) == BulkSize {
|
|
|
+ p.UpdateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
|
|
|
+ }
|
|
|
+ p.mergeLock.Unlock()
|
|
|
}
|
|
|
- upSet := map[string]interface{}{
|
|
|
- "$set": setMap,
|
|
|
- }
|
|
|
- if len(pushAll) > 0 {
|
|
|
- upSet["$pushAll"] = pushAll
|
|
|
- }
|
|
|
- updateArray_delete = append(updateArray_delete, move.Ids...)
|
|
|
- updateArray_query = append(updateArray_query, map[string]interface{}{"_id": data["_id"]})
|
|
|
- updateArray_set = append(updateArray_set, upSet)
|
|
|
- if len(updateArray_query) == BulkSize {
|
|
|
- p.UpdateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
|
|
|
- }
|
|
|
- }
|
|
|
+ }(u, m)
|
|
|
index++
|
|
|
if index%500 == 0 {
|
|
|
logger.Info("推送任务", p.taskType, "第", *number, "次合并数据:", index)
|
|
|
}
|
|
|
}
|
|
|
+ p.mergeWait.Wait()
|
|
|
+ sess := mongodb.GetMgoConn()
|
|
|
+ defer mongodb.DestoryMongoConn(sess)
|
|
|
if len(saveArray) > 0 {
|
|
|
p.SaveBulk(sess, &saveArray, &saveArray_delete)
|
|
|
}
|
|
@@ -255,6 +289,7 @@ func (p *pushJob) DelBulk(sess *mgo.Session, array *[]interface{}) {
|
|
|
*array = []interface{}{}
|
|
|
}
|
|
|
func (p *pushJob) Push() {
|
|
|
+ defer util.Catch()
|
|
|
logger.Info("推送任务", p.taskType, "开始推送。。。")
|
|
|
batch_index := 0
|
|
|
for {
|