wcj 5 years ago
parent
commit
0ec2d42b0a
2 changed files with 309 additions and 8 deletions
  1. 301 0
      common/src/qfw/util/jy/entnichepush.go
  2. 8 8
      common/src/qfw/util/jy/subscribepush.go

+ 301 - 0
common/src/qfw/util/jy/entnichepush.go

@@ -0,0 +1,301 @@
+package jy
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	. "qfw/util"
+	"qfw/util/elastic"
+	mg "qfw/util/mongodb"
+	"qfw/util/mysql"
+	"qfw/util/redis"
+	"strings"
+	"time"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+var EntnichePush = &entnichePush{}
+
+type entnichePush struct {
+}
+
+//从pushcache_2_a中取
+func (e *entnichePush) GetTodayCache(userId int) (*SubPush, error) {
+	pc_a, err := redis.GetNewBytes("pushcache_2_b", e.todayKey(userId))
+	if err != nil {
+		return nil, err
+	}
+	if pc_a == nil {
+		return nil, nil
+	}
+	var p *SubPush
+	if err := json.Unmarshal(*pc_a, &p); err != nil {
+		return nil, err
+	}
+	return p, nil
+}
+
+//往pushcache_2_a中放
+func (e *entnichePush) PutTodayCache(userId int, pc_a *SubPush) {
+	redis.Put("pushcache_2_b", e.todayKey(userId), pc_a, threeDay)
+}
+
+//获取redis key
+func (e *entnichePush) todayKey(userId int) string {
+	return fmt.Sprintf("entnichepush_%d", userId)
+}
+
+func (e *entnichePush) Datas(MQFW mg.MongodbSim, PushMysql *mysql.Mysql, userId int, pageNum int, selectTime, area string) (hasNextPage bool, result []*SubPushList) {
+	if userId <= 0 {
+		return
+	}
+	if pageNum < 1 {
+		pageNum = 1
+	}
+	now := NowFormat(Date_Short_Layout)
+	start := (pageNum - 1) * pageSize
+	end := start + pageSize
+	if now == selectTime && area == "" {
+		subPush, err := e.GetTodayCache(userId)
+		if err != nil {
+			log.Println(userId, "GetTodayCache Error", err)
+		}
+		if err != nil || subPush == nil || subPush.Date != now || len(subPush.Datas) == 0 {
+			list := e.getDatasFromMysql(MQFW, PushMysql, userId, pageNum, pageSize, selectTime, area, false)
+			subPush = &SubPush{
+				Date:  now,
+				Datas: list,
+			}
+			e.PutTodayCache(userId, subPush)
+		}
+		length := len(subPush.Datas)
+		if end > length {
+			end = length
+		}
+		if start < length {
+			result = subPush.Datas[start:end]
+		}
+	} else if selectTime == "" && area == "" && pageNum <= 5 {
+		allCache, err := e.GetAllCache(userId)
+		if err != nil {
+			log.Println(userId, "GetAllCache Error", err)
+		}
+		if err != nil || allCache == nil || len(allCache) == 0 {
+			allCache = e.getDatasFromMysql(MQFW, PushMysql, userId, 1, AllSubPushCacheSize, selectTime, area, true)
+			e.PutAllCache(userId, allCache)
+		}
+		length := len(allCache)
+		if end > length {
+			end = length
+		}
+		if start < length {
+			result = allCache[start:end]
+		}
+	} else {
+		result = e.getDatasFromMysql(MQFW, PushMysql, userId, pageNum, pageSize, selectTime, area, true)
+	}
+	if result == nil {
+		result = []*SubPushList{}
+	}
+	hasNextPage = len(result) >= pageSize
+	return
+}
+func (e *entnichePush) getDatasFromMysql(MQFW mg.MongodbSim, PushMysql *mysql.Mysql, userId int, pageNum, myPageSize int, selectTime, area string, isLimit bool) (result []*SubPushList) {
+	findSQL := "select id,date,infoid,isvisit,matchkeys,type,1 as isvip from pushsubscribe where userid = " + fmt.Sprint(userId)
+	findStr := ""
+	if selectTime != "" {
+		startTime := selectTime + " 00:00:00"
+		endTime := selectTime + " 23:59:59"
+		st, _ := time.ParseInLocation("2006-01-02 15:04:05", startTime, time.Local)
+		et, _ := time.ParseInLocation("2006-01-02 15:04:05", endTime, time.Local)
+		findStr += " and date < " + fmt.Sprint(et.Unix()) + " and date >= " + fmt.Sprint(st.Unix())
+	}
+	if area != "" {
+		findStr += " and city in ("
+		var _area = ""
+		for _, v := range strings.Split(area, ",") {
+			if v == "全部" {
+				continue
+			}
+			if _area != "" {
+				_area += ","
+			}
+			_area += fmt.Sprint(PushMapping.City[v])
+		}
+		findStr += _area + ")"
+	}
+	start := (pageNum - 1) * myPageSize
+	findStr += " order by id desc"
+	if isLimit {
+		findStr += " limit " + fmt.Sprint(start) + "," + fmt.Sprint(myPageSize)
+	}
+	findSQL = findSQL + findStr
+	//	log.Println("findsql:", findSQL)
+	list := PushMysql.SelectBySql(findSQL)
+	if len(*list) > 0 {
+		pushCas := e.GetJyPushs(*list)
+		result = e.GetInfoByIds(MQFW, pushCas)
+	} else {
+		result = []*SubPushList{}
+	}
+	return
+}
+
+//获取历史推送
+func (e *entnichePush) GetJyPushs(datas []map[string]interface{}) (pushCas []*PushCa) {
+	pushCas = []*PushCa{}
+	for _, v := range datas {
+		pushCas = append(pushCas, &PushCa{
+			Date:   Int64All(v["date"]),
+			InfoId: ObjToString(v["infoid"]),
+			Visit:  IntAll(v["isvisit"]),
+			Index:  Int64All(v["id"]),
+			Keys:   strings.Split(ObjToString(v["matchkeys"]), " "),
+			Type:   IntAll(v["type"]),
+			Isvip:  IntAll(v["isvip"]),
+		})
+	}
+	return
+}
+
+//根据id取内容
+func (e *entnichePush) GetInfoByIds(MQFW mg.MongodbSim, pushCas []*PushCa) []*SubPushList {
+	array := make([]*SubPushList, len(pushCas))
+	if len(pushCas) == 0 {
+		return array
+	}
+	m := map[string]bool{}
+	ids := []string{}
+	for _, v := range pushCas {
+		if m[v.InfoId] {
+			continue
+		}
+		m[v.InfoId] = true
+		ids = append(ids, v.InfoId)
+	}
+	infos := map[string]map[string]interface{}{}
+	//redis
+
+	es_ids := []string{}
+	for _, v := range ids {
+		info_i := redis.Get("pushcache_1", fmt.Sprintf("info_%d", v))
+		if info_i != nil {
+			info_m, _ := info_i.(map[string]interface{})
+			info_m["_id"] = v
+			infos[v] = info_m
+		} else {
+			es_ids = append(es_ids, v)
+		}
+	}
+	//	log.Println(es_ids)
+	//elasticsearch
+	if len(es_ids) > 0 {
+		list := elastic.Get("bidding", "bidding", fmt.Sprintf(query, strings.Join(es_ids, `","`), len(es_ids)))
+		if list != nil {
+			for _, v := range *list {
+				_id := ObjToString(v["_id"])
+				infos[_id] = v
+			}
+		}
+	}
+	//mongodb bidding
+	mgo_ids := []bson.ObjectId{}
+	for _, v := range es_ids {
+		if infos[v] == nil {
+			mgo_ids = append(mgo_ids, bson.ObjectIdHex(v))
+		}
+	}
+	if len(mgo_ids) > 0 {
+		list, ok := MQFW.Find("bidding", map[string]interface{}{"_id": map[string]interface{}{"$in": mgo_ids}}, nil, mongodb_fields, false, -1, -1)
+		if ok && *list != nil {
+			for _, v := range *list {
+				_id := BsonIdToSId(v["_id"])
+				v["_id"] = _id
+				infos[_id] = v
+			}
+		}
+	}
+	//mongodb bidding_back
+	mgo_back_ids := []bson.ObjectId{}
+	for _, v := range mgo_ids {
+		if infos[BsonIdToSId(v)] == nil {
+			mgo_back_ids = append(mgo_back_ids, v)
+		}
+	}
+	if len(mgo_back_ids) > 0 {
+		list, ok := MQFW.Find("bidding_back", map[string]interface{}{"_id": map[string]interface{}{"$in": mgo_back_ids}}, nil, mongodb_fields, false, -1, -1)
+		if ok && *list != nil {
+			for _, v := range *list {
+				_id := BsonIdToSId(v["_id"])
+				v["_id"] = _id
+				infos[_id] = v
+			}
+		}
+	}
+	//
+	for k, v := range pushCas {
+		info := infos[v.InfoId]
+		if info == nil {
+			info = map[string]interface{}{}
+		}
+		array[k] = SubscribePush.InfoFormat(v, &info)
+	}
+	return array
+}
+
+func (e *entnichePush) Visit(PushMysql *mysql.Mysql, userId, id string) {
+	if id == "" || userId == "" {
+		return
+	}
+	PushMysql.UpdateOrDeleteBySql("update pushsubscribe set isvisit=1 where userid='" + userId + "' and id=" + id)
+	todaySubPush, err := SubscribePush.GetTodayCache(userId)
+	if err == nil && todaySubPush != nil {
+		for _, v := range todaySubPush.Datas {
+			if v.Ca_index == Int64All(id) {
+				v.Ca_isvisit = 1
+				break
+			}
+		}
+		SubscribePush.PutTodayCache(userId, todaySubPush)
+	}
+	//
+	allSubPush, err := SubscribePush.GetAllCache(userId)
+	if err == nil && allSubPush != nil {
+		for _, v := range allSubPush {
+			if v.Ca_index == Int64All(id) {
+				v.Ca_isvisit = 1
+				break
+			}
+		}
+		SubscribePush.PutAllCache(userId, allSubPush)
+	}
+}
+
+//查看全部列表缓存
+func (e *entnichePush) allKey(userId int) string {
+	return fmt.Sprintf("all_entnichepush_%d", userId)
+}
+
+func (e *entnichePush) PutAllCache(userId int, datas []*SubPushList) {
+	redis.Put("pushcache_2_a", e.allKey(userId), datas, threeDay)
+}
+
+func (e *entnichePush) GetAllCache(userId int) ([]*SubPushList, error) {
+	return e.GetCache("pushcache_2_a", e.allKey(userId))
+}
+
+func (e *entnichePush) GetCache(code, key string) ([]*SubPushList, error) {
+	pc_a, err := redis.GetNewBytes(code, key)
+	if err != nil {
+		return nil, err
+	}
+	if pc_a == nil {
+		return nil, nil
+	}
+	var p []*SubPushList
+	if err := json.Unmarshal(*pc_a, &p); err != nil {
+		return nil, err
+	}
+	return p, nil
+}

