123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 |
- package projecter
- import (
- "fmt"
- "sort"
- "sync"
- "time"
- util "app.yhyue.com/moapp/jybase/common"
- . "app.yhyue.com/moapp/jybase/date"
- elastic "app.yhyue.com/moapp/jybase/es"
- "app.yhyue.com/moapp/jybase/logger"
- . "app.yhyue.com/moapp/jybase/mongodb"
- "app.yhyue.com/moapp/jybase/redis"
- . "bp.jydev.jianyu360.cn/BaseService/pushpkg/p"
- )
- type Pusher interface {
- Filter(unique string) (bool, []interface{}, *UserInfo)
- SaveToMysql(u *UserInfo, matchInfos *SortList, jobTime *time.Time) int64
- PushInfoKey(u *UserInfo, infoId string) string
- DayCountKey(u *UserInfo, ymd string) string
- ChildUserInfo(unique interface{}, parent *UserInfo) (*UserInfo, bool)
- }
const (
	// projectQuery is the Elasticsearch query template used by loadProject.
	// Its verbs are, in order: the inclusive lower bound of "pici", the
	// exclusive upper bound of "pici", and an optional suffix appended
	// after the "query" object (empty, or projectPagingQuery).
	projectQuery = `{"query":{"bool":{"must":{"range":{"pici":{"gte":%d,"lt":%d}}}}}%s}`

	// projectPagingQuery restricts the returned fields to "_id" and "list".
	projectPagingQuery = `,"_source":["_id","list"]`
)
- type Push struct {
- Mgo_Bidding *MongodbSim
- Mgo_Log *MongodbSim
- ProjectPushPoolSize int
- ProjectPublishTimeLimit int64
- Mgo_Log_DbName string
- Mgo_Bidding_DbName string
- Mgo_Bidding_Collection string
- MaxPushSize int
- MaxLoadProjectCount int
- Pici int64
- ProjectRetainDay int
- ProjectBatch int
- TestQuery map[string]interface{}
- MaxRelationProject int
- Pushspace_project string
- Unique string
- SelectField map[string]interface{}
- ProjectCount map[string]int
- }
- func (p *Push) Execute(pusher Pusher) int64 {
- defer util.Catch()
- now := time.Now()
- todayEnd := time.Date(now.Year(), now.Month(), now.Day(), 23, 59, 59, 0, time.Local)
- unix := now.Unix()
- projectLength, projects := p.loadProject(unix)
- if projectLength > 0 {
- startId := ""
- batchIndex := 0
- //
- pushPool := make(chan bool, p.ProjectPushPoolSize)
- pushWait := &sync.WaitGroup{}
- ymd := NowFormat(Date_yyyyMMdd)
- for {
- p.ProjectCount = map[string]int{}
- batchIndex++
- batchCount, datas := p.loadPushspace_project(batchIndex, &startId)
- logger.Info("开始第", batchIndex, "次关联项目匹配。。。")
- index := 0
- for k, v := range *datas {
- pushPool <- true
- pushWait.Add(1)
- go func(ue string, list []map[string]interface{}) {
- defer util.Catch()
- defer func() {
- <-pushPool
- pushWait.Done()
- }()
- isFilter, uniques, parent := pusher.Filter(ue)
- if isFilter {
- p.Delete(ue)
- return
- }
- hasUpdate := false
- for _, unique := range uniques {
- sortList := &SortList{}
- length := 0
- i := len(list)
- oneHasUpdate := false
- userInfo, isMain := pusher.ChildUserInfo(unique, parent)
- for {
- var updateProject [][]map[string]interface{}
- for i > 0 {
- i--
- myInfoId := util.ObjToString(list[i]["infoid"])
- pi, ok := projects.Load(myInfoId)
- if !ok {
- continue
- }
- subtypes, _ := list[i]["subtypes"].([]interface{})
- maxPublishTime := util.Int64All(list[i]["maxpublishtime"])
- userSubtype := map[string]bool{}
- for _, t := range subtypes {
- userSubtype[util.ObjToString(t)] = true
- }
- //
- project, _ := pi.(map[string]interface{})
- project_list, _ := project["list"].([]interface{})
- var publishTime int64
- for _, pl := range project_list {
- plm, _ := pl.(map[string]interface{})
- publishTime = util.Int64All(plm["publishtime"])
- infoId, _ := plm["infoid"].(string)
- if publishTime <= maxPublishTime {
- continue
- } else if unix-publishTime > p.ProjectPublishTimeLimit { //发布时间7天之内
- continue
- } else if subtype, _ := plm["subtype"].(string); len(userSubtype) > 0 && !userSubtype[subtype] {
- continue
- } else if infoId == myInfoId {
- continue
- }
- isExists, err := redis.Exists(Pushcache_2_a, pusher.PushInfoKey(userInfo, infoId))
- if err != nil {
- logger.Error("推送信息判重出错", err)
- continue
- } else if isExists {
- continue
- }
- length++
- //满足条件,可以推送
- info := map[string]interface{}{}
- for _, field := range SaveBiddingField {
- if field == "_id" || plm[field] == nil {
- continue
- }
- info[field] = plm[field]
- }
- info["_id"] = infoId
- info["attachment_count"] = GetAttachmentCountById(p.Mgo_Bidding, p.Mgo_Bidding_DbName, p.Mgo_Bidding_Collection, infoId)
- *sortList = append(*sortList, &MatchInfo{Info: info})
- }
- if !hasUpdate && isMain && publishTime > maxPublishTime {
- updateProject = append(updateProject, []map[string]interface{}{
- map[string]interface{}{
- "_id": list[i]["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "maxpublishtime": publishTime,
- },
- },
- })
- if len(updateProject) == Mgo_BulkSize {
- break
- }
- }
- }
- if len(updateProject) > 0 {
- if p.Mgo_Log.UpSertMultiBulk(p.Pushspace_project, true, true, updateProject...) {
- oneHasUpdate = true
- }
- }
- if len(updateProject) < Mgo_BulkSize {
- break
- }
- }
- if oneHasUpdate {
- hasUpdate = true
- }
- if length == 0 {
- continue
- }
- sort.Sort(*sortList)
- dayCountKey := pusher.DayCountKey(userInfo, ymd)
- dayCount, err := redis.GetNewInt(Pushcache_2_a, dayCountKey)
- maxPushSize := userInfo.SubSet.MaxPushSize
- if maxPushSize <= 0 {
- maxPushSize = p.MaxPushSize
- }
- if err != nil {
- logger.Error(unique, "redis中获取当天推送总数出错", err)
- continue
- } else if dayCount >= maxPushSize {
- continue
- }
- needCount := maxPushSize - dayCount
- if length > needCount {
- length = needCount
- }
- saveArray := make(SortList, length)
- for sk, sl := range *sortList {
- saveArray[length-1-sk] = sl
- if sk == length-1 {
- break
- }
- }
- jobTime := time.Now()
- if jobTime.After(todayEnd) {
- jobTime = todayEnd
- }
- if pushDate := pusher.SaveToMysql(userInfo, &saveArray, &jobTime); pushDate > 0 {
- redis.Put(Pushcache_2_a, dayCountKey, dayCount+length, OneDaySecond)
- for _, v := range saveArray {
- redis.Put(Pushcache_2_a, pusher.PushInfoKey(userInfo, util.ObjToString(v.Info["_id"])), 1, OneDaySecond)
- }
- }
- }
- }(k, v)
- index++
- if index%500 == 0 {
- logger.Info("第", batchIndex, "次关联项目匹配:", index)
- }
- }
- pushWait.Wait()
- logger.Info("第", batchIndex, "次关联项目匹配结束", index)
- if batchCount < p.ProjectBatch {
- p.ProjectCount = map[string]int{}
- break
- }
- }
- }
- p.Clear()
- return unix
- }
- //分批次加载pushspace_project表数据
- func (p *Push) loadPushspace_project(batchIndex int, startId *string) (int, *map[string][]map[string]interface{}) {
- query := map[string]interface{}{}
- if *startId != "" {
- query["_id"] = map[string]interface{}{
- "$lt": StringTOBsonId(*startId),
- }
- }
- if p.TestQuery != nil {
- for k, v := range p.TestQuery {
- query[k] = v
- }
- }
- // query = map[string]interface{}{
- // "_id": StringTOBsonId("628602aff1583c11b8cab58b"),
- // }
- logger.Info("开始加载第", batchIndex, "批关联项目", query)
- index := 0
- //一个用户最多加载最新的一万个关联项目
- datas := map[string][]map[string]interface{}{}
- sess := p.Mgo_Log.GetMgoConn()
- defer p.Mgo_Log.DestoryMongoConn(sess)
- it := sess.DB(p.Mgo_Log_DbName).C(p.Pushspace_project).Find(query).Select(p.SelectField).Sort("-_id").Iter()
- for temp := make(map[string]interface{}); it.Next(&temp); {
- unique, _ := temp[p.Unique].(string)
- *startId = BsonIdToSId(temp["_id"])
- if p.ProjectCount[unique] < p.MaxRelationProject {
- datas[unique] = append(datas[unique], temp)
- }
- p.ProjectCount[unique] = p.ProjectCount[unique] + 1
- index++
- if index == p.ProjectBatch {
- break
- }
- temp = make(map[string]interface{})
- }
- logger.Info("第", batchIndex, "批关联项目加载结束", index, *startId)
- return index, &datas
- }
- //加载项目数据
- func (p *Push) loadProject(unix int64) (int64, *sync.Map) {
- defer util.Catch()
- projectMap := &sync.Map{}
- query := fmt.Sprintf(projectQuery, p.Pici, unix, projectPagingQuery)
- //query = `{"query":{"filtered":{"filter":{"bool":{"must":[{"term": {"id": "5cef4c36a5cb26b9b7498a4c"}}]}}}}}`
- logger.Info("开始加载项目。。。", query)
- count := int(elastic.Count(Es_Projectset, Es_Projectset, fmt.Sprintf(projectQuery, p.Pici, unix, "")))
- logger.Info("需要加载项目", count, "条")
- if count == 0 {
- return 0, projectMap
- } else if count > p.MaxLoadProjectCount {
- count = p.MaxLoadProjectCount
- logger.Info("需要加载的项目数超过限制,只加载", p.MaxLoadProjectCount, "条")
- }
- var length int64
- for t := 1; t <= 5; t++ {
- length = 0
- projectMap = &sync.Map{}
- elastic.VarEs.(*elastic.EsV7).Scroll(Es_Projectset, "5m", query, func(fv map[string]interface{}) bool {
- if int(length) > count {
- return false
- }
- list, _ := fv["list"].([]interface{})
- for _, vv := range list {
- vv_map, _ := vv.(map[string]interface{})
- infoid, _ := vv_map["infoid"].(string)
- projectMap.Store(infoid, fv)
- }
- length++
- return true
- })
- if int(length) >= count {
- break
- }
- logger.Info("加载项目", "第", t, "次加载数据完成,数据总数", length, count, ",由于数据量不够,重新加载")
- }
- logger.Info("加载项目结束。。。", length)
- return length, projectMap
- }
- //清理三个月前的数据
- func (p *Push) Clear() {
- logger.Info("开始清理过期的关联项目数据。。。")
- sess := p.Mgo_Log.GetMgoConn()
- defer p.Mgo_Log.DestoryMongoConn(sess)
- _, err := sess.DB(p.Mgo_Log_DbName).C(p.Pushspace_project).RemoveAll(map[string]interface{}{
- "createtime": map[string]interface{}{
- "$lt": time.Now().AddDate(0, 0, -p.ProjectRetainDay).Unix(),
- },
- })
- if err != nil {
- logger.Error("清理过期的关联项目数据错误", err)
- } else {
- logger.Info("清理过期的关联项目数据结束。。。")
- }
- }
- //删除
- func (p *Push) Delete(unique string) {
- sess := p.Mgo_Log.GetMgoConn()
- defer p.Mgo_Log.DestoryMongoConn(sess)
- sess.DB(p.Mgo_Log_DbName).C(p.Pushspace_project).RemoveAll(map[string]interface{}{p.Unique: unique})
- }
|