package model import ( "app.yhyue.com/moapp/jybase/redis" "encoding/json" "fmt" "jyBXBase/rpc/bxbase" IC "jyBXBase/rpc/init" "jyBXBase/rpc/internal/config" "log" "sort" "strings" "time" MC "app.yhyue.com/moapp/jybase/common" ME "app.yhyue.com/moapp/jybase/encrypt" elastic "app.yhyue.com/moapp/jybase/es" "app.yhyue.com/moapp/jybase/mongodb" "app.yhyue.com/moapp/jybase/mysql" "github.com/zeromicro/go-zero/core/logx" "go.mongodb.org/mongo-driver/bson/primitive" ) const ( search_index = "bidding" search_type = "bidding" mongodb_fields = `{"_id":1,"area":1,"publishtime":1,"s_subscopeclass":1,"subtype":1,"title":1,"toptype":1,"type":1, "buyerclass":1,"budget":1,"bidamount":1,"s_winner":1,"bidopentime":1,"buyer":1,"projectname":1,"spidercode":1,"site":1}` query = `{"query":{"terms":{"_id":["%s"]}},"_source":["_id","area", "publishtime", "s_subscopeclass", "subtype", "title", "toptype", "type", "buyerclass","bidamount","budget","projectname","buyer","bidopentime","s_winner","filetext","spidercode","site"],"from":0,"size":%d}` multi_match = `{"multi_match": {"query": %s,"type": "phrase", "fields": ["title"]}}` query_bool_must = `{"terms":{"%s":[%s]}}` query_bool_must_and = `{"bool":{"must":[%s],"must_not":[%s]}}` search_field = `"_id","area", "publishtime", "s_subscopeclass", "subtype", "title", "toptype", "type", "buyerclass","bidamount","budget","projectname","buyer","bidopentime","s_winner","filetext","isValidFile","spidercode","site"` query_city_hkeys = `{"query":{"bool":{"must":[%s],"should":[%s],"minimum_should_match":%d}},"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {"title": {"fragment_size": 0,"number_of_fragments": 1}}},"_source":[` + search_field + `],"sort":[{"publishtime":"desc"},{"budget":"desc"}],"from":0,"size":%d}` Pushbidding = "global_common_data.dws_f_bid_baseinfo" Province = "province" StatusCache = iota // 0 -不存缓存 本来查的就是缓存的数据 StatusNoLogin // 1-未登录用户 StatusLoginUser // 2-用户自己的key StatusLogin // 3-登录用户最新标讯 ) type NewestInfo struct { TableName string UserId string MysqlDb *mysql.Mysql NewUserId int64 PushMysql *mysql.Mysql NewsLimitNum int64 // 最新标讯数量条数限制 IsEnt bool } var mysqlTables = map[string]string{ "f": "push.pushsubscribe", "v": "push.pushsubscribe", "m": "push.pushmember", "e": "push.pushentniche", } func GetRoleNewestInfoService(AppId, MgoUserId string, NewUserId, AccountId, EntId, EntUserId, PositionType, PositionId int64) (roleNewestInfo *NewestInfo, flag string) { powerCheck := IC.Middleground.PowerCheckCenter.Check(AppId, MgoUserId, NewUserId, AccountId, EntId, PositionType, PositionId) thisUserId := MC.If(PositionType == 1, MC.InterfaceToStr(EntUserId), MC.InterfaceToStr(NewUserId)).(string) thisNewUserId := MC.If(PositionType == 1, EntUserId, NewUserId).(int64) if powerCheck.Member.Status > 0 { // 大会员 flag = "m" } else if powerCheck.Entniche.Status > 0 && powerCheck.Entniche.PowerSource != 1 && powerCheck.Entniche.IsEntPower == 1 { // 商机管理 flag = "e" } else if powerCheck.Vip.Status > 0 { // 超级订阅 flag = "v" } else { // 普通用户 flag = "f" } thisUserType := MC.If(PositionType == 1, "e", flag).(string) return GetNewestInfo(thisUserId, thisUserType, thisNewUserId), flag } func GetNewestInfo(userId, userType string, newUserId int64) *NewestInfo { nt := &NewestInfo{ UserId: userId, TableName: mysqlTables[userType], MysqlDb: IC.BaseServiceMysql, NewUserId: newUserId, NewsLimitNum: IC.C.NewsLimitNum, } return nt } // GetPushHistoryCount 缓存code 配置最新的redis code;可以感觉redis内存大小 调节redis存储; // 根据GetPushHistoryCount 返回值 作为列表缓存key中一个参数 func (n *NewestInfo) GetPushHistoryCount() int64 { countKey := fmt.Sprintf(IC.C.NewsCache.Count.Key, n.TableName, n.UserId) if b := redis.GetInt("new", countKey); b != 0 { return int64(b) } countSql := "SELECT COUNT(1) FROM (SELECT 1 FROM %s WHERE userid = %d LIMIT 1) AS ph" //countSql := "select count(1) from %s a where a.userid=%d order by a.id desc" countSql = fmt.Sprintf(countSql, n.TableName, n.NewUserId) timeOut := IC.C.NewsCache.Count.Timeout c := n.MysqlDb.CountBySql(countSql) if c > 0 { timeOut = timeOut * 7 } else { c = -1 } redis.Put("new", countKey, c, timeOut) //过期时间走配置,比最新标讯列表缓存时间 要短1/10 尽量不超过两分钟 return c } // 判断当前用户是否有最新推送信息 func (n *NewestInfo) IsHasNewPushData(time int64) bool { return n.MysqlDb.CountBySql(fmt.Sprintf("SELECT COUNT(1) FROM (SELECT 1 FROM %s WHERE userid = %d AND date > %d LIMIT 1) AS ph", n.TableName, n.NewUserId, time)) > 0 } // GetPushHistory func (n *NewestInfo) GetPushHistory() (res []*bxbase.NewestList) { // findSQL := "select a.infoid,REPLACE(a.matchkeys,'+',' ') as matchkeys,a.attachment_count,a.budget,a.bidamount from %s a where a.userid=%d order by a.id desc limit ?" findSQL = fmt.Sprintf(findSQL, n.TableName, n.NewUserId) logx.Info(n.TableName, "-------", n.NewUserId, ",findSQL:", findSQL) list := n.MysqlDb.SelectBySql(findSQL, n.NewsLimitNum) if len(*list) > 0 && list != nil { m := map[string]bool{} es_ids := []string{} infos := map[string]*bxbase.NewestList{} for _, v := range *list { infoId := MC.ObjToString(v["infoid"]) if m[infoId] { continue } es_ids = append(es_ids, infoId) m[MC.ObjToString(v["infoid"])] = true // infos[infoId] = &bxbase.NewestList{ Id: ME.EncodeArticleId2ByCheck(MC.ObjToString(v["infoid"])), Matchkeys: strings.Split(MC.ObjToString(v["matchkeys"]), " "), Budget: MC.Int64All(v["budget"]), Bidamount: MC.Int64All(v["bidamount"]), FileExists: MC.Int64All(v["attachment_count"]) > 0, } } if len(es_ids) > 0 { list := elastic.Get(search_index, search_type, fmt.Sprintf(query, strings.Join(es_ids, `","`), len(es_ids))) if list != nil { for _, v := range *list { _id := MC.ObjToString(v["_id"]) bn := infos[_id] bn.Title = MC.ObjToString(v["title"]) bn.PublishTime = MC.Int64All(v["publishtime"]) bn.Subtype = MC.If(v["subtype"] != nil, MC.ObjToString(v["subtype"]), MC.ObjToString(v["toptype"])).(string) bn.Area = MC.If(MC.ObjToString(v["area"]) == "A", "全国", MC.ObjToString(v["area"])).(string) bn.Buyerclass = MC.ObjToString(v["buyerclass"]) bn.City = MC.ObjToString(v["city"]) bn.Industry = MC.If(MC.ObjToString(v["s_subscopeclass"]) != "", strings.Split(strings.Split(MC.ObjToString(v["s_subscopeclass"]), ",")[0], "_")[0], "").(string) bn.SpiderCode = MC.ObjToString(v["spidercode"]) bn.Site = MC.ObjToString(v["site"]) } } } //mongodb bidding mgo_ids := []primitive.ObjectID{} for _, v := range es_ids { if infos[v].Title == "" { mgo_ids = append(mgo_ids, mongodb.StringTOBsonId(v)) } } if len(mgo_ids) > 0 { list, ok := IC.MgoBidding.Find("bidding", map[string]interface{}{"_id": map[string]interface{}{"$in": mgo_ids}}, nil, mongodb_fields, false, -1, -1) if ok && *list != nil { for _, v := range *list { _id := mongodb.BsonIdToSId(v["_id"]) bn := infos[_id] bn.Title = MC.ObjToString(v["title"]) bn.PublishTime = MC.Int64All(v["publishtime"]) bn.Subtype = MC.If(v["subtype"] != nil, MC.ObjToString(v["subtype"]), MC.ObjToString(v["toptype"])).(string) bn.Area = MC.If(MC.ObjToString(v["area"]) == "A", "全国", MC.ObjToString(v["area"])).(string) bn.Buyerclass = MC.ObjToString(v["buyerclass"]) bn.City = MC.ObjToString(v["city"]) bn.Industry = MC.If(MC.ObjToString(v["s_subscopeclass"]) != "", strings.Split(strings.Split(MC.ObjToString(v["s_subscopeclass"]), ",")[0], "_")[0], "").(string) bn.SpiderCode = MC.ObjToString(v["spidercode"]) bn.Site = MC.ObjToString(v["site"]) } } } //mongodb bidding_back mgo_back_ids := []primitive.ObjectID{} for _, v := range mgo_ids { if infos[mongodb.BsonIdToSId(v)].Title == "" { mgo_back_ids = append(mgo_back_ids, v) } } if len(mgo_back_ids) > 0 { list, ok := IC.MgoBidding.Find("bidding_back", map[string]interface{}{"_id": map[string]interface{}{"$in": mgo_back_ids}}, nil, mongodb_fields, false, -1, -1) if ok && *list != nil { for _, v := range *list { _id := mongodb.BsonIdToSId(v["_id"]) bn := infos[_id] bn.Title = MC.ObjToString(v["title"]) bn.PublishTime = MC.Int64All(v["publishtime"]) bn.Subtype = MC.If(v["subtype"] != nil, MC.ObjToString(v["subtype"]), MC.ObjToString(v["toptype"])).(string) bn.Area = MC.If(MC.ObjToString(v["area"]) == "A", "全国", MC.ObjToString(v["area"])).(string) bn.Buyerclass = MC.ObjToString(v["buyerclass"]) bn.City = MC.ObjToString(v["city"]) bn.Industry = MC.If(MC.ObjToString(v["s_subscopeclass"]) != "", strings.Split(strings.Split(MC.ObjToString(v["s_subscopeclass"]), ",")[0], "_")[0], "").(string) bn.SpiderCode = MC.ObjToString(v["spidercode"]) bn.Site = MC.ObjToString(v["site"]) } } } // for _, v := range infos { res = append(res, v) } } return } // 根据定位或者搜索历史 查es func NewestQuery(city, keys, subtype string) (str string) { var musts, bools []string if keys != "" { for _, v := range strings.Split(keys, ",") { keys := strings.Split(v, " ") //历史搜索 空格划分 must_tmp := []string{} for _, key := range keys { must_tmp = append(must_tmp, fmt.Sprintf(multi_match, "\""+key+"\"")) } bools = append(bools, fmt.Sprintf(query_bool_must_and, strings.Join(must_tmp, ","), "")) } } if city != "" { musts = append(musts, fmt.Sprintf(query_bool_must, "city", `"`+city+`"`)) } //未登录首页推送数据限制 if subtype != "" { musts = append(musts, fmt.Sprintf(query_bool_must, "subtype", subtype)) } minimum_should_match := 0 if len(bools) > 0 { minimum_should_match = 1 } str = fmt.Sprintf(query_city_hkeys, strings.Join(musts, ","), strings.Join(bools, ","), minimum_should_match, IC.C.NewsLimitNum) logx.Info("str:", str) return } // es查询 func NewestES(doSearchStr string) (res []*bxbase.NewestList) { list := elastic.Get(search_index, search_type, doSearchStr) if list != nil && len(*list) > 0 { for _, v := range *list { _id := mongodb.BsonIdToSId(v["_id"]) isValidFile, _ := v["isValidFile"].(bool) res = append(res, &bxbase.NewestList{ Id: ME.EncodeArticleId2ByCheck(_id), Title: MC.ObjToString(v["title"]), Subtype: MC.If(v["subtype"] != nil, MC.ObjToString(v["subtype"]), MC.ObjToString(v["toptype"])).(string), Area: MC.If(MC.ObjToString(v["area"]) == "A", "全国", MC.ObjToString(v["area"])).(string), Buyerclass: MC.ObjToString(v["buyerclass"]), City: MC.ObjToString(v["city"]), Industry: MC.If(MC.ObjToString(v["s_subscopeclass"]) != "", strings.Split(strings.Split(MC.ObjToString(v["s_subscopeclass"]), ",")[0], "_")[0], "").(string), Budget: MC.Int64All(v["budget"]), Bidamount: MC.Int64All(v["bidamount"]), FileExists: isValidFile, //附件 PublishTime: MC.Int64All(v["publishtime"]), Site: MC.ObjToString(v["site"]), SpiderCode: MC.ObjToString(v["spidercode"]), }) } } return } // GetRedisKeyTimeout 获取缓存的key 和超时时间 func GetRedisKeyTimeout(status int, positionId int64) config.CacheConfig { var ( redisKey, cuKey string timeOut, cuTimeOut int ) switch status { case StatusNoLogin: redisKey = IC.C.NewsCache.NoLogin.Key redisKey = fmt.Sprintf(redisKey, time.Now().Year(), time.Now().Month(), time.Now().Day()) timeOut = IC.C.NewsCache.NoLogin.Timeout cuKey = fmt.Sprintf("%s_%s", redisKey, IC.C.NewsCache.NoLogin.CacheUpdateKey) cuTimeOut = IC.C.NewsCache.NoLogin.CacheUpdateTimeout case StatusLoginUser: redisKey = IC.C.NewsCache.LoginUser.Key // 登录用户使用的缓存key redisKey = fmt.Sprintf(redisKey, positionId, time.Now().Year(), time.Now().Month(), time.Now().Day()) timeOut = IC.C.NewsCache.LoginUser.Timeout cuKey = fmt.Sprintf("%s_%s", redisKey, IC.C.NewsCache.LoginUser.CacheUpdateKey) cuTimeOut = IC.C.NewsCache.LoginUser.CacheUpdateTimeout case StatusLogin: redisKey = IC.C.NewsCache.Login.Key // 登录用户使用的最新标讯缓存key redisKey = fmt.Sprintf(redisKey, time.Now().Year(), time.Now().Month(), time.Now().Day()) timeOut = IC.C.NewsCache.Login.Timeout cuKey = fmt.Sprintf("%s_%s", redisKey, IC.C.NewsCache.Login.CacheUpdateKey) cuTimeOut = IC.C.NewsCache.Login.CacheUpdateTimeout } return config.CacheConfig{ Key: redisKey, Timeout: timeOut, CacheUpdateKey: cuKey, CacheUpdateTimeout: cuTimeOut, } } // PutNewsCache 存缓存 func PutNewsCache(redisKey string, redisTimeout int, list []*bxbase.NewestList) { b, err := json.Marshal(list) if err != nil { log.Printf("保存缓存 序列化异常,data:%s,err:%s\n", list, err.Error()) return } if err = redis.PutBytes("new", redisKey, &b, redisTimeout); err != nil { log.Printf("保存缓存 redis 异常,key:%s,err:%s\n", redisKey, err.Error()) } } // GetNewsCache 取缓存 func GetNewsCache(cc config.CacheConfig) (list []*bxbase.NewestList, err error) { redisByte, err := redis.GetBytes("new", cc.Key) if err != nil || redisByte == nil || len(*redisByte) == 0 { return list, err } err = json.Unmarshal(*redisByte, &list) if err != nil { logx.Info(fmt.Sprintf("读取缓存 序列化异常,err:%s", err.Error())) return nil, err } return list, nil } type NewSet struct { Status int RedisKeyModel config.CacheConfig RedisStatus int Query string } // 排序 入缓存 // status : 0 -拿到的是缓存 不用再处理也不用存缓存 1-存到未登录用户 2-存到用户自己的key 3-存到登录用户最新标讯 func DataSortInRedis(r *bxbase.NewsetBiddingResp, status int, positionId int64) { //排序 sort.Slice(r.Data.List, func(i, j int) bool { return r.Data.List[i].PublishTime > r.Data.List[j].PublishTime }) redisKeyModel := GetRedisKeyTimeout(status, positionId) go PutNewsCache(redisKeyModel.Key, redisKeyModel.Timeout, r.Data.List) } // 延长缓存 // 获取最新数据 -- 延长缓存时间7天---待调整: 1、判断是否需要更新;2、判断订阅信息是否存在,如果存在是否是最新推送,3、订阅信息查推送缓存(只有个人版有缓存:pushcache_2_a ) func ExtendNewListCache(n *NewSet, in *bxbase.NewestBiddingReq, list []*bxbase.NewestList) { if n.RedisKeyModel.Key != "" { now := time.Now() if n.RedisKeyModel.CacheUpdateKey != "" { var ( res = &bxbase.NewsetBiddingResp{ Data: &bxbase.NewsetBidding{ List: []*bxbase.NewestList{}, }, } updateTime = redis.GetInt("new", n.RedisKeyModel.CacheUpdateKey) ) switch n.RedisStatus { case StatusLoginUser: //十五分钟内 更新过一次 不再更新 if int(now.Unix())-updateTime > n.RedisKeyModel.CacheUpdateTimeout { // 登录用户 roleNewestInfo, _ := GetRoleNewestInfoService(in.AppId, in.MgoUserId, in.NewUserId, in.AccountId, in.EntId, in.EntUserId, in.PositionType, in.PositionId) //当前用户有最新推送信息 lastPublishTime := list[len(list)-1].PublishTime if roleNewestInfo.IsHasNewPushData(lastPublishTime) { // 查推送 subscribeTime := time.Now() res.Data.List = roleNewestInfo.GetPushHistory() log.Println(in.PositionId, "获取订阅数据 存缓存 耗时:", time.Since(subscribeTime).Seconds()) } else { res.Data.List = list } } default: //十五分钟内 更新过一次 不再更新 if int(now.Unix())-updateTime > n.RedisKeyModel.CacheUpdateTimeout { res.Data.List = NewestES(n.Query) } } if len(res.Data.List) > 0 { //更新update time redis.Put("new", n.RedisKeyModel.CacheUpdateKey, now.Unix(), n.RedisKeyModel.CacheUpdateTimeout) go DataSortInRedis(res, n.RedisStatus, in.PositionId) } } ////剩余时间 在 IC.C.NewsTimeOut 之内 //if ttl := redis.GetTTL("new", n.RedisKeyModel.Key); ttl-IC.C.NewsTimeOut < 0 { // //} } }