|
@@ -2,189 +2,199 @@
|
|
|
package job
|
|
|
|
|
|
import (
|
|
|
+ "fmt"
|
|
|
. "public"
|
|
|
. "push/config"
|
|
|
putil "push/util"
|
|
|
. "qfw/mongodb"
|
|
|
"qfw/util"
|
|
|
+ "qfw/util/elastic"
|
|
|
"qfw/util/redis"
|
|
|
"sort"
|
|
|
"sync"
|
|
|
+ "sync/atomic"
|
|
|
"time"
|
|
|
|
|
|
"github.com/donnie4w/go-logger/logger"
|
|
|
"gopkg.in/mgo.v2/bson"
|
|
|
)
|
|
|
|
|
|
+const (
|
|
|
+ projectQuery = `{"query":{"filtered":{"filter":{"bool":{"must":[{"range":{"pici":{"gt":%d,"lte":%d}}}]}}}}%s}`
|
|
|
+ projectPagingQuery = `,"_source":["_id","list"],"from":%d,"size":%d`
|
|
|
+)
|
|
|
+
|
|
|
type ProjectPushJob struct {
|
|
|
}
|
|
|
|
|
|
func (p *ProjectPushJob) Execute() {
|
|
|
logger.Info("开始关联项目推送任务。。。")
|
|
|
unix := time.Now().Unix()
|
|
|
- projects := p.loadProject()
|
|
|
- startId := ""
|
|
|
- batchIndex := 0
|
|
|
- //
|
|
|
- pushPool := make(chan bool, Config.ProjectPushPoolSize)
|
|
|
- pushWait := &sync.WaitGroup{}
|
|
|
- for {
|
|
|
- batchIndex++
|
|
|
- batchCount, datas := p.loadPushspace_project(batchIndex, &startId)
|
|
|
- logger.Info("开始第", batchIndex, "次关联项目匹配。。。")
|
|
|
- index := 0
|
|
|
- for k, v := range *datas {
|
|
|
- pushPool <- true
|
|
|
- pushWait.Add(1)
|
|
|
- go func(userId string, list []map[string]interface{}) {
|
|
|
- defer util.Catch()
|
|
|
- defer func() {
|
|
|
- <-pushPool
|
|
|
- pushWait.Done()
|
|
|
- }()
|
|
|
- sess := putil.Mgo.GetMgoConn()
|
|
|
- defer putil.Mgo.DestoryMongoConn(sess)
|
|
|
- var u map[string]interface{}
|
|
|
- sess.DB(Config.Mongodb.DbName).C(User).FindId(bson.ObjectIdHex(userId)).Select(map[string]interface{}{
|
|
|
- "o_vipjy.i_projectmatch": 1,
|
|
|
- "i_vip_status": 1,
|
|
|
- }).One(&u)
|
|
|
- if u == nil || len(u) == 0 {
|
|
|
- logger.Error(userId, "没有找到该用户信息")
|
|
|
- p.Delete(userId)
|
|
|
- return
|
|
|
- } else if util.IntAll(u["i_vip_status"]) <= 0 {
|
|
|
- logger.Error(userId, "已经不再是vip")
|
|
|
- p.Delete(userId)
|
|
|
- return
|
|
|
- }
|
|
|
- o_vipjy, _ := u["o_vipjy"].(map[string]interface{})
|
|
|
- if util.IntAll(o_vipjy["i_projectmatch"]) == 0 {
|
|
|
- logger.Info(userId, "关联项目推送没有打开")
|
|
|
- p.Delete(userId)
|
|
|
- return
|
|
|
- }
|
|
|
- dayCountKey := DayCountKey(userId)
|
|
|
- dayCount, err := redis.GetNewInt(Pushcache_2_a, dayCountKey)
|
|
|
- if err != nil {
|
|
|
- logger.Error(userId, "redis中获取当天推送总数出错", err)
|
|
|
- return
|
|
|
- } else if dayCount >= Config.VipMaxPushSize {
|
|
|
- return
|
|
|
- }
|
|
|
- needCount := Config.VipMaxPushSize - dayCount
|
|
|
- sortList := &SortList{}
|
|
|
- length := 0
|
|
|
- i := len(list)
|
|
|
- for {
|
|
|
- var updateProject [][]map[string]interface{}
|
|
|
- for i > 0 {
|
|
|
- i--
|
|
|
- projectId, _ := list[i]["projectid"].(string)
|
|
|
- pi, ok := projects.Load(projectId)
|
|
|
- if !ok {
|
|
|
- return
|
|
|
- }
|
|
|
- subtypes, _ := list[i]["subtypes"].([]interface{})
|
|
|
- maxId, _ := list[i]["maxid"].(string)
|
|
|
- userSubtype := map[string]bool{}
|
|
|
- for _, t := range subtypes {
|
|
|
- userSubtype[util.ObjToString(t)] = true
|
|
|
- }
|
|
|
- //
|
|
|
- project, _ := pi.(map[string]interface{})
|
|
|
- project_list, _ := project["list"].([]interface{})
|
|
|
- infoId := ""
|
|
|
- for _, pl := range project_list {
|
|
|
- plm, _ := pl.(map[string]interface{})
|
|
|
- infoId, _ = plm["infoid"].(string)
|
|
|
- if bson.ObjectIdHex(infoId) <= bson.ObjectIdHex(maxId) {
|
|
|
- continue
|
|
|
- }
|
|
|
- publishtime := util.Int64All(plm["publishtime"])
|
|
|
- if unix-publishtime > Config.ProjectPublishTimeLimit { //发布时间7天之内
|
|
|
- continue
|
|
|
- }
|
|
|
- subtype, _ := plm["subtype"].(string)
|
|
|
- if len(userSubtype) > 0 && !userSubtype[subtype] {
|
|
|
- continue
|
|
|
- }
|
|
|
- isExists, err := redis.Exists(Pushcache_2_a, PushInfoKey(userId, infoId))
|
|
|
- if err != nil {
|
|
|
- logger.Error("推送信息判重出错", err)
|
|
|
- continue
|
|
|
+ projectLength, projects := p.loadProject(unix)
|
|
|
+ if projectLength > 0 {
|
|
|
+ startId := ""
|
|
|
+ batchIndex := 0
|
|
|
+ //
|
|
|
+ pushPool := make(chan bool, Config.ProjectPushPoolSize)
|
|
|
+ pushWait := &sync.WaitGroup{}
|
|
|
+ for {
|
|
|
+ batchIndex++
|
|
|
+ batchCount, datas := p.loadPushspace_project(batchIndex, &startId)
|
|
|
+ logger.Info("开始第", batchIndex, "次关联项目匹配。。。")
|
|
|
+ index := 0
|
|
|
+ for k, v := range *datas {
|
|
|
+ pushPool <- true
|
|
|
+ pushWait.Add(1)
|
|
|
+ go func(userId string, list []map[string]interface{}) {
|
|
|
+ defer util.Catch()
|
|
|
+ defer func() {
|
|
|
+ <-pushPool
|
|
|
+ pushWait.Done()
|
|
|
+ }()
|
|
|
+ sess := putil.Mgo.GetMgoConn()
|
|
|
+ defer putil.Mgo.DestoryMongoConn(sess)
|
|
|
+ var u map[string]interface{}
|
|
|
+ sess.DB(Config.Mongodb.DbName).C(User).FindId(bson.ObjectIdHex(userId)).Select(map[string]interface{}{
|
|
|
+ "o_vipjy.i_projectmatch": 1,
|
|
|
+ "i_vip_status": 1,
|
|
|
+ }).One(&u)
|
|
|
+ if u == nil || len(u) == 0 {
|
|
|
+ logger.Error(userId, "没有找到该用户信息")
|
|
|
+ p.Delete(userId)
|
|
|
+ return
|
|
|
+ } else if util.IntAll(u["i_vip_status"]) <= 0 {
|
|
|
+ logger.Error(userId, "已经不再是vip")
|
|
|
+ p.Delete(userId)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ o_vipjy, _ := u["o_vipjy"].(map[string]interface{})
|
|
|
+ if util.IntAll(o_vipjy["i_projectmatch"]) == 0 {
|
|
|
+ logger.Info(userId, "关联项目推送没有打开")
|
|
|
+ p.Delete(userId)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ dayCountKey := DayCountKey(userId)
|
|
|
+ dayCount, err := redis.GetNewInt(Pushcache_2_a, dayCountKey)
|
|
|
+ if err != nil {
|
|
|
+ logger.Error(userId, "redis中获取当天推送总数出错", err)
|
|
|
+ return
|
|
|
+ } else if dayCount >= Config.VipMaxPushSize {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ needCount := Config.VipMaxPushSize - dayCount
|
|
|
+ sortList := &SortList{}
|
|
|
+ length := 0
|
|
|
+ i := len(list)
|
|
|
+ for {
|
|
|
+ var updateProject [][]map[string]interface{}
|
|
|
+ for i > 0 {
|
|
|
+ i--
|
|
|
+ projectId, _ := list[i]["projectid"].(string)
|
|
|
+ pi, ok := projects.Load(projectId)
|
|
|
+ if !ok {
|
|
|
+ return
|
|
|
}
|
|
|
- if isExists {
|
|
|
- continue
|
|
|
+ subtypes, _ := list[i]["subtypes"].([]interface{})
|
|
|
+ maxId, _ := list[i]["maxid"].(string)
|
|
|
+ userSubtype := map[string]bool{}
|
|
|
+ for _, t := range subtypes {
|
|
|
+ userSubtype[util.ObjToString(t)] = true
|
|
|
}
|
|
|
- length++
|
|
|
- //满足条件,可以推送
|
|
|
- info := map[string]interface{}{}
|
|
|
- for _, field := range InfoSaveFields {
|
|
|
- if field == "_id" || plm[field] == nil {
|
|
|
+ //
|
|
|
+ project, _ := pi.(map[string]interface{})
|
|
|
+ project_list, _ := project["list"].([]interface{})
|
|
|
+ infoId := ""
|
|
|
+ for _, pl := range project_list {
|
|
|
+ plm, _ := pl.(map[string]interface{})
|
|
|
+ infoId, _ = plm["infoid"].(string)
|
|
|
+ if bson.ObjectIdHex(infoId) <= bson.ObjectIdHex(maxId) {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ publishtime := util.Int64All(plm["publishtime"])
|
|
|
+ if unix-publishtime > Config.ProjectPublishTimeLimit { //发布时间7天之内
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ subtype, _ := plm["subtype"].(string)
|
|
|
+ if len(userSubtype) > 0 && !userSubtype[subtype] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ isExists, err := redis.Exists(Pushcache_2_a, PushInfoKey(userId, infoId))
|
|
|
+ if err != nil {
|
|
|
+ logger.Error("推送信息判重出错", err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if isExists {
|
|
|
continue
|
|
|
}
|
|
|
- info[field] = plm[field]
|
|
|
+ length++
|
|
|
+ //满足条件,可以推送
|
|
|
+ info := map[string]interface{}{}
|
|
|
+ for _, field := range InfoSaveFields {
|
|
|
+ if field == "_id" || plm[field] == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ info[field] = plm[field]
|
|
|
+ }
|
|
|
+ info["_id"] = infoId
|
|
|
+ *sortList = append(*sortList, &MatchInfo{Info: &info})
|
|
|
}
|
|
|
- info["_id"] = infoId
|
|
|
- *sortList = append(*sortList, &MatchInfo{Info: &info})
|
|
|
- }
|
|
|
- if infoId != maxId {
|
|
|
- updateProject = append(updateProject, []map[string]interface{}{
|
|
|
- map[string]interface{}{
|
|
|
- "_id": list[i]["_id"],
|
|
|
- },
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "maxid": infoId,
|
|
|
+ if infoId != maxId {
|
|
|
+ updateProject = append(updateProject, []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": list[i]["_id"],
|
|
|
+ },
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "maxid": infoId,
|
|
|
+ },
|
|
|
},
|
|
|
- },
|
|
|
- })
|
|
|
- if len(updateProject) == BigBulkSize {
|
|
|
- break
|
|
|
+ })
|
|
|
+ if len(updateProject) == BigBulkSize {
|
|
|
+ break
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+ if len(updateProject) > 0 {
|
|
|
+ putil.Mgo.NewUpdateBulk(Pushspace_project, true, true, updateProject...)
|
|
|
+ }
|
|
|
+ if len(updateProject) < BigBulkSize {
|
|
|
+ break
|
|
|
+ }
|
|
|
}
|
|
|
- if len(updateProject) > 0 {
|
|
|
- putil.Mgo.NewUpdateBulk(Pushspace_project, true, true, updateProject...)
|
|
|
+ if length == 0 {
|
|
|
+ return
|
|
|
}
|
|
|
- if len(updateProject) < BigBulkSize {
|
|
|
- break
|
|
|
+ sort.Sort(*sortList)
|
|
|
+ if length > needCount {
|
|
|
+ length = needCount
|
|
|
}
|
|
|
- }
|
|
|
- if length == 0 {
|
|
|
- return
|
|
|
- }
|
|
|
- sort.Sort(*sortList)
|
|
|
- if length > needCount {
|
|
|
- length = needCount
|
|
|
- }
|
|
|
- saveArray := make([]*MatchInfo, length)
|
|
|
- for sk, sl := range *sortList {
|
|
|
- saveArray[length-1-sk] = sl
|
|
|
- if sk == length-1 {
|
|
|
- break
|
|
|
+ saveArray := make([]*MatchInfo, length)
|
|
|
+ for sk, sl := range *sortList {
|
|
|
+ saveArray[length-1-sk] = sl
|
|
|
+ if sk == length-1 {
|
|
|
+ break
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- if pushDate := putil.SaveToPushsubscribe(true, true, userId, &saveArray); pushDate > 0 {
|
|
|
- logger.Info(userId, "关联项目推送保存成功", pushDate, length)
|
|
|
- redis.Put(Pushcache_2_a, dayCountKey, dayCount+length, OneDaySecond)
|
|
|
- for _, v := range saveArray {
|
|
|
- redis.Put(Pushcache_2_a, PushInfoKey(userId, util.ObjToString((*v.Info)["_id"])), 1, OneDaySecond)
|
|
|
+ if pushDate := putil.SaveToPushsubscribe(true, true, userId, &saveArray); pushDate > 0 {
|
|
|
+ logger.Info(userId, "关联项目推送保存成功", pushDate, length)
|
|
|
+ redis.Put(Pushcache_2_a, dayCountKey, dayCount+length, OneDaySecond)
|
|
|
+ for _, v := range saveArray {
|
|
|
+ redis.Put(Pushcache_2_a, PushInfoKey(userId, util.ObjToString((*v.Info)["_id"])), 1, OneDaySecond)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ logger.Info(userId, "关联项目推送保存出错", pushDate)
|
|
|
}
|
|
|
- } else {
|
|
|
- logger.Info(userId, "关联项目推送保存出错", pushDate)
|
|
|
+ }(k, v)
|
|
|
+ index++
|
|
|
+ if index%500 == 0 {
|
|
|
+ logger.Info("第", batchIndex, "次关联项目匹配:", index)
|
|
|
}
|
|
|
- }(k, v)
|
|
|
- index++
|
|
|
- if index%500 == 0 {
|
|
|
- logger.Info("第", batchIndex, "次关联项目匹配:", index)
|
|
|
}
|
|
|
- }
|
|
|
- pushWait.Wait()
|
|
|
- logger.Info("第", batchIndex, "次关联项目匹配结束", index)
|
|
|
- if batchCount < Config.PushBatch {
|
|
|
- break
|
|
|
+ pushWait.Wait()
|
|
|
+ logger.Info("第", batchIndex, "次关联项目匹配结束", index)
|
|
|
+ if batchCount < Config.PushBatch {
|
|
|
+ break
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
ProjectTask.Pici = unix
|
|
@@ -244,47 +254,62 @@ func (p *ProjectPushJob) loadPushspace_project(batchIndex int, startId *string)
|
|
|
}
|
|
|
|
|
|
//加载项目数据
|
|
|
-func (p *ProjectPushJob) loadProject() *sync.Map {
|
|
|
- query := map[string]interface{}{
|
|
|
- "pici": map[string]interface{}{
|
|
|
- "$gte": ProjectTask.Pici,
|
|
|
- },
|
|
|
- }
|
|
|
- //query = map[string]interface{}{
|
|
|
- //"_id": bson.ObjectIdHex("5da828a6e138234108b4c02e"),
|
|
|
- //}
|
|
|
- logger.Info("开始加载项目。。。", query)
|
|
|
- sess := putil.Mgo_Project.GetMgoConn()
|
|
|
- defer putil.Mgo_Project.DestoryMongoConn(sess)
|
|
|
- it := sess.DB(Config.ProjectMongodb.DbName).C(Config.ProjectMongodb.CollName).Find(query).Select(map[string]interface{}{
|
|
|
- "_id": 1,
|
|
|
- "list": 1,
|
|
|
- }).Iter()
|
|
|
+func (p *ProjectPushJob) loadProject(unix int64) (int64, *sync.Map) {
|
|
|
+ defer util.Catch()
|
|
|
projectMap := &sync.Map{}
|
|
|
- index := 0
|
|
|
+ query := fmt.Sprintf(projectQuery, ProjectTask.Pici, unix, "")
|
|
|
+ logger.Info("开始加载项目。。。", query)
|
|
|
+ count := int(elastic.Count(Projectset, Projectset, query))
|
|
|
+ logger.Info("需要加载项目", count, "条")
|
|
|
+ if count == 0 {
|
|
|
+ return 0, projectMap
|
|
|
+ }
|
|
|
+ onceSize := Config.ProjectPagingSize //ES一次查询这么多条
|
|
|
+ if onceSize > count {
|
|
|
+ onceSize = count
|
|
|
+ }
|
|
|
+ totalPage := int((count + onceSize - 1) / onceSize)
|
|
|
+ logger.Info("加载项目一共", totalPage, "页")
|
|
|
loadPool := make(chan bool, Config.LoadProjectPoolSize)
|
|
|
loadWait := &sync.WaitGroup{}
|
|
|
- for m := make(map[string]interface{}); it.Next(&m); {
|
|
|
- loadPool <- true
|
|
|
- loadWait.Add(1)
|
|
|
- go func(data map[string]interface{}) {
|
|
|
- defer util.Catch()
|
|
|
- defer func() {
|
|
|
- <-loadPool
|
|
|
- loadWait.Done()
|
|
|
- }()
|
|
|
- _id := BsonIdToSId(data["_id"])
|
|
|
- projectMap.Store(_id, data)
|
|
|
- }(m)
|
|
|
- m = make(map[string]interface{})
|
|
|
- index++
|
|
|
- if index%500 == 0 {
|
|
|
- logger.Info("加载项目:", index)
|
|
|
+ var length int64
|
|
|
+ for t := 0; t < 3; t++ {
|
|
|
+ projectMap = &sync.Map{}
|
|
|
+ for i := 0; i < totalPage; i++ {
|
|
|
+ loadPool <- true
|
|
|
+ loadWait.Add(1)
|
|
|
+ go func(start int) {
|
|
|
+ defer util.Catch()
|
|
|
+ defer func() {
|
|
|
+ <-loadPool
|
|
|
+ loadWait.Done()
|
|
|
+ }()
|
|
|
+ size := onceSize
|
|
|
+ if start == totalPage-1 && count%onceSize != 0 {
|
|
|
+ size = count % onceSize
|
|
|
+ }
|
|
|
+ pagingQuery := fmt.Sprintf(projectQuery, ProjectTask.Pici, unix, fmt.Sprintf(projectPagingQuery, start*onceSize, size))
|
|
|
+ logger.Info("加载项目", "开始加载第", start+1, "页数据", pagingQuery)
|
|
|
+ r := elastic.Get(Projectset, Projectset, pagingQuery)
|
|
|
+ if r == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for _, v := range *r {
|
|
|
+ length = atomic.AddInt64(&length, 1)
|
|
|
+ _id, _ := v["_id"].(string)
|
|
|
+ projectMap.Store(_id, v)
|
|
|
+ }
|
|
|
+ logger.Info("加载项目", "第", start+1, "页数据加载完成")
|
|
|
+ }(i)
|
|
|
+ }
|
|
|
+ loadWait.Wait()
|
|
|
+ if int(length) >= count-5 {
|
|
|
+ break
|
|
|
}
|
|
|
+ logger.Info("加载项目", "第", t, "次加载数据完成,数据总数", length, ",由于数据量不够,重新加载")
|
|
|
}
|
|
|
- loadWait.Wait()
|
|
|
- logger.Info("加载项目结束。。。", index)
|
|
|
- return projectMap
|
|
|
+ logger.Info("加载项目结束。。。", length)
|
|
|
+ return length, projectMap
|
|
|
}
|
|
|
|
|
|
//清理三个月前的数据
|