package projecter

import (
	"fmt"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	util "app.yhyue.com/moapp/jybase/common"
	. "app.yhyue.com/moapp/jybase/date"
	elastic "app.yhyue.com/moapp/jybase/esv1"
	"app.yhyue.com/moapp/jybase/logger"
	. "app.yhyue.com/moapp/jybase/mongodb"
	"app.yhyue.com/moapp/jybase/redis"
	. "bp.jydev.jianyu360.cn/BaseService/pushpkg/p"
)

// Pusher abstracts the per-user push policy: filtering, dedup keys,
// daily-count keys, child-user resolution and the final MySQL save.
type Pusher interface {
	// Filter returns (shouldSkip, child uniques, parent user) for one unique key.
	Filter(unique string) (bool, []interface{}, *UserInfo)
	// SaveToMysql persists the matched infos; returns >0 on success.
	SaveToMysql(u *UserInfo, matchInfos *SortList, jobTime *time.Time) int64
	// PushInfoKey builds the redis dedup key for one pushed info id.
	PushInfoKey(u *UserInfo, infoId string) string
	// DayCountKey builds the redis key counting pushes for one day (ymd).
	DayCountKey(u *UserInfo, ymd string) string
	// ChildUserInfo resolves a child user from a unique; bool reports "is main account".
	ChildUserInfo(unique interface{}, parent *UserInfo) (*UserInfo, bool)
}

const (
	// projectQuery selects projects whose batch number (pici) falls in (gt, lte];
	// the trailing %s receives optional paging/_source clauses.
	projectQuery       = `{"query":{"filtered":{"filter":{"bool":{"must":[{"range":{"pici":{"gt":%d,"lte":%d}}}]}}}}%s}`
	projectPagingQuery = `,"_source":["_id","list"],"from":%d,"size":%d`
)

// Push drives the "related project" push pipeline: it loads freshly
// batched projects from ES, scans each user's stored project relations
// from Mongo, matches new announcements against them, and pushes within
// the user's daily quota (dedup and counting via redis).
type Push struct {
	Mgo_Bidding             *MongodbSim
	Mgo_Log                 *MongodbSim
	ProjectPushPoolSize     int   // max concurrent per-user push goroutines
	ProjectPublishTimeLimit int64 // seconds; only push infos published within this window
	Mgo_Log_DbName          string
	Mgo_Bidding_DbName      string
	Mgo_Bidding_Collection  string
	MaxPushSize             int // fallback daily quota when the user has none
	MaxLoadProjectCount     int // hard cap on projects loaded from ES
	ProjectPagingSize       int // ES page size
	Pici                    int64
	LoadProjectPoolSize     int // max concurrent ES page loaders
	ProjectRetainDay        int // relations older than this many days are purged
	ProjectBatch            int // Mongo batch size per loadPushspace_project call
	TestQuery               map[string]interface{}
	MaxRelationProject      int // max relations kept per user per batch
	Pushspace_project       string
	Unique                  string
	SelectField             map[string]interface{}
	ProjectCount            map[string]int
}

