|
@@ -0,0 +1,233 @@
|
|
|
+package followpush
|
|
|
+
|
|
|
+import (
|
|
|
+ . "config"
|
|
|
+ "filterdata"
|
|
|
+ "fmt"
|
|
|
+ "log"
|
|
|
+ "qfw/util"
|
|
|
+ "qfw/util/elastic"
|
|
|
+ qrpc "qfw/util/rpc"
|
|
|
+ "sort"
|
|
|
+ "strconv"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+ "tools"
|
|
|
+ "weixinrpc"
|
|
|
+
|
|
|
+ "gopkg.in/mgo.v2/bson"
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ MaxId = `{"query":{"filtered":{"filter":{"bool":{"must":{"range":{"id":{"gt":"%s"}}}}}}},"_source":["_id","comeintime"],"sort":{"id":"desc"},"from":0,"size":1}`
|
|
|
+ Query = `{ "query": { "bool": { "must": [%s], "should": [%s], "minimum_should_match": 1 } }, "_source": [ "_id","title","publishtime","area","type","toptype","subtype","projectname","projectcode","href","infoformat" ], "sort": [ { "publishtime": "desc" } ], "from": 0, "size": 50 }`
|
|
|
+ IDRange = `{"range":{"id":{"gt":"%s","lte":"%s"}}}`
|
|
|
+ TERM = `{"term":{"%s":"%s"}}`
|
|
|
+ DB = "bidding"
|
|
|
+)
|
|
|
+
|
|
|
+//开始推送
|
|
|
+func pushByEs(_id string) bool {
|
|
|
+ defer util.Catch()
|
|
|
+ resId := elastic.Get(DB, DB, fmt.Sprintf(MaxId, _id))
|
|
|
+ lastid := ""
|
|
|
+ log.Println("push-lastid", _id, resId)
|
|
|
+ if resId != nil && *resId != nil && len(*resId) == 1 {
|
|
|
+ lastid = util.ObjToString((*resId)[0]["_id"])
|
|
|
+ } else {
|
|
|
+ log.Println("未查找到数据...", fmt.Sprintf(MaxId, _id))
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ idrange := fmt.Sprintf(IDRange, _id, lastid)
|
|
|
+ sess := tools.MQFW.GetMgoConn()
|
|
|
+ defer tools.MQFW.DestoryMongoConn(sess)
|
|
|
+ cur := sess.DB(tools.MQFW.DbName).C(FOLLOW_COLLECTION).Find(&map[string]interface{}{
|
|
|
+ "s_openid": map[string]interface{}{
|
|
|
+ "$exists": true,
|
|
|
+ },
|
|
|
+ "i_ispush": map[string]interface{}{
|
|
|
+ "$ne": 0,
|
|
|
+ },
|
|
|
+ }).Select(map[string]interface{}{
|
|
|
+ "s_entname": 1,
|
|
|
+ "s_userid": 1,
|
|
|
+ "s_openid": 1,
|
|
|
+ "_id": 1,
|
|
|
+ }).Iter()
|
|
|
+ j := 0
|
|
|
+ for tmp := make(map[string]interface{}); cur.Next(tmp); j++ {
|
|
|
+ util.Try(func() {
|
|
|
+ entname := util.ObjToString(tmp["s_entname"])
|
|
|
+ openid := util.ObjToString(tmp["s_openid"])
|
|
|
+ go FindData(tmp["_id"], util.ObjToString(tmp["s_title"]), entname, openid, idrange, true, true)
|
|
|
+ }, func(e interface{}) {
|
|
|
+ log.Println(e)
|
|
|
+ })
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+ log.Println("push-over,user-count:", j)
|
|
|
+ Sysconfig["lastid"] = lastid
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+var findpool = make(chan bool, 10)
|
|
|
+var Pushlock = sync.Mutex{}
|
|
|
+
|
|
|
+//不保存不推送-只保存不推送-保存推送
|
|
|
+func FindData(fid interface{}, title, sname, openid, idrange string, bsave, bpush bool) *Arr {
|
|
|
+ findpool <- true
|
|
|
+ defer func() {
|
|
|
+ <-findpool
|
|
|
+ }()
|
|
|
+ q1 := []string{}
|
|
|
+ if sname != "" {
|
|
|
+ q1 = append(q1, fmt.Sprintf(TERM, "winner", elastic.ReplaceYH(sname)))
|
|
|
+ //q1 = append(q1, fmt.Sprintf(TERM, "subtype", "中标"))
|
|
|
+ }
|
|
|
+ var pushArray = &Arr{}
|
|
|
+ if len(q1) > 0 {
|
|
|
+ res := elastic.Get(DB, DB, fmt.Sprintf(Query, idrange, strings.Join(q1, ",")))
|
|
|
+ if res != nil && *res != nil && len(*res) > 0 {
|
|
|
+ //顺序处理,后序会有性能瓶颈,filterdata
|
|
|
+ util.Try(func() {
|
|
|
+ if !bsave {
|
|
|
+ for _, info := range *res {
|
|
|
+ tmp := map[string]interface{}{}
|
|
|
+ sid := util.BsonIdToSId(info["_id"])
|
|
|
+ if title != sid { //title在此处传的是关注信息id
|
|
|
+ tmp["s_id"] = sid
|
|
|
+ tmp["s_eid"] = util.EncodeArticleId2ByCheck(sid)
|
|
|
+ tmp["s_title"] = info["title"]
|
|
|
+ tmp["l_publishtime"] = info["publishtime"]
|
|
|
+ tmp["s_province"] = info["area"]
|
|
|
+ tmp["s_type"] = util.ObjToString(info["type"])
|
|
|
+ tmp["s_toptype"] = util.ObjToString(info["toptype"])
|
|
|
+ tmp["s_subtype"] = util.ObjToString(info["subtype"])
|
|
|
+ tmp["s_entname"] = util.ObjToString(info["winner"])
|
|
|
+ tmp["s_url"] = util.ObjToString(info["href"])
|
|
|
+ *pushArray = append(*pushArray, &tmp)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ Pushlock.Lock()
|
|
|
+ defer Pushlock.Unlock()
|
|
|
+ filterdata.FilterData.Start(openid)
|
|
|
+ defer filterdata.FilterData.End(0)
|
|
|
+ //1.组织信息、
|
|
|
+ //a_relationinfo s_id s_title s_projectname s_projectcode l_publishtime s_url
|
|
|
+ var ids []string
|
|
|
+ for _, info := range *res {
|
|
|
+ tmp := map[string]interface{}{}
|
|
|
+ sid := util.BsonIdToSId(info["_id"])
|
|
|
+ tmp["s_id"] = sid
|
|
|
+ tmp["s_eid"] = util.EncodeArticleId2ByCheck(sid)
|
|
|
+ tmp["s_title"] = info["title"]
|
|
|
+ tmp["l_publishtime"] = info["publishtime"]
|
|
|
+ tmp["s_province"] = info["area"]
|
|
|
+ tmp["s_type"] = util.ObjToString(info["type"])
|
|
|
+ tmp["s_toptype"] = util.ObjToString(info["toptype"])
|
|
|
+ tmp["s_subtype"] = util.ObjToString(info["subtype"])
|
|
|
+ tmp["s_entname"] = util.ObjToString(info["winner"])
|
|
|
+ tmp["s_url"] = util.ObjToString(info["href"])
|
|
|
+ if filterdata.FilterData.IsExists(sid) {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ ids = append(ids, sid)
|
|
|
+ *pushArray = append(*pushArray, &tmp)
|
|
|
+ }
|
|
|
+ go func() {
|
|
|
+ //2.推送、
|
|
|
+ if pushArray.Len() > 0 {
|
|
|
+ sort.Sort(pushArray)
|
|
|
+ //更新用户关注的a_relationinfo、保存到推送记录表、推送给用户
|
|
|
+ //log.Println(fid, ids)
|
|
|
+ if fid != nil && tools.MQFW.Update(FOLLOW_COLLECTION, &bson.M{
|
|
|
+ "_id": fid,
|
|
|
+ }, &bson.M{
|
|
|
+ "$set": bson.M{
|
|
|
+ "l_lastpushtime": (*((*pushArray)[0]))["l_publishtime"],
|
|
|
+ "a_lastpushids": ids,
|
|
|
+ },
|
|
|
+ /*
|
|
|
+ "$pushAll": bson.M{
|
|
|
+ "a_relationinfo": pushArray,
|
|
|
+ },
|
|
|
+ */
|
|
|
+ }, false, false) && bpush {
|
|
|
+ //进入推送逻辑
|
|
|
+ tit := sname
|
|
|
+ if tit == "" {
|
|
|
+ tit = title
|
|
|
+ }
|
|
|
+ if tit != "" {
|
|
|
+ //go func() {
|
|
|
+ followid := util.BsonIdToSId(fid)
|
|
|
+ infoid := tools.MQFW.Save(FOLLOW_PUSH_LOG, &bson.M{
|
|
|
+ "s_openid": openid,
|
|
|
+ "a_relationinfo": pushArray,
|
|
|
+ "l_date": time.Now().Unix(),
|
|
|
+ "s_title": title,
|
|
|
+ "s_entname": sname,
|
|
|
+ "s_followid": followid,
|
|
|
+ })
|
|
|
+ if infoid != "" {
|
|
|
+ lastTime := util.Int64All((*(*pushArray)[0])["l_publishtime"])
|
|
|
+ pushtt := fmt.Sprintf(WxTitle, tit)
|
|
|
+ Tip1 := ""
|
|
|
+ minute := time.Now().Unix() - lastTime
|
|
|
+ if minute > -1 && minute < 61 {
|
|
|
+ Tip1 = fmt.Sprintf("%d秒前发布的:\n", minute)
|
|
|
+ } else {
|
|
|
+ minute = minute / 60
|
|
|
+ if minute < 121 {
|
|
|
+ if minute < 1 {
|
|
|
+ minute = 1
|
|
|
+ }
|
|
|
+ Tip1 = fmt.Sprintf("%d分钟前发布的:\n", minute)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ LastTip := ""
|
|
|
+ pushnum := len(*pushArray)
|
|
|
+ if pushnum > 1 {
|
|
|
+ LastTip = fmt.Sprintf("...(共%d条)", pushnum)
|
|
|
+ }
|
|
|
+ LastLen = LastLen - len([]rune(pushtt)) - len([]rune(Tip1))
|
|
|
+ Remark := ""
|
|
|
+ bshow := false
|
|
|
+ for n := 1; n < pushnum+1; n++ {
|
|
|
+ Remark += fmt.Sprintf("%d %s\n", n, (*(*pushArray)[n-1])["s_title"])
|
|
|
+ if len([]rune(Remark)) > LastLen {
|
|
|
+ if n == pushnum {
|
|
|
+ bshow = true
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if bshow {
|
|
|
+ LastTip = ""
|
|
|
+ }
|
|
|
+ go log.Println("push", openid, pushnum, tit)
|
|
|
+ weixinrpc.SendWinXin(&qrpc.NotifyMsg{
|
|
|
+ Openid: openid,
|
|
|
+ Title: fmt.Sprintf(WxTitle, tit),
|
|
|
+ Remark: Tip1 + Remark + LastTip,
|
|
|
+ Detail: WxContent,
|
|
|
+ Service: WxGroup,
|
|
|
+ Url: ViewDomain + "/front/sess/" + se.EncodeString(openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",wxpushfollowlist") + "__" + infoid + "__" + followid,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ //}()
|
|
|
+ }
|
|
|
+ //推送结束
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ }
|
|
|
+ }, func(e interface{}) {
|
|
|
+ log.Println("给用户推送关注信息时出错:", e)
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return pushArray
|
|
|
+}
|