+ 8 - 8
common/src/qfw/util/jy/subscribepush.go

@@ -154,7 +154,7 @@ func (s *subscribePush) Datas(MQFW mg.MongodbSim, PushMysql *mysql.Mysql, userId
 	start := (pageNum - 1) * pageSize
 	end := start + pageSize
 	if now == selectTime && area == "" {
-		subPush, err := SubscribePush.GetTodayCache(userId)
+		subPush, err := s.GetTodayCache(userId)
 		if err != nil {
 			log.Println(userId, "GetTodayCache Error", err)
 		}
@@ -164,7 +164,7 @@ func (s *subscribePush) Datas(MQFW mg.MongodbSim, PushMysql *mysql.Mysql, userId
 				Date:  now,
 				Datas: list,
 			}
-			SubscribePush.PutTodayCache(userId, subPush)
+			s.PutTodayCache(userId, subPush)
 		}
 		length := len(subPush.Datas)
 		if end > length {
@@ -174,13 +174,13 @@ func (s *subscribePush) Datas(MQFW mg.MongodbSim, PushMysql *mysql.Mysql, userId
 			result = subPush.Datas[start:end]
 		}
 	} else if selectTime == "" && area == "" && pageNum <= 5 {
-		allCache, err := SubscribePush.GetAllCache(userId)
+		allCache, err := s.GetAllCache(userId)
 		if err != nil {
 			log.Println(userId, "GetAllCache Error", err)
 		}
 		if err != nil || allCache == nil || len(allCache) == 0 {
 			allCache = s.getDatasFromMysql(MQFW, PushMysql, userId, 1, AllSubPushCacheSize, selectTime, area, true)
-			SubscribePush.PutAllCache(userId, allCache)
+			s.PutAllCache(userId, allCache)
 		}
 		length := len(allCache)
 		if end > length {
@@ -441,7 +441,7 @@ func (s *subscribePush) Visit(PushMysql *mysql.Mysql, userId, id string) {
 		return
 	}
 	PushMysql.UpdateOrDeleteBySql("update pushsubscribe set isvisit=1 where userid='" + userId + "' and id=" + id)
-	todaySubPush, err := SubscribePush.GetTodayCache(userId)
+	todaySubPush, err := s.GetTodayCache(userId)
 	if err == nil && todaySubPush != nil {
 		for _, v := range todaySubPush.Datas {
 			if v.Ca_index == Int64All(id) {
@@ -449,10 +449,10 @@ func (s *subscribePush) Visit(PushMysql *mysql.Mysql, userId, id string) {
 				break
 			}
 		}
-		SubscribePush.PutTodayCache(userId, todaySubPush)
+		s.PutTodayCache(userId, todaySubPush)
 	}
 	//
-	allSubPush, err := SubscribePush.GetAllCache(userId)
+	allSubPush, err := s.GetAllCache(userId)
 	if err == nil && allSubPush != nil {
 		for _, v := range allSubPush {
 			if v.Ca_index == Int64All(id) {
@@ -460,7 +460,7 @@ func (s *subscribePush) Visit(PushMysql *mysql.Mysql, userId, id string) {
 				break
 			}
 		}
-		SubscribePush.PutAllCache(userId, allSubPush)
+		s.PutAllCache(userId, allSubPush)
 	}
 }