// Execute runs one full push round: load projects newer than the last
// batch, walk the user relation table in batches, match and push, then
// purge expired relations. It returns the unix timestamp used as the new
// batch watermark (pici).
func (p *Push) Execute(pusher Pusher) int64 {
	defer util.Catch()
	now := time.Now()
	todayEnd := time.Date(now.Year(), now.Month(), now.Day(), 23, 59, 59, 0, time.Local)
	unix := now.Unix()
	projectLength, projects := p.loadProject(unix)
	if projectLength > 0 {
		startId := ""
		batchIndex := 0
		// NOTE(review): this declaration appeared commented out in the source,
		// yet pushPool is used below — restored as live code (required to compile).
		pushPool := make(chan bool, p.ProjectPushPoolSize)
		pushWait := &sync.WaitGroup{}
		ymd := NowFormat(Date_yyyyMMdd)
		for {
			p.ProjectCount = map[string]int{}
			batchIndex++
			batchCount, datas := p.loadPushspace_project(batchIndex, &startId)
			logger.Info("开始第", batchIndex, "次关联项目匹配。。。")
			index := 0
			for k, v := range *datas {
				pushPool <- true
				pushWait.Add(1)
				// One goroutine per user (ue = unique key, list = that user's relations).
				go func(ue string, list []map[string]interface{}) {
					defer util.Catch()
					defer func() {
						<-pushPool
						pushWait.Done()
					}()
					isFilter, uniques, parent := pusher.Filter(ue)
					if isFilter {
						// User no longer eligible: drop all of their relations.
						p.Delete(ue)
						return
					}
					hasUpdate := false
					for _, unique := range uniques {
						sortList := &SortList{}
						length := 0
						i := len(list)
						oneHasUpdate := false
						userInfo, isMain := pusher.ChildUserInfo(unique, parent)
						// Walk relations newest-first; flush maxpublishtime updates
						// to Mongo in bulks of Mgo_BulkSize.
						for {
							var updateProject [][]map[string]interface{}
							for i > 0 {
								i--
								myInfoId := util.ObjToString(list[i]["infoid"])
								pi, ok := projects.Load(myInfoId)
								if !ok {
									continue
								}
								subtypes, _ := list[i]["subtypes"].([]interface{})
								maxPublishTime := util.Int64All(list[i]["maxpublishtime"])
								userSubtype := map[string]bool{}
								for _, t := range subtypes {
									userSubtype[util.ObjToString(t)] = true
								}
								project, _ := pi.(map[string]interface{})
								project_list, _ := project["list"].([]interface{})
								var publishTime int64
								for _, pl := range project_list {
									plm, _ := pl.(map[string]interface{})
									publishTime = util.Int64All(plm["publishtime"])
									infoId, _ := plm["infoid"].(string)
									if publishTime <= maxPublishTime {
										// Already seen up to this publish time.
										continue
									} else if unix-publishTime > p.ProjectPublishTimeLimit {
										// Published too long ago (outside the push window).
										continue
									} else if subtype, _ := plm["subtype"].(string); len(userSubtype) > 0 && !userSubtype[subtype] {
										continue
									} else if infoId == myInfoId {
										// Don't push the relation's own source info.
										continue
									}
									isExists, err := redis.Exists(Pushcache_2_a, pusher.PushInfoKey(userInfo, infoId))
									if err != nil {
										logger.Error("推送信息判重出错", err)
										continue
									} else if isExists {
										continue
									}
									length++
									// Eligible for push: copy the save fields.
									info := map[string]interface{}{}
									for _, field := range SaveBiddingField {
										if field == "_id" || plm[field] == nil {
											continue
										}
										info[field] = plm[field]
									}
									info["_id"] = infoId
									info["attachment_count"] = GetAttachmentCountById(p.Mgo_Bidding, p.Mgo_Bidding_DbName, p.Mgo_Bidding_Collection, infoId)
									*sortList = append(*sortList, &MatchInfo{Info: &info})
								}
								// Only the main account advances the stored watermark.
								if !hasUpdate && isMain && publishTime > maxPublishTime {
									updateProject = append(updateProject, []map[string]interface{}{
										{
											"_id": list[i]["_id"],
										},
										{
											"$set": map[string]interface{}{
												"maxpublishtime": publishTime,
											},
										},
									})
									if len(updateProject) == Mgo_BulkSize {
										break
									}
								}
							}
							if len(updateProject) > 0 {
								if p.Mgo_Log.UpSertMultiBulk(p.Pushspace_project, true, true, updateProject...) {
									oneHasUpdate = true
								}
							}
							if len(updateProject) < Mgo_BulkSize {
								// Last (possibly empty) bulk: the scan is complete.
								break
							}
						}
						if oneHasUpdate {
							hasUpdate = true
						}
						if length == 0 {
							continue
						}
						sort.Sort(*sortList)
						dayCountKey := pusher.DayCountKey(userInfo, ymd)
						dayCount, err := redis.GetNewInt(Pushcache_2_a, dayCountKey)
						maxPushSize := userInfo.SubSet.MaxPushSize
						if maxPushSize <= 0 {
							maxPushSize = p.MaxPushSize
						}
						if err != nil {
							logger.Error(unique, "redis中获取当天推送总数出错", err)
							continue
						} else if dayCount >= maxPushSize {
							// Daily quota exhausted.
							continue
						}
						needCount := maxPushSize - dayCount
						if length > needCount {
							length = needCount
						}
						// Reverse the first `length` sorted entries into saveArray.
						saveArray := make(SortList, length)
						for sk, sl := range *sortList {
							saveArray[length-1-sk] = sl
							if sk == length-1 {
								break
							}
						}
						jobTime := time.Now()
						if jobTime.After(todayEnd) {
							jobTime = todayEnd
						}
						if pushDate := pusher.SaveToMysql(userInfo, &saveArray, &jobTime); pushDate > 0 {
							// Persist daily count and per-info dedup marks for one day.
							redis.Put(Pushcache_2_a, dayCountKey, dayCount+length, OneDaySecond)
							for _, v := range saveArray {
								redis.Put(Pushcache_2_a, pusher.PushInfoKey(userInfo, util.ObjToString((*v.Info)["_id"])), 1, OneDaySecond)
							}
						}
					}
				}(k, v)
				index++
				if index%500 == 0 {
					logger.Info("第", batchIndex, "次关联项目匹配:", index)
				}
			}
			pushWait.Wait()
			logger.Info("第", batchIndex, "次关联项目匹配结束", index)
			if batchCount < p.ProjectBatch {
				// Short batch means the table is exhausted.
				p.ProjectCount = map[string]int{}
				break
			}
		}
	}
	p.Clear()
	return unix
}

