|
@@ -6,7 +6,7 @@ import (
|
|
"fmt"
|
|
"fmt"
|
|
"log"
|
|
"log"
|
|
"qfw/util"
|
|
"qfw/util"
|
|
- "qfw/util/elastic"
|
|
|
|
|
|
+ elastic "qfw/util/elastic_v5"
|
|
qrpc "qfw/util/rpc"
|
|
qrpc "qfw/util/rpc"
|
|
"sort"
|
|
"sort"
|
|
"strconv"
|
|
"strconv"
|
|
@@ -20,14 +20,33 @@ import (
|
|
)
|
|
)
|
|
|
|
|
|
var (
|
|
var (
|
|
- MaxId = `{"query":{"filtered":{"filter":{"bool":{"must":{"range":{"id":{"gt":"%s"}}}}}}},"_source":["_id","comeintime"],"sort":{"id":"desc"},"from":0,"size":1}`
|
|
|
|
- Query = `{ "query": { "bool": { "must": [%s], "should": [%s], "minimum_should_match": 1 } }, "_source": [ "_id","title","publishtime","area","type","toptype","subtype","projectname","projectcode","href","infoformat" ], "sort": [ { "publishtime": "desc" } ], "from": 0, "size": 50 }`
|
|
|
|
- IDRange = `{"range":{"id":{"gt":"%s","lte":"%s"}}}`
|
|
|
|
- TERM = `{"term":{"%s":"%s"}}`
|
|
|
|
- DB = "bidding"
|
|
|
|
- KEEPCOUNT = 100
|
|
|
|
|
|
+ MaxId = `{"query":{"filtered":{"filter":{"bool":{"must":{"range":{"id":{"gt":"%s"}}}}}}},"_source":["_id","comeintime"],"sort":{"id":"desc"},"from":0,"size":1}`
|
|
|
|
+ Query = `{ "query": { "bool": { "must": [%s], "should": [%s], "minimum_should_match": 1 } }, "_source": [ %s ], "sort": [ %s ], "from": 0, "size": 50 }`
|
|
|
|
+ FilterQuery = `{"query": {"filtered": {"filter": {"bool": {"must": [%s]}}}}}`
|
|
|
|
+ IDRange = `{"range":{"id":{"gt":"%s","lte":"%s"}}}`
|
|
|
|
+ TERM = `{"term":{"%s":"%s"}}`
|
|
|
|
+ SortQuery = `{"publishtime":"desc"}`
|
|
|
|
+ ShowField = `"_id","title","publishtime","area","type","toptype","subtype","projectname","projectcode","href","infoformat"`
|
|
|
|
+ DB = "bidding"
|
|
|
|
+ KEEPCOUNT = 100
|
|
|
|
+ MaxSearch = 10000 //缓存中总共加载这么多条
|
|
|
|
+ OnceMax = 400 //ES一次查询这么多条
|
|
|
|
+ searchpool = make(chan bool, 8)
|
|
|
|
+ pushpool = make(chan bool, 50)
|
|
|
|
+ findpool = make(chan bool, 10)
|
|
|
|
+ eachpool = make(chan bool, 100)
|
|
|
|
+ Pushlock = sync.Mutex{}
|
|
|
|
+ searchWaitGroup = &sync.WaitGroup{}
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+type PushUser struct {
|
|
|
|
+ Id interface{}
|
|
|
|
+ OpenId string
|
|
|
|
+ ProjectCode string
|
|
|
|
+ ProjectName string
|
|
|
|
+ Title string
|
|
|
|
+}
|
|
|
|
+
|
|
//开始推送
|
|
//开始推送
|
|
func pushByEs(_id string) bool {
|
|
func pushByEs(_id string) bool {
|
|
defer util.Catch()
|
|
defer util.Catch()
|
|
@@ -40,7 +59,6 @@ func pushByEs(_id string) bool {
|
|
log.Println("未查找到数据...", fmt.Sprintf(MaxId, _id))
|
|
log.Println("未查找到数据...", fmt.Sprintf(MaxId, _id))
|
|
return false
|
|
return false
|
|
}
|
|
}
|
|
- idrange := fmt.Sprintf(IDRange, _id, lastid)
|
|
|
|
sess := tools.MQFW.GetMgoConn()
|
|
sess := tools.MQFW.GetMgoConn()
|
|
defer tools.MQFW.DestoryMongoConn(sess)
|
|
defer tools.MQFW.DestoryMongoConn(sess)
|
|
cur := sess.DB(tools.MQFW.DbName).C(FOLLOW_COLLECTION).Find(&map[string]interface{}{
|
|
cur := sess.DB(tools.MQFW.DbName).C(FOLLOW_COLLECTION).Find(&map[string]interface{}{
|
|
@@ -58,24 +76,129 @@ func pushByEs(_id string) bool {
|
|
"_id": 1,
|
|
"_id": 1,
|
|
}).Iter()
|
|
}).Iter()
|
|
j := 0
|
|
j := 0
|
|
|
|
+ users := []*PushUser{}
|
|
for tmp := make(map[string]interface{}); cur.Next(tmp); j++ {
|
|
for tmp := make(map[string]interface{}); cur.Next(tmp); j++ {
|
|
- util.Try(func() {
|
|
|
|
- scode := util.ObjToString(tmp["s_projectcode"])
|
|
|
|
- sname := util.ObjToString(tmp["s_projectname"])
|
|
|
|
- openid := util.ObjToString(tmp["s_openid"])
|
|
|
|
- go FindData(tmp["_id"], util.ObjToString(tmp["s_title"]), sname, scode, openid, idrange, true, true)
|
|
|
|
- }, func(e interface{}) {
|
|
|
|
- log.Println(e)
|
|
|
|
|
|
+ scode := util.ObjToString(tmp["s_projectcode"])
|
|
|
|
+ sname := util.ObjToString(tmp["s_projectname"])
|
|
|
|
+ openid := util.ObjToString(tmp["s_openid"])
|
|
|
|
+ if strings.TrimSpace(openid) == "" || (strings.TrimSpace(scode) == "" && strings.TrimSpace(sname) == "") {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ users = append(users, &PushUser{
|
|
|
|
+ OpenId: openid,
|
|
|
|
+ Id: tmp["_id"],
|
|
|
|
+ Title: util.ObjToString(tmp["s_title"]),
|
|
|
|
+ ProjectCode: scode,
|
|
|
|
+ ProjectName: sname,
|
|
})
|
|
})
|
|
tmp = make(map[string]interface{})
|
|
tmp = make(map[string]interface{})
|
|
}
|
|
}
|
|
|
|
+ if len(users) == 0 {
|
|
|
|
+ log.Println("未查找到需要推送的用户...")
|
|
|
|
+ return false
|
|
|
|
+ }
|
|
|
|
+ initFlag, res := InitBiddingCache(_id, lastid)
|
|
|
|
+ if !initFlag {
|
|
|
|
+ log.Println("加载数据到内存中的时候,未查找到数据...")
|
|
|
|
+ return false
|
|
|
|
+ }
|
|
|
|
+ //遍历数据,挂到用户身上
|
|
|
|
+ userMap := EachAllBidInfo(&users, res)
|
|
|
|
+ //遍历用户,推送
|
|
|
|
+ for k, v := range *userMap {
|
|
|
|
+ pushpool <- true
|
|
|
|
+ func(tmp *[]map[string]interface{}) {
|
|
|
|
+ defer func() {
|
|
|
|
+ <-pushpool
|
|
|
|
+ }()
|
|
|
|
+ push(k.Id, k.ProjectName, k.ProjectCode, k.Title, k.OpenId, v, true, true)
|
|
|
|
+ }(v)
|
|
|
|
+ }
|
|
log.Println("push-over,user-count:", j)
|
|
log.Println("push-over,user-count:", j)
|
|
Sysconfig["lastid"] = lastid
|
|
Sysconfig["lastid"] = lastid
|
|
return true
|
|
return true
|
|
}
|
|
}
|
|
|
|
|
|
-var findpool = make(chan bool, 10)
|
|
|
|
-var Pushlock = sync.Mutex{}
|
|
|
|
|
|
+//遍历数据并执行推送操作
|
|
|
|
+func EachAllBidInfo(users *[]*PushUser, res *[]map[string]interface{}) *map[*PushUser]*[]map[string]interface{} {
|
|
|
|
+ userMap := map[*PushUser]*[]map[string]interface{}{}
|
|
|
|
+ for _, v := range *res {
|
|
|
|
+ eachpool <- true
|
|
|
|
+ func(tmp map[string]interface{}) {
|
|
|
|
+ defer func() {
|
|
|
|
+ <-eachpool
|
|
|
|
+ }()
|
|
|
|
+ scode := util.ObjToString(tmp["s_projectcode"])
|
|
|
|
+ sname := util.ObjToString(tmp["s_projectname"])
|
|
|
|
+ for _, user := range *users {
|
|
|
|
+ if scode != user.ProjectCode && sname != user.ProjectName {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ s := userMap[user]
|
|
|
|
+ if s == nil {
|
|
|
|
+ s = &[]map[string]interface{}{}
|
|
|
|
+ userMap[user] = s
|
|
|
|
+ }
|
|
|
|
+ *s = append(*s, tmp)
|
|
|
|
+ }
|
|
|
|
+ }(v)
|
|
|
|
+ }
|
|
|
|
+ return &userMap
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+//加载数据到内存中
|
|
|
|
+func InitBiddingCache(_id, lastid string) (bool, *[]map[string]interface{}) {
|
|
|
|
+ //filter全局查询
|
|
|
|
+ c_query := fmt.Sprintf(FilterQuery, fmt.Sprintf(IDRange, _id, lastid))
|
|
|
|
+ log.Println("es query:", c_query)
|
|
|
|
+ //testquery := `{"terms":{"_id":["59cefaa12cf26913ca07fd56"]}}`
|
|
|
|
+ //testquery := ``
|
|
|
|
+ //c_query = fmt.Sprintf(FilterQuery, testquery)
|
|
|
|
+ var res []map[string]interface{}
|
|
|
|
+ count := int(elastic.Count(DB, DB, c_query))
|
|
|
|
+ log.Println("本次推送共查到数据", count, "条")
|
|
|
|
+ if count == 0 {
|
|
|
|
+ return false, &res
|
|
|
|
+ }
|
|
|
|
+ if count > MaxSearch {
|
|
|
|
+ count = MaxSearch
|
|
|
|
+ log.Println("目前数据多于", MaxSearch, ",只加载了", MaxSearch, "条!")
|
|
|
|
+ }
|
|
|
|
+ if OnceMax > count {
|
|
|
|
+ OnceMax = count
|
|
|
|
+ }
|
|
|
|
+ totalPage := int((count + OnceMax - 1) / OnceMax)
|
|
|
|
+ log.Println("数据一共", totalPage, "页!")
|
|
|
|
+ //如果res长度和cout相差5条,重试
|
|
|
|
+ for t := 1; t <= 3; t++ {
|
|
|
|
+ res = []map[string]interface{}{}
|
|
|
|
+ for i := 0; i < totalPage; i++ {
|
|
|
|
+ searchpool <- true
|
|
|
|
+ searchWaitGroup.Add(1)
|
|
|
|
+ go func(start int) {
|
|
|
|
+ defer func() {
|
|
|
|
+ searchWaitGroup.Done()
|
|
|
|
+ <-searchpool
|
|
|
|
+ }()
|
|
|
|
+ r := elastic.GetAllByNgram(DB, DB, c_query, "", SortQuery, ShowField, start*OnceMax, OnceMax, 0, false)
|
|
|
|
+ res = append(res, *r...)
|
|
|
|
+ log.Println("第", start+1, "页数据加载完成!")
|
|
|
|
+ }(i)
|
|
|
|
+ }
|
|
|
|
+ searchWaitGroup.Wait()
|
|
|
|
+ if len(res) >= count-5 {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ log.Println("第", t, "次加载数据完成,数据总数", len(res), ",由于数据量不够,重新加载!")
|
|
|
|
+ }
|
|
|
|
+ resLenght := len(res)
|
|
|
|
+ if resLenght == 0 {
|
|
|
|
+ return false, &res
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ log.Println(resLenght, "条数据已经加载完成!")
|
|
|
|
+ return true, &res
|
|
|
|
+}
|
|
|
|
|
|
//不保存不推送-只保存不推送-保存推送
|
|
//不保存不推送-只保存不推送-保存推送
|
|
func FindData(fid interface{}, title, sname, scode, openid, idrange string, bsave, bpush bool) *Arr {
|
|
func FindData(fid interface{}, title, sname, scode, openid, idrange string, bsave, bpush bool) *Arr {
|
|
@@ -92,258 +215,137 @@ func FindData(fid interface{}, title, sname, scode, openid, idrange string, bsav
|
|
}
|
|
}
|
|
var pushArray = &Arr{}
|
|
var pushArray = &Arr{}
|
|
if len(q1) > 0 {
|
|
if len(q1) > 0 {
|
|
- res := elastic.Get(DB, DB, fmt.Sprintf(Query, idrange, strings.Join(q1, ",")))
|
|
|
|
|
|
+ res := elastic.Get(DB, DB, fmt.Sprintf(Query, idrange, strings.Join(q1, ","), ShowField, SortQuery))
|
|
if res != nil && *res != nil && len(*res) > 0 {
|
|
if res != nil && *res != nil && len(*res) > 0 {
|
|
//顺序处理,后序会有性能瓶颈,filterdata
|
|
//顺序处理,后序会有性能瓶颈,filterdata
|
|
- util.Try(func() {
|
|
|
|
- if !bsave {
|
|
|
|
- for _, info := range *res {
|
|
|
|
- tmp := map[string]interface{}{}
|
|
|
|
- sid := util.BsonIdToSId(info["_id"])
|
|
|
|
- if title != sid { //title在此处传的是关注信息id
|
|
|
|
- tmp["s_id"] = sid
|
|
|
|
- tmp["s_eid"] = util.EncodeArticleId2ByCheck(sid)
|
|
|
|
- tmp["s_title"] = info["title"]
|
|
|
|
- tmp["l_publishtime"] = info["publishtime"]
|
|
|
|
- tmp["s_province"] = info["area"]
|
|
|
|
- tmp["s_type"] = util.ObjToString(info["type"])
|
|
|
|
- tmp["s_toptype"] = util.ObjToString(info["toptype"])
|
|
|
|
- tmp["s_subtype"] = util.ObjToString(info["subtype"])
|
|
|
|
- tmp["s_projectname"] = util.ObjToString(info["projectname"])
|
|
|
|
- tmp["s_projectcode"] = util.ObjToString(info["projectcode"])
|
|
|
|
- tmp["s_url"] = util.ObjToString(info["href"])
|
|
|
|
- *pushArray = append(*pushArray, &tmp)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- Pushlock.Lock()
|
|
|
|
- defer Pushlock.Unlock()
|
|
|
|
- filterData := &filterdata.FilterData{}
|
|
|
|
- filterData.Start(openid)
|
|
|
|
- defer filterData.End()
|
|
|
|
- //1.组织信息、
|
|
|
|
- //a_relationinfo s_id s_title s_projectname s_projectcode l_publishtime s_url
|
|
|
|
- var ids []string
|
|
|
|
- for _, info := range *res {
|
|
|
|
- sid := util.BsonIdToSId(info["_id"])
|
|
|
|
- if title == sid && !bpush {
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- tmp := map[string]interface{}{}
|
|
|
|
- tmp["s_id"] = sid
|
|
|
|
- tmp["s_eid"] = util.EncodeArticleId2ByCheck(sid)
|
|
|
|
- tmp["s_title"] = info["title"]
|
|
|
|
- tmp["l_publishtime"] = info["publishtime"]
|
|
|
|
- tmp["s_province"] = info["area"]
|
|
|
|
- tmp["s_type"] = util.ObjToString(info["type"])
|
|
|
|
- tmp["s_toptype"] = util.ObjToString(info["toptype"])
|
|
|
|
- tmp["s_subtype"] = util.ObjToString(info["subtype"])
|
|
|
|
- tmp["s_projectname"] = util.ObjToString(info["projectname"])
|
|
|
|
- tmp["s_projectcode"] = util.ObjToString(info["projectcode"])
|
|
|
|
- tmp["s_url"] = util.ObjToString(info["href"])
|
|
|
|
- if filterData.IsExists(sid) {
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- ids = append(ids, sid)
|
|
|
|
- *pushArray = append(*pushArray, &tmp)
|
|
|
|
- }
|
|
|
|
- go func() {
|
|
|
|
- //2.推送、
|
|
|
|
- if pushArray.Len() > 0 {
|
|
|
|
- sort.Sort(pushArray)
|
|
|
|
- //更新用户关注的a_relationinfo、保存到推送记录表、推送给用户
|
|
|
|
- //log.Println(fid, ids)
|
|
|
|
- //项目公告保留包括本身在内的最新100条
|
|
|
|
- updateFlag := false
|
|
|
|
- if fid != nil {
|
|
|
|
- followObject, fok := tools.MQFW.FindOneByField(FOLLOW_COLLECTION, &bson.M{"_id": fid}, `{"l_publishtime":1,"a_relationinfo":1,"s_id":1}`)
|
|
|
|
- if fok && followObject != nil && len(*followObject) > 0 {
|
|
|
|
- relationinfo, _ := (*followObject)["a_relationinfo"].([]interface{})
|
|
|
|
- s_id, _ := (*followObject)["s_id"].(string)
|
|
|
|
- rLength := len(relationinfo)
|
|
|
|
- pLength := pushArray.Len()
|
|
|
|
- setMap := bson.M{"a_lastpushids": ids}
|
|
|
|
- if bpush {
|
|
|
|
- setMap["l_lastpushtime"] = (*((*pushArray)[0]))["l_publishtime"]
|
|
|
|
- } else {
|
|
|
|
- setMap["l_lastpushtime"] = (*followObject)["l_publishtime"]
|
|
|
|
- }
|
|
|
|
- updateMap := &bson.M{"$set": setMap}
|
|
|
|
- if rLength > 0 && rLength+pLength > KEEPCOUNT { //保留100条
|
|
|
|
- start := rLength + pLength - KEEPCOUNT
|
|
|
|
- firstInfo, _ := relationinfo[0].(map[string]interface{})
|
|
|
|
- firstInfoId, _ := firstInfo["s_id"].(string)
|
|
|
|
- var relationinfoTemp []interface{}
|
|
|
|
- //保留本身
|
|
|
|
- if s_id == firstInfoId && !bpush {
|
|
|
|
- relationinfoTemp = relationinfo[:1]
|
|
|
|
- if start+1 < rLength {
|
|
|
|
- relationinfoTemp = append(relationinfoTemp, relationinfo[start+1:]...)
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- if start < rLength {
|
|
|
|
- relationinfoTemp = append(relationinfoTemp, relationinfo[start:]...)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- for _, v := range *pushArray {
|
|
|
|
- relationinfoTemp = append(relationinfoTemp, v)
|
|
|
|
- }
|
|
|
|
- setMap["a_relationinfo"] = relationinfoTemp
|
|
|
|
- } else { //追加
|
|
|
|
- (*updateMap)["$pushAll"] = bson.M{"a_relationinfo": pushArray}
|
|
|
|
- }
|
|
|
|
- updateFlag = tools.MQFW.Update(FOLLOW_COLLECTION, &bson.M{"_id": fid}, updateMap, false, false)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if updateFlag && bpush {
|
|
|
|
- //进入推送逻辑
|
|
|
|
- tit := sname
|
|
|
|
- if tit == "" {
|
|
|
|
- tit = title
|
|
|
|
- }
|
|
|
|
- if tit == "" {
|
|
|
|
- tit = scode
|
|
|
|
- }
|
|
|
|
- if tit != "" {
|
|
|
|
- //go func() {
|
|
|
|
- followid := util.BsonIdToSId(fid)
|
|
|
|
- infoid := tools.MQFW.Save(FOLLOW_PUSH_LOG, &bson.M{
|
|
|
|
- "s_openid": openid,
|
|
|
|
- "a_relationinfo": pushArray,
|
|
|
|
- "l_date": time.Now().Unix(),
|
|
|
|
- "s_title": title,
|
|
|
|
- "s_projectcode": scode,
|
|
|
|
- "s_projectname": sname,
|
|
|
|
- "s_followid": followid,
|
|
|
|
- })
|
|
|
|
- if infoid != "" {
|
|
|
|
- lastTime := util.Int64All((*(*pushArray)[0])["l_publishtime"])
|
|
|
|
- pushtt := fmt.Sprintf(WxTitle, tit)
|
|
|
|
- Tip1 := ""
|
|
|
|
- minute := time.Now().Unix() - lastTime
|
|
|
|
- if minute > -1 && minute < 61 {
|
|
|
|
- Tip1 = fmt.Sprintf("%d秒前发布的:\n", minute)
|
|
|
|
- } else {
|
|
|
|
- minute = minute / 60
|
|
|
|
- if minute < 121 {
|
|
|
|
- if minute < 1 {
|
|
|
|
- minute = 1
|
|
|
|
- }
|
|
|
|
- Tip1 = fmt.Sprintf("%d分钟前发布的:\n", minute)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- LastTip := ""
|
|
|
|
- pushnum := len(*pushArray)
|
|
|
|
- if pushnum > 1 {
|
|
|
|
- LastTip = fmt.Sprintf("...(共%d条)", pushnum)
|
|
|
|
- }
|
|
|
|
- LastLen = LastLen - len([]rune(pushtt)) - len([]rune(Tip1))
|
|
|
|
- Remark := ""
|
|
|
|
- bshow := false
|
|
|
|
- for n := 1; n < pushnum+1; n++ {
|
|
|
|
- Remark += fmt.Sprintf("%d %s\n", n, (*(*pushArray)[n-1])["s_title"])
|
|
|
|
- if len([]rune(Remark)) > LastLen {
|
|
|
|
- if n == pushnum {
|
|
|
|
- bshow = true
|
|
|
|
- }
|
|
|
|
- break
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if bshow {
|
|
|
|
- LastTip = ""
|
|
|
|
- }
|
|
|
|
- go log.Println("push", openid, pushnum, tit)
|
|
|
|
- weixinrpc.SendWinXin(&qrpc.NotifyMsg{
|
|
|
|
- Openid: openid,
|
|
|
|
- Title: fmt.Sprintf(WxTitle, tit),
|
|
|
|
- Remark: Tip1 + Remark + LastTip,
|
|
|
|
- Detail: WxContent,
|
|
|
|
- Service: WxGroup,
|
|
|
|
- Url: ViewDomain + "/front/sess/" + se.EncodeString(openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",wxpushfollowlist") + "__" + infoid + "__" + followid,
|
|
|
|
- })
|
|
|
|
- }
|
|
|
|
- //}()
|
|
|
|
- }
|
|
|
|
- //推送结束
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }()
|
|
|
|
- }
|
|
|
|
- }, func(e interface{}) {
|
|
|
|
- log.Println("给用户推送关注信息时出错:", e)
|
|
|
|
- })
|
|
|
|
|
|
+ pushArray = push(fid, sname, scode, title, openid, res, bsave, bpush)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return pushArray
|
|
return pushArray
|
|
}
|
|
}
|
|
|
|
|
|
-/**
|
|
|
|
//开始推送
|
|
//开始推送
|
|
-func push(muser *map[*map[string]interface{}]*[]*map[string]interface{}) {
|
|
|
|
- if muser != nil && len(*muser) > 0 {
|
|
|
|
- for userSet, infos := range *muser {
|
|
|
|
- openid := (*userSet)["s_openid"].(string)
|
|
|
|
- filterdata.FilterData.Start(openid)
|
|
|
|
- util.Try(func() {
|
|
|
|
- //1.组织信息、
|
|
|
|
- //a_relationinfo s_id s_title s_projectname s_projectcode l_publishtime s_url
|
|
|
|
- var pushArray = &Arr{}
|
|
|
|
- var ids []string
|
|
|
|
- if infos != nil && len(*infos) > 0 {
|
|
|
|
- for _, info := range *infos {
|
|
|
|
- tmp := map[string]interface{}{}
|
|
|
|
- sid := fmt.Sprintf("%x", string((*info)["_id"].(bson.ObjectId)))
|
|
|
|
- ids = append(ids, sid)
|
|
|
|
- tmp["s_id"] = sid
|
|
|
|
- tmp["s_eid"] = util.EncodeArticleId(sid)
|
|
|
|
- tmp["s_title"] = (*info)["title"]
|
|
|
|
- tmp["l_publishtime"] = (*info)["publishtime"]
|
|
|
|
- tmp["s_province"] = (*info)["area"]
|
|
|
|
- tmp["s_type"] = util.ObjToString((*info)["type"])
|
|
|
|
- tmp["s_toptype"] = util.ObjToString((*info)["toptype"])
|
|
|
|
- tmp["s_subtype"] = util.ObjToString((*info)["subtype"])
|
|
|
|
- tmp["s_projectname"] = util.ObjToString((*info)["projectname"])
|
|
|
|
- tmp["s_projectcode"] = util.ObjToString((*info)["projectcode"])
|
|
|
|
- tmp["s_url"] = util.ObjToString((*info)["href"])
|
|
|
|
- if filterdata.FilterData.IsExists(sid) {
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- *pushArray = append(*pushArray, &tmp)
|
|
|
|
- }
|
|
|
|
|
|
+func push(fid interface{}, sname, scode, title, openid string, res *[]map[string]interface{}, bsave, bpush bool) *Arr {
|
|
|
|
+ var pushArray = &Arr{}
|
|
|
|
+ util.Try(func() {
|
|
|
|
+ if !bsave {
|
|
|
|
+ for _, info := range *res {
|
|
|
|
+ tmp := map[string]interface{}{}
|
|
|
|
+ sid := util.BsonIdToSId(info["_id"])
|
|
|
|
+ if title != sid { //title在此处传的是关注信息id
|
|
|
|
+ tmp["s_id"] = sid
|
|
|
|
+ tmp["s_eid"] = util.EncodeArticleId2ByCheck(sid)
|
|
|
|
+ tmp["s_title"] = info["title"]
|
|
|
|
+ tmp["l_publishtime"] = info["publishtime"]
|
|
|
|
+ tmp["s_province"] = info["area"]
|
|
|
|
+ tmp["s_type"] = util.ObjToString(info["type"])
|
|
|
|
+ tmp["s_toptype"] = util.ObjToString(info["toptype"])
|
|
|
|
+ tmp["s_subtype"] = util.ObjToString(info["subtype"])
|
|
|
|
+ tmp["s_projectname"] = util.ObjToString(info["projectname"])
|
|
|
|
+ tmp["s_projectcode"] = util.ObjToString(info["projectcode"])
|
|
|
|
+ tmp["s_url"] = util.ObjToString(info["href"])
|
|
|
|
+ *pushArray = append(*pushArray, &tmp)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ Pushlock.Lock()
|
|
|
|
+ defer Pushlock.Unlock()
|
|
|
|
+ filterData := &filterdata.FilterData{}
|
|
|
|
+ filterData.Start(openid)
|
|
|
|
+ defer filterData.End()
|
|
|
|
+ //1.组织信息、
|
|
|
|
+ //a_relationinfo s_id s_title s_projectname s_projectcode l_publishtime s_url
|
|
|
|
+ var ids []string
|
|
|
|
+ for _, info := range *res {
|
|
|
|
+ sid := util.BsonIdToSId(info["_id"])
|
|
|
|
+ if title == sid && !bpush {
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
- filterdata.FilterData.End(0)
|
|
|
|
|
|
+ tmp := map[string]interface{}{}
|
|
|
|
+ tmp["s_id"] = sid
|
|
|
|
+ tmp["s_eid"] = util.EncodeArticleId2ByCheck(sid)
|
|
|
|
+ tmp["s_title"] = info["title"]
|
|
|
|
+ tmp["l_publishtime"] = info["publishtime"]
|
|
|
|
+ tmp["s_province"] = info["area"]
|
|
|
|
+ tmp["s_type"] = util.ObjToString(info["type"])
|
|
|
|
+ tmp["s_toptype"] = util.ObjToString(info["toptype"])
|
|
|
|
+ tmp["s_subtype"] = util.ObjToString(info["subtype"])
|
|
|
|
+ tmp["s_projectname"] = util.ObjToString(info["projectname"])
|
|
|
|
+ tmp["s_projectcode"] = util.ObjToString(info["projectcode"])
|
|
|
|
+ tmp["s_url"] = util.ObjToString(info["href"])
|
|
|
|
+ if filterData.IsExists(sid) {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ ids = append(ids, sid)
|
|
|
|
+ *pushArray = append(*pushArray, &tmp)
|
|
|
|
+ }
|
|
|
|
+ go func() {
|
|
//2.推送、
|
|
//2.推送、
|
|
if pushArray.Len() > 0 {
|
|
if pushArray.Len() > 0 {
|
|
sort.Sort(pushArray)
|
|
sort.Sort(pushArray)
|
|
//更新用户关注的a_relationinfo、保存到推送记录表、推送给用户
|
|
//更新用户关注的a_relationinfo、保存到推送记录表、推送给用户
|
|
- if tools.MQFW.Update(FOLLOW_COLLECTION, &bson.M{
|
|
|
|
- "_id": (*userSet)["_id"],
|
|
|
|
- }, &bson.M{
|
|
|
|
- "$set": bson.M{
|
|
|
|
- "l_lastpushtime": (*((*pushArray)[0]))["l_publishtime"],
|
|
|
|
- "a_lastpushids": ids,
|
|
|
|
- },
|
|
|
|
- "$pushAll": bson.M{
|
|
|
|
- "a_relationinfo": pushArray,
|
|
|
|
- },
|
|
|
|
- }, false, false) {
|
|
|
|
- tit := util.ObjToString((*userSet)["s_projectname"])
|
|
|
|
|
|
+ //log.Println(fid, ids)
|
|
|
|
+ //项目公告保留包括本身在内的最新100条
|
|
|
|
+ updateFlag := false
|
|
|
|
+ if fid != nil {
|
|
|
|
+ followObject, fok := tools.MQFW.FindOneByField(FOLLOW_COLLECTION, &bson.M{"_id": fid}, `{"l_publishtime":1,"a_relationinfo":1,"s_id":1}`)
|
|
|
|
+ if fok && followObject != nil && len(*followObject) > 0 {
|
|
|
|
+ relationinfo, _ := (*followObject)["a_relationinfo"].([]interface{})
|
|
|
|
+ s_id, _ := (*followObject)["s_id"].(string)
|
|
|
|
+ rLength := len(relationinfo)
|
|
|
|
+ pLength := pushArray.Len()
|
|
|
|
+ setMap := bson.M{"a_lastpushids": ids}
|
|
|
|
+ if bpush {
|
|
|
|
+ setMap["l_lastpushtime"] = (*((*pushArray)[0]))["l_publishtime"]
|
|
|
|
+ } else {
|
|
|
|
+ setMap["l_lastpushtime"] = (*followObject)["l_publishtime"]
|
|
|
|
+ }
|
|
|
|
+ updateMap := &bson.M{"$set": setMap}
|
|
|
|
+ if rLength > 0 && rLength+pLength > KEEPCOUNT { //保留100条
|
|
|
|
+ start := rLength + pLength - KEEPCOUNT
|
|
|
|
+ firstInfo, _ := relationinfo[0].(map[string]interface{})
|
|
|
|
+ firstInfoId, _ := firstInfo["s_id"].(string)
|
|
|
|
+ var relationinfoTemp []interface{}
|
|
|
|
+ //保留本身
|
|
|
|
+ if s_id == firstInfoId && !bpush {
|
|
|
|
+ relationinfoTemp = relationinfo[:1]
|
|
|
|
+ if start+1 < rLength {
|
|
|
|
+ relationinfoTemp = append(relationinfoTemp, relationinfo[start+1:]...)
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ if start < rLength {
|
|
|
|
+ relationinfoTemp = append(relationinfoTemp, relationinfo[start:]...)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ for _, v := range *pushArray {
|
|
|
|
+ relationinfoTemp = append(relationinfoTemp, v)
|
|
|
|
+ }
|
|
|
|
+ setMap["a_relationinfo"] = relationinfoTemp
|
|
|
|
+ } else { //追加
|
|
|
|
+ (*updateMap)["$pushAll"] = bson.M{"a_relationinfo": pushArray}
|
|
|
|
+ }
|
|
|
|
+ updateFlag = tools.MQFW.Update(FOLLOW_COLLECTION, &bson.M{"_id": fid}, updateMap, false, false)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if updateFlag && bpush {
|
|
|
|
+ //进入推送逻辑
|
|
|
|
+ tit := sname
|
|
if tit == "" {
|
|
if tit == "" {
|
|
- tit = util.ObjToString((*userSet)["s_title"])
|
|
|
|
|
|
+ tit = title
|
|
}
|
|
}
|
|
if tit == "" {
|
|
if tit == "" {
|
|
- tit = util.ObjToString((*userSet)["s_projectcode"])
|
|
|
|
|
|
+ tit = scode
|
|
}
|
|
}
|
|
if tit != "" {
|
|
if tit != "" {
|
|
//go func() {
|
|
//go func() {
|
|
- followid := fmt.Sprintf("%x", string((*userSet)["_id"].(bson.ObjectId)))
|
|
|
|
|
|
+ followid := util.BsonIdToSId(fid)
|
|
infoid := tools.MQFW.Save(FOLLOW_PUSH_LOG, &bson.M{
|
|
infoid := tools.MQFW.Save(FOLLOW_PUSH_LOG, &bson.M{
|
|
"s_openid": openid,
|
|
"s_openid": openid,
|
|
"a_relationinfo": pushArray,
|
|
"a_relationinfo": pushArray,
|
|
"l_date": time.Now().Unix(),
|
|
"l_date": time.Now().Unix(),
|
|
- "s_title": util.ObjToString((*userSet)["s_title"]),
|
|
|
|
- "s_projectcode": util.ObjToString((*userSet)["s_projectcode"]),
|
|
|
|
- "s_projectname": util.ObjToString((*userSet)["s_projectname"]),
|
|
|
|
|
|
+ "s_title": title,
|
|
|
|
+ "s_projectcode": scode,
|
|
|
|
+ "s_projectname": sname,
|
|
"s_followid": followid,
|
|
"s_followid": followid,
|
|
})
|
|
})
|
|
if infoid != "" {
|
|
if infoid != "" {
|
|
@@ -394,13 +396,13 @@ func push(muser *map[*map[string]interface{}]*[]*map[string]interface{}) {
|
|
}
|
|
}
|
|
//}()
|
|
//}()
|
|
}
|
|
}
|
|
|
|
+ //推送结束
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- }, func(e interface{}) {
|
|
|
|
- filterdata.FilterData.End(0)
|
|
|
|
- log.Println("给用户推送关注信息时出错:", e)
|
|
|
|
- })
|
|
|
|
|
|
+ }()
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
+ }, func(e interface{}) {
|
|
|
|
+ log.Println("给用户推送关注信息时出错:", e)
|
|
|
|
+ })
|
|
|
|
+ return pushArray
|
|
}
|
|
}
|
|
-**/
|
|
|