// loadPushspace_project loads one batch of the pushspace_project table,
// newest _id first, grouped by the user unique key. It advances *startId
// for the next call and caps each user at MaxRelationProject relations
// (counted across batches via p.ProjectCount). Returns the number of
// rows read and the grouped data.
func (p *Push) loadPushspace_project(batchIndex int, startId *string) (int, *map[string][]map[string]interface{}) {
	query := map[string]interface{}{}
	if *startId != "" {
		query["_id"] = map[string]interface{}{
			"$lt": StringTOBsonId(*startId),
		}
	}
	if p.TestQuery != nil {
		for k, v := range p.TestQuery {
			query[k] = v
		}
	}
	// Debug aid kept from the original:
	// query = map[string]interface{}{
	// 	"_id": StringTOBsonId("628602aff1583c11b8cab58b"),
	// }
	logger.Info("开始加载第", batchIndex, "批关联项目", query)
	index := 0
	// At most MaxRelationProject newest relations are kept per user.
	datas := map[string][]map[string]interface{}{}
	sess := p.Mgo_Log.GetMgoConn()
	defer p.Mgo_Log.DestoryMongoConn(sess)
	it := sess.DB(p.Mgo_Log_DbName).C(p.Pushspace_project).Find(query).Select(p.SelectField).Sort("-_id").Iter()
	for temp := make(map[string]interface{}); it.Next(&temp); {
		unique, _ := temp[p.Unique].(string)
		*startId = BsonIdToSId(temp["_id"])
		if p.ProjectCount[unique] < p.MaxRelationProject {
			datas[unique] = append(datas[unique], temp)
		}
		p.ProjectCount[unique] = p.ProjectCount[unique] + 1
		index++
		if index == p.ProjectBatch {
			break
		}
		temp = make(map[string]interface{})
	}
	logger.Info("第", batchIndex, "批关联项目加载结束", index, *startId)
	return index, &datas
}

// loadProject loads all projects with pici in (p.Pici, unix] from ES into
// a sync.Map keyed by every member infoid (value = the whole project doc).
// Pages are fetched concurrently; up to 3 attempts are made until the
// loaded count is close enough to the expected count.
func (p *Push) loadProject(unix int64) (int64, *sync.Map) {
	defer util.Catch()
	projectMap := &sync.Map{}
	query := fmt.Sprintf(projectQuery, p.Pici, unix, "")
	//query = `{"query":{"filtered":{"filter":{"bool":{"must":[{"term": {"id": "5cef4c36a5cb26b9b7498a4c"}}]}}}}}`
	logger.Info("开始加载项目。。。", query)
	count := int(elastic.Count(Es_Projectset, Es_Projectset, query))
	logger.Info("需要加载项目", count, "条")
	if count == 0 {
		return 0, projectMap
	} else if count > p.MaxLoadProjectCount {
		count = p.MaxLoadProjectCount
		logger.Info("需要加载的项目数超过限制,只加载", p.MaxLoadProjectCount, "条")
	}
	onceSize := p.ProjectPagingSize // ES page size
	if onceSize > count {
		onceSize = count
	}
	totalPage := (count + onceSize - 1) / onceSize
	logger.Info("加载项目一共", totalPage, "页")
	loadPool := make(chan bool, p.LoadProjectPoolSize)
	loadWait := &sync.WaitGroup{}
	var length int64
	for t := 0; t < 3; t++ {
		// Fresh map AND fresh counter per attempt. The original code let
		// `length` accumulate across attempts, which made the completeness
		// check below pass spuriously after a failed round.
		projectMap = &sync.Map{}
		atomic.StoreInt64(&length, 0)
		for i := 0; i < totalPage; i++ {
			loadPool <- true
			loadWait.Add(1)
			go func(start int) {
				defer util.Catch()
				defer func() {
					<-loadPool
					loadWait.Done()
				}()
				size := onceSize
				if start == totalPage-1 && count%onceSize != 0 {
					size = count % onceSize
				}
				pagingQuery := fmt.Sprintf(projectQuery, p.Pici, unix, fmt.Sprintf(projectPagingQuery, start*onceSize, size))
				//pagingQuery = `{"query":{"filtered":{"filter":{"bool":{"must":[{"term": {"id": "5cef4c36a5cb26b9b7498a4c"}}]}}}}` + fmt.Sprintf(projectPagingQuery, start*onceSize, size) + `}`
				logger.Info("加载项目", "开始加载第", start+1, "页数据", pagingQuery)
				r := elastic.Get(Es_Projectset, Es_Projectset, pagingQuery)
				if r == nil {
					return
				}
				for _, v := range *r {
					// Fix: the original wrote `length = atomic.AddInt64(&length, 1)`;
					// the plain assignment races with other goroutines and defeats
					// the atomic — the add alone is correct.
					atomic.AddInt64(&length, 1)
					list, _ := v["list"].([]interface{})
					for _, vv := range list {
						vv_map, _ := vv.(map[string]interface{})
						infoid, _ := vv_map["infoid"].(string)
						projectMap.Store(infoid, v)
					}
				}
				logger.Info("加载项目", "第", start+1, "页数据加载完成")
			}(i)
		}
		loadWait.Wait()
		// Small tolerance: ES count and page contents may drift slightly.
		if int(atomic.LoadInt64(&length)) >= count-5 {
			break
		}
		logger.Info("加载项目", "第", t, "次加载数据完成,数据总数", length, ",由于数据量不够,重新加载")
	}
	logger.Info("加载项目结束。。。", length)
	return atomic.LoadInt64(&length), projectMap
}

// Clear removes relations older than ProjectRetainDay days.
func (p *Push) Clear() {
	logger.Info("开始清理过期的关联项目数据。。。")
	sess := p.Mgo_Log.GetMgoConn()
	defer p.Mgo_Log.DestoryMongoConn(sess)
	_, err := sess.DB(p.Mgo_Log_DbName).C(p.Pushspace_project).RemoveAll(map[string]interface{}{
		"createtime": map[string]interface{}{
			"$lt": time.Now().AddDate(0, 0, -p.ProjectRetainDay).Unix(),
		},
	})
	if err != nil {
		logger.Error("清理过期的关联项目数据错误", err)
	} else {
		logger.Info("清理过期的关联项目数据结束。。。")
	}
}

// Delete removes every stored relation for one user unique key.
func (p *Push) Delete(unique string) {
	sess := p.Mgo_Log.GetMgoConn()
	defer p.Mgo_Log.DestoryMongoConn(sess)
	sess.DB(p.Mgo_Log_DbName).C(p.Pushspace_project).RemoveAll(map[string]interface{}{p.Unique: unique})
}