|
@@ -4,29 +4,32 @@ import (
|
|
|
"container/list"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
+ "gopkg.in/mgo.v2/bson"
|
|
|
"log"
|
|
|
"qfw/push"
|
|
|
+ "qfw/push/dfa"
|
|
|
"qfw/util"
|
|
|
- "qfw/util/elastic"
|
|
|
+ elastic "qfw/util/elastic_v5"
|
|
|
"qfw/util/mongodb"
|
|
|
"strings"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
-
|
|
|
- "gopkg.in/mgo.v2/bson"
|
|
|
)
|
|
|
|
|
|
const (
|
|
|
//industry
|
|
|
- ShowField = `"_id","title","publishtime","toptype","subtype","type","area","href","areaval","infoformat",` +
|
|
|
+ ShowField = `"_id","title","projectscope","publishtime","toptype","subtype","type","area","href","areaval","infoformat",` +
|
|
|
`"projectname","buyer","winner","buyer","budget","bidamount","bidopentime","subscopeclass"`
|
|
|
FindField = `"title"`
|
|
|
SmartFindField = `"title","projectscope"`
|
|
|
SortQuery = `{"publishtime":"desc"}`
|
|
|
DB = "bidding"
|
|
|
IDRange = `{"range":{"id":{"gt":"%s","lte":"%s"}}},{"range":{"publishtime":{"gt": %d}}}`
|
|
|
+ TimeRange = `{"range":{"comeintime":{"gte":%d,"lte":%d}}}`
|
|
|
MaxId = `{"query":{"filtered":{"filter":{"bool":{"must":{"range":{"id":{"gt":"%s"}}}}}}},"_source":["_id","comeintime"],"sort":{"id":"desc"},"from":0,"size":1}`
|
|
|
PushTitle = `[<span class='area'>%s</span>]%s`
|
|
|
Infoformat = `{"term":{"infoformat":%d}}`
|
|
|
+ FilterQuery = `{"query": {"filtered": {"filter": {"bool": {"must": [%s]}}}}}`
|
|
|
)
|
|
|
|
|
|
var (
|
|
@@ -34,93 +37,51 @@ var (
|
|
|
1: "招标",
|
|
|
2: "拟建项目",
|
|
|
}
|
|
|
- pushpool = make(chan bool, 8)
|
|
|
+ searchpool = make(chan bool, 8)
|
|
|
+ pushpool = make(chan bool, 50)
|
|
|
+ eachpool = make(chan bool, 100)
|
|
|
+ searchWaitGroup = &sync.WaitGroup{}
|
|
|
+ eachInfoWaitGroup = &sync.WaitGroup{}
|
|
|
+ userMapLock = &sync.Mutex{}
|
|
|
+ pushLock = &sync.Mutex{}
|
|
|
+ biddingDatas *[]map[string]interface{}
|
|
|
+ allUsers *map[string]*push.MemberInterest
|
|
|
+ MaxSearch = 10000 //缓存中总共加载这么多条
|
|
|
+ OnceMax = 400 //ES一次查询这么多条
|
|
|
)
|
|
|
|
|
|
-//开始推送
|
|
|
-func DoPush(k *push.MemberInterest, idrange []string, MaxPushSize int, Now time.Time, ratemode int) {
|
|
|
- defer util.Catch()
|
|
|
- defer func() {
|
|
|
- <-pushpool
|
|
|
- }()
|
|
|
- //是否含有拟建项目
|
|
|
- //for i := 1; i < 3; i++ {
|
|
|
- //idrangeTmp := append(idrange, fmt.Sprintf(Infoformat, i))
|
|
|
- var pushArray = []map[string]interface{}{}
|
|
|
- f := FindField
|
|
|
- if k.Smartset == 1 {
|
|
|
- f = SmartFindField
|
|
|
- }
|
|
|
- res := elastic.GetResForJY(DB, DB, k.AllKeys, strings.Join(idrange, ","), f, SortQuery, ShowField, 0, MaxPushSize)
|
|
|
- if res != nil && *res != nil && len(*res) > 0 {
|
|
|
- listInfos := list.New()
|
|
|
- for _, v := range *res {
|
|
|
- vh := v["highlight"].(map[string][]string)
|
|
|
- if len(vh) == 2 {
|
|
|
- v["highlight"] = 3
|
|
|
- } else {
|
|
|
- if vh["title"] != nil {
|
|
|
- v["highlight"] = 1
|
|
|
- } else {
|
|
|
- v["highlight"] = 2
|
|
|
- }
|
|
|
- }
|
|
|
- province := util.ObjToString(v["area"])
|
|
|
- v["otitle"] = v["title"]
|
|
|
- if "A" != province {
|
|
|
- v["title"] = fmt.Sprintf(PushTitle, province, v["title"])
|
|
|
- }
|
|
|
- var info = v
|
|
|
- pushArray = append(pushArray, info)
|
|
|
- listInfos.PushBack(&info)
|
|
|
- }
|
|
|
- var rtflog = false
|
|
|
- h := time.Now().Hour()
|
|
|
- openid := k.Openid
|
|
|
- if k.Ratemode == 3 {
|
|
|
- if k.Rmstart <= h && h < k.Rmend {
|
|
|
- rtflog = true
|
|
|
- //把暂存数据取出合并后发送,并删除此用户暂存库中的数据
|
|
|
- inofsdata := mongodb.FindOne("tempush", &bson.M{"s_openid": openid})
|
|
|
- if inofsdata != nil {
|
|
|
- if (*inofsdata)["tmpinfos"] != nil {
|
|
|
- infos := (*inofsdata)["tmpinfos"].([]interface{})
|
|
|
- for _, tif := range infos {
|
|
|
- tifMap := tif.(map[string]interface{})
|
|
|
- listInfos.PushBack(&tifMap)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- mongodb.Del("tempush", &bson.M{"s_openid": openid})
|
|
|
- } else {
|
|
|
- //自定义时间外的数据暂存数据库tempush中
|
|
|
- mongodb.Update("tempush", &bson.M{"s_openid": openid}, &bson.M{
|
|
|
- "$pushAll": bson.M{
|
|
|
- "tmpinfos": &pushArray,
|
|
|
- },
|
|
|
- }, true, false)
|
|
|
- }
|
|
|
- } else {
|
|
|
- rtflog = true
|
|
|
- }
|
|
|
- if rtflog {
|
|
|
- DealSend(k, listInfos, Now, MaxPushSize, k.Ratemode, InfoMap[1])
|
|
|
- }
|
|
|
+type Pjob struct {
|
|
|
+ InterestDfa *dfa.DFA
|
|
|
+ NotInterestDfa *dfa.DFA
|
|
|
+ Key_user *map[string]*[]*push.MemberInterest
|
|
|
+ Notkey_user *map[string]*[]*push.MemberInterest
|
|
|
+}
|
|
|
|
|
|
- //}
|
|
|
+//所有用户的关键词和排除词
|
|
|
+func (p *Pjob) CreateDaf() {
|
|
|
+ //关键词
|
|
|
+ p.InterestDfa = &dfa.DFA{}
|
|
|
+ interestWords := make([]string, 0)
|
|
|
+ for k, _ := range *p.Key_user {
|
|
|
+ interestWords = append(interestWords, k)
|
|
|
+ }
|
|
|
+ p.InterestDfa.AddWord(interestWords...)
|
|
|
+ //排除关键词
|
|
|
+ p.NotInterestDfa = &dfa.DFA{}
|
|
|
+ notInterestWords := make([]string, 0)
|
|
|
+ for k, _ := range *p.Notkey_user {
|
|
|
+ notInterestWords = append(notInterestWords, k)
|
|
|
}
|
|
|
+ p.NotInterestDfa.AddWord(notInterestWords...)
|
|
|
}
|
|
|
|
|
|
-func PushInfoByEs(MaxPushSize int, Config *map[string]interface{}, i_ratemode int) (res bool) {
|
|
|
+func PushInfoByEs(MaxPushSize int, Config *map[string]interface{}, i_ratemode int) bool {
|
|
|
defer util.Catch()
|
|
|
st, _ := time.ParseInLocation(util.Date_Full_Layout, (*Config)["StartTime"].(string), time.Local)
|
|
|
lastTime := st.Unix()
|
|
|
_id := util.ObjToString((*Config)["lastid"])
|
|
|
log.Println("开始执行任务-i_ratemode-id-lasttime:", i_ratemode, _id, lastTime)
|
|
|
Now := time.Now()
|
|
|
- session := mongodb.GetMgoConn()
|
|
|
- defer mongodb.DestoryMongoConn(session)
|
|
|
- idrange := []string{}
|
|
|
//获取本次查询的最大id
|
|
|
resId := elastic.Get(DB, DB, fmt.Sprintf(MaxId, _id))
|
|
|
lastid := ""
|
|
@@ -129,45 +90,147 @@ func PushInfoByEs(MaxPushSize int, Config *map[string]interface{}, i_ratemode in
|
|
|
lastid = util.ObjToString((*resId)[0]["_id"])
|
|
|
comeintime = (*resId)[0]["comeintime"]
|
|
|
} else {
|
|
|
- log.Println("未查找到数据...", fmt.Sprintf(MaxId, _id))
|
|
|
+ log.Println("获取本次查询的最大id的时候,未查找到数据...", fmt.Sprintf(MaxId, _id))
|
|
|
return false
|
|
|
}
|
|
|
- //filter全局查询
|
|
|
- idrange = append(idrange, fmt.Sprintf(IDRange, _id, lastid, lastTime-7*86400))
|
|
|
+ a_p, s_p := InitUserCache(i_ratemode)
|
|
|
+ if a_p == nil {
|
|
|
+ log.Println("未查找到需要推送的用户...")
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ if !InitBiddingCache(_id, lastid, lastTime, 0, 0, false) {
|
|
|
+ log.Println("加载数据到内存中的时候,未查找到数据...")
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ //
|
|
|
+ DoPush(a_p, s_p, MaxPushSize, Now, true)
|
|
|
+ log.Println("推送结束-i_ratemode-comeintime-lastid", i_ratemode, comeintime, lastid)
|
|
|
+ biddingDatas = nil
|
|
|
+ allUsers = nil
|
|
|
+ (*Config)["StartTime"] = util.FormatDateWithObj(&comeintime, util.Date_Full_Layout)
|
|
|
+ (*Config)["lastid"] = lastid
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+//九点推送的用户
|
|
|
+func PushNineUsers(MaxPushSize int, i_ratemode int) bool {
|
|
|
+ pushLock.Lock()
|
|
|
+ defer pushLock.Unlock()
|
|
|
+ log.Println("九点推送,开始执行任务。。。")
|
|
|
+ Now := time.Now()
|
|
|
+ session := mongodb.GetMgoConn()
|
|
|
+ defer mongodb.DestoryMongoConn(session)
|
|
|
+ q := map[string]interface{}{
|
|
|
+ "ratemode": 2,
|
|
|
+ }
|
|
|
+ query := session.DB("qfw").C("tempush").Find(&q).Select(&map[string]interface{}{
|
|
|
+ "_id": -1,
|
|
|
+ "s_openid": 1,
|
|
|
+ "tmpinfos": 1,
|
|
|
+ "info": 1,
|
|
|
+ }).Iter()
|
|
|
+ userMap := map[*push.MemberInterest]*list.List{}
|
|
|
+ //遍历用户数组
|
|
|
+ count := 0
|
|
|
+ for tmp := make(map[string]interface{}); query.Next(tmp); {
|
|
|
+ if tmp["tmpinfos"] == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ log.Println("九点推送,openid:", tmp["s_openid"])
|
|
|
+ tmpinfos, _ := tmp["tmpinfos"].([]interface{})
|
|
|
+ if len(tmpinfos) == 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ info, _ := tmp["info"].(map[string]interface{})
|
|
|
+ if len(info) == 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ //
|
|
|
+ user := &push.MemberInterest{
|
|
|
+ Id: util.ObjToString(info["id"]),
|
|
|
+ PushMode: util.IntAllDef(info["pushMode"], 1),
|
|
|
+ Email: util.ObjToString(info["email"]),
|
|
|
+ Openid: util.ObjToString(info["openid"]),
|
|
|
+ Ratemode: util.IntAllDef(info["ratemode"], 1),
|
|
|
+ Rmstart: util.IntAllDef(info["rmstart"], 1),
|
|
|
+ Rmend: util.IntAllDef(info["rmend"], 1),
|
|
|
+ Smartset: util.IntAllDef(info["smartset"], 0),
|
|
|
+ Dataexport: util.IntAllDef(info["dataexport"], 0),
|
|
|
+ }
|
|
|
+ keys, _ := info["keys"].([]interface{})
|
|
|
+ user.Interest = util.ObjArrToStringArr(keys)
|
|
|
+ //
|
|
|
+ listInfos := list.New()
|
|
|
+ for _, tif := range tmpinfos {
|
|
|
+ tifMap, _ := tif.(map[string]interface{})
|
|
|
+ listInfos.PushBack(&tifMap)
|
|
|
+ }
|
|
|
+ userMap[user] = listInfos
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ count++
|
|
|
+ }
|
|
|
+ if count == 0 {
|
|
|
+ log.Println("九点推送,未查找到需要推送的用户...")
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ log.Println("九点推送,本次查询用户总数:", count)
|
|
|
+ for temp_k, temp_list := range userMap {
|
|
|
+ pushpool <- true
|
|
|
+ go func(k *push.MemberInterest, listInfos *list.List) {
|
|
|
+ defer func() {
|
|
|
+ <-pushpool
|
|
|
+ }()
|
|
|
+ mongodb.Del("tempush", &bson.M{"s_openid": k.Openid})
|
|
|
+ DealSend(k, listInfos, Now, MaxPushSize, k.Ratemode, InfoMap[1])
|
|
|
+ }(temp_k, temp_list)
|
|
|
+ }
|
|
|
+ log.Println("九点推送,推送结束。。。")
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+//初始化用户缓存
|
|
|
+func InitUserCache(i_ratemode int) (*Pjob, *Pjob) {
|
|
|
+ defer util.Catch()
|
|
|
//遍历用户
|
|
|
q := map[string]interface{}{
|
|
|
"i_appid": 2,
|
|
|
"i_ispush": map[string]interface{}{"$ne": 0},
|
|
|
}
|
|
|
- if i_ratemode == 1 {
|
|
|
- q["o_jy.i_ratemode"] = map[string]interface{}{"$in": []int{1, 3}}
|
|
|
- } else {
|
|
|
+ if i_ratemode == 2 {
|
|
|
q["o_jy.i_ratemode"] = i_ratemode
|
|
|
}
|
|
|
- // smopenid := util.ObjToString((*Config)["fixPush"])
|
|
|
- // if smopenid != "" {
|
|
|
- // q["s_m_openid"] = smopenid
|
|
|
+ //18 o8-2pwHj1s_tv3nnRxrH9cD2ngkk
|
|
|
+ //14 ouCYjwzKpn-3orDpb0CA3Po1RLHw
|
|
|
+ // q = map[string]interface{}{
|
|
|
+ // "i_appid": 2,
|
|
|
+ // "s_m_openid": map[string]interface{}{
|
|
|
+ // "$in": []string{"o8-2pwHj1s_tv3nnRxrH9cD2ngkk"},
|
|
|
+ // },
|
|
|
// }
|
|
|
+ session := mongodb.GetMgoConn()
|
|
|
+ defer mongodb.DestoryMongoConn(session)
|
|
|
query := session.DB("qfw").C("user").Find(&q).Select(&map[string]interface{}{
|
|
|
- "_id": 1,
|
|
|
- "o_jy": 1,
|
|
|
- "s_m_openid": 1,
|
|
|
- "i_dataexport": 1,
|
|
|
- "i_smartset": 1,
|
|
|
+ "_id": 1,
|
|
|
+ "o_jy": 1,
|
|
|
+ "s_m_openid": 1,
|
|
|
+ "i_dataexport": 1,
|
|
|
+ "i_smartset": 1,
|
|
|
+ "i_supersearch": 1,
|
|
|
}).Iter()
|
|
|
+ allUsers = &map[string]*push.MemberInterest{}
|
|
|
//遍历所有用户放到数组
|
|
|
- allPushUsers := []map[string]interface{}{}
|
|
|
- for tmp := make(map[string]interface{}); query.Next(tmp); {
|
|
|
- allPushUsers = append(allPushUsers, tmp)
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- }
|
|
|
- log.Println("本次推送", len(allPushUsers), "人")
|
|
|
+ a_key_user := make(map[string]*[]*push.MemberInterest)
|
|
|
+ a_notkey_user := make(map[string]*[]*push.MemberInterest)
|
|
|
+ //开启智能订阅的用户
|
|
|
+ s_key_user := make(map[string]*[]*push.MemberInterest)
|
|
|
+ s_notkey_user := make(map[string]*[]*push.MemberInterest)
|
|
|
//遍历用户数组
|
|
|
- for _, tmp := range allPushUsers {
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
|
|
|
log.Println("openid:", tmp["s_m_openid"])
|
|
|
util.Try(func() {
|
|
|
_id := fmt.Sprintf("%x", string(tmp["_id"].(bson.ObjectId)))
|
|
|
- o_msgset := tmp["o_jy"].(map[string]interface{})
|
|
|
+ o_msgset, _ := tmp["o_jy"].(map[string]interface{})
|
|
|
var allkeysTemp []elastic.KeyConfig
|
|
|
_bs, err := json.Marshal(o_msgset["a_key"])
|
|
|
if err == nil {
|
|
@@ -195,34 +258,392 @@ func PushInfoByEs(MaxPushSize int, Config *map[string]interface{}, i_ratemode in
|
|
|
}
|
|
|
////////////////
|
|
|
if len(allkeys) > 0 {
|
|
|
- a_key := []string{}
|
|
|
+ keys := []string{} //关键词
|
|
|
+ notkeys := []string{} //排除词
|
|
|
+ key_notkey := map[string]string{} //关键词所对应的排除词
|
|
|
+ key_area := map[string]map[string]bool{} //关键词所对应的信息范围
|
|
|
+ key_infotype := map[string]map[string]bool{} //关键词所对应的信息类型
|
|
|
for _, vs := range allkeys {
|
|
|
- a_key = append(a_key, strings.Join(vs.Keys, "+"))
|
|
|
+ key := strings.Join(vs.Keys, "+")
|
|
|
+ keys = append(keys, key)
|
|
|
+ notkey := strings.Join(vs.NotKeys, "+")
|
|
|
+ notkeys = append(notkeys, notkey)
|
|
|
+ //建立与排除词的对应关系
|
|
|
+ key_notkey[key] = notkey
|
|
|
+ //建立与信息范围的对应关系
|
|
|
+ for _, area := range vs.Areas {
|
|
|
+ if key_area[key] == nil {
|
|
|
+ key_area[key] = map[string]bool{}
|
|
|
+ }
|
|
|
+ key_area[key][area] = true
|
|
|
+ }
|
|
|
+ //建立与信息类型的对应关系
|
|
|
+ for _, infotype := range vs.InfoTypes {
|
|
|
+ if key_infotype[key] == nil {
|
|
|
+ key_infotype[key] = map[string]bool{}
|
|
|
+ }
|
|
|
+ key_infotype[key][infotype] = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ smartset := util.IntAllDef(tmp["i_smartset"], 0)
|
|
|
+ user := &push.MemberInterest{
|
|
|
+ Id: _id,
|
|
|
+ Interest: keys,
|
|
|
+ NotInterest: notkeys,
|
|
|
+ Key_notkey: key_notkey,
|
|
|
+ Key_area: key_area,
|
|
|
+ Key_infotype: key_infotype,
|
|
|
+ PushMode: util.IntAllDef(o_msgset["i_mode"], 1),
|
|
|
+ Email: util.ObjToString(o_msgset["s_email"]),
|
|
|
+ Openid: util.ObjToString(tmp["s_m_openid"]),
|
|
|
+ Ratemode: util.IntAllDef(o_msgset["i_ratemode"], 1),
|
|
|
+ Rmstart: util.IntAllDef(o_msgset["i_rmstart"], 1),
|
|
|
+ Rmend: util.IntAllDef(o_msgset["i_rmend"], 1),
|
|
|
+ AllKeys: allkeys,
|
|
|
+ Smartset: smartset,
|
|
|
+ Dataexport: util.IntAllDef(tmp["i_dataexport"], 0),
|
|
|
}
|
|
|
- user := push.MemberInterest{
|
|
|
- Id: _id,
|
|
|
- Interest: a_key,
|
|
|
- PushMode: util.IntAllDef(o_msgset["i_mode"], 1),
|
|
|
- Email: util.ObjToString(o_msgset["s_email"]),
|
|
|
- Openid: util.ObjToString(tmp["s_m_openid"]),
|
|
|
- Ratemode: util.IntAllDef(o_msgset["i_ratemode"], 1),
|
|
|
- Rmstart: util.IntAllDef(o_msgset["i_rmstart"], 1),
|
|
|
- Rmend: util.IntAllDef(o_msgset["i_rmend"], 1),
|
|
|
- AllKeys: allkeys,
|
|
|
- Smartset: util.IntAllDef(tmp["i_smartset"], 0),
|
|
|
- Dataexport: util.IntAllDef(tmp["i_dataexport"], 0),
|
|
|
+ (*allUsers)[user.Openid] = user
|
|
|
+ MakeKeyUser(keys, user, &a_key_user)
|
|
|
+ MakeKeyUser(notkeys, user, &a_notkey_user)
|
|
|
+ if smartset == 1 {
|
|
|
+ MakeKeyUser(keys, user, &s_key_user)
|
|
|
+ MakeKeyUser(notkeys, user, &s_notkey_user)
|
|
|
}
|
|
|
- pushpool <- true
|
|
|
- go DoPush(&user, idrange, MaxPushSize, Now, i_ratemode)
|
|
|
}
|
|
|
-
|
|
|
}, func(e interface{}) {
|
|
|
log.Println(e)
|
|
|
})
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
}
|
|
|
- log.Println("推送结束..", comeintime, lastid)
|
|
|
- allPushUsers = []map[string]interface{}{}
|
|
|
- (*Config)["StartTime"] = util.FormatDateWithObj(&comeintime, util.Date_Full_Layout)
|
|
|
- (*Config)["lastid"] = lastid
|
|
|
+ log.Println("本次查询用户总数:", n)
|
|
|
+ if len(*allUsers) == 0 {
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+ a_p := &Pjob{
|
|
|
+ Key_user: &a_key_user,
|
|
|
+ Notkey_user: &a_notkey_user,
|
|
|
+ }
|
|
|
+ a_p.CreateDaf()
|
|
|
+ //
|
|
|
+ s_p := &Pjob{
|
|
|
+ Key_user: &s_key_user,
|
|
|
+ Notkey_user: &s_notkey_user,
|
|
|
+ }
|
|
|
+ s_p.CreateDaf()
|
|
|
+ return a_p, s_p
|
|
|
+}
|
|
|
+
|
|
|
+//把用户挂在词下面
|
|
|
+func MakeKeyUser(keys []string, user *push.MemberInterest, key_user *map[string]*[]*push.MemberInterest) {
|
|
|
+ m := map[string]bool{}
|
|
|
+ for _, v := range keys {
|
|
|
+ if v == "" || m[v] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ m[v] = true
|
|
|
+ var arr *[]*push.MemberInterest
|
|
|
+ if nil == (*key_user)[v] {
|
|
|
+ arr = &[]*push.MemberInterest{}
|
|
|
+ (*key_user)[v] = arr
|
|
|
+ } else {
|
|
|
+ arr = (*key_user)[v]
|
|
|
+ (*key_user)[v] = arr
|
|
|
+ }
|
|
|
+ *arr = append(*arr, user)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//加载数据到内存中
|
|
|
+func InitBiddingCache(_id, lastid string, lastTime int64, startTime, endTime int64, isRepair bool) bool {
|
|
|
+ //filter全局查询
|
|
|
+ c_query := ""
|
|
|
+ //第一次发布程序之后,修补九点推送的用户数据
|
|
|
+ if isRepair {
|
|
|
+ c_query = fmt.Sprintf(FilterQuery, fmt.Sprintf(TimeRange, startTime, endTime))
|
|
|
+ } else {
|
|
|
+ c_query = fmt.Sprintf(FilterQuery, fmt.Sprintf(IDRange, _id, lastid, lastTime-7*86400))
|
|
|
+ }
|
|
|
+ log.Println("es query:", c_query)
|
|
|
+ //testquery := `{"terms":{"_id":["59cefaa12cf26913ca07fd56"]}}`
|
|
|
+ //testquery := ``
|
|
|
+ //c_query = fmt.Sprintf(FilterQuery, testquery)
|
|
|
+ count := int(elastic.Count(DB, DB, c_query))
|
|
|
+ log.Println("本次推送共查到数据", count, "条")
|
|
|
+ if count == 0 {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ if count > MaxSearch && !isRepair {
|
|
|
+ count = MaxSearch
|
|
|
+ log.Println("目前数据多于", MaxSearch, ",只加载了", MaxSearch, "条!")
|
|
|
+ }
|
|
|
+ if OnceMax > count {
|
|
|
+ OnceMax = count
|
|
|
+ }
|
|
|
+ var res []map[string]interface{}
|
|
|
+ totalPage := int((count + OnceMax - 1) / OnceMax)
|
|
|
+ log.Println("数据一共", totalPage, "页!")
|
|
|
+ //如果res长度和cout相差5条,重试
|
|
|
+ for t := 1; t <= 3; t++ {
|
|
|
+ res = []map[string]interface{}{}
|
|
|
+ for i := 0; i < totalPage; i++ {
|
|
|
+ searchpool <- true
|
|
|
+ searchWaitGroup.Add(1)
|
|
|
+ go func(start int) {
|
|
|
+ defer func() {
|
|
|
+ searchWaitGroup.Done()
|
|
|
+ <-searchpool
|
|
|
+ }()
|
|
|
+ r := elastic.GetAllByNgram(DB, DB, c_query, "", SortQuery, ShowField, start*OnceMax, OnceMax, 0, false)
|
|
|
+ res = append(res, *r...)
|
|
|
+ log.Println("第", start+1, "页数据加载完成!")
|
|
|
+ }(i)
|
|
|
+ }
|
|
|
+ searchWaitGroup.Wait()
|
|
|
+ if len(res) >= count-5 {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ log.Println("第", t, "次加载数据完成,数据总数", len(res), ",由于数据量不够,重新加载!")
|
|
|
+ }
|
|
|
+ resLenght := len(res)
|
|
|
+ if resLenght == 0 {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ biddingDatas = &res
|
|
|
+ log.Println(resLenght, "条数据已经加载完成!")
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+//开始推送
|
|
|
+func DoPush(a_p *Pjob, s_p *Pjob, MaxPushSize int, Now time.Time, isPush bool) {
|
|
|
+ defer util.Catch()
|
|
|
+ pushLock.Lock()
|
|
|
+ defer pushLock.Unlock()
|
|
|
+ userMap := EachAllBidInfo(a_p, s_p)
|
|
|
+ for temp_k, temp_list := range *userMap {
|
|
|
+ pushpool <- true
|
|
|
+ go func(k *push.MemberInterest, listInfos *list.List) {
|
|
|
+ defer func() {
|
|
|
+ <-pushpool
|
|
|
+ }()
|
|
|
+ var rtflog = false
|
|
|
+ h := time.Now().Hour()
|
|
|
+ openid := k.Openid
|
|
|
+ if k.Ratemode == 2 || k.Ratemode == 3 {
|
|
|
+ inofsdata := mongodb.FindOne("tempush", &bson.M{"s_openid": openid})
|
|
|
+ if k.Ratemode == 3 && k.Rmstart <= h && h < k.Rmend {
|
|
|
+ rtflog = true
|
|
|
+ //把暂存数据取出合并后发送,并删除此用户暂存库中的数据
|
|
|
+ if inofsdata != nil {
|
|
|
+ if (*inofsdata)["tmpinfos"] != nil {
|
|
|
+ infos, _ := (*inofsdata)["tmpinfos"].([]interface{})
|
|
|
+ for _, tif := range infos {
|
|
|
+ tifMap, _ := tif.(map[string]interface{})
|
|
|
+ listInfos.PushBack(&tifMap)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mongodb.Del("tempush", &bson.M{"s_openid": openid})
|
|
|
+ } else {
|
|
|
+ //自定义时间外的数据暂存数据库tempush中
|
|
|
+ myInfo := &bson.M{
|
|
|
+ "id": k.Id,
|
|
|
+ "keys": k.Interest,
|
|
|
+ "pushMode": k.PushMode,
|
|
|
+ "email": k.Email,
|
|
|
+ "openid": k.Openid,
|
|
|
+ "ratemode": k.Ratemode,
|
|
|
+ "rmstart": k.Rmstart,
|
|
|
+ "rmend": k.Rmend,
|
|
|
+ "allKeys": k.AllKeys,
|
|
|
+ "smartset": k.Smartset,
|
|
|
+ "dataexport": k.Dataexport,
|
|
|
+ }
|
|
|
+ var pushArray = []interface{}{}
|
|
|
+ for e := listInfos.Front(); e != nil; e = e.Next() {
|
|
|
+ k2 := *(e.Value.(*map[string]interface{}))
|
|
|
+ pushArray = append(pushArray, k2)
|
|
|
+ if len(pushArray) >= MaxPushSize {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ pLength := len(pushArray)
|
|
|
+ upSet := new(bson.M)
|
|
|
+ //直接替换
|
|
|
+ if inofsdata == nil || pLength >= MaxPushSize {
|
|
|
+ upSet = &bson.M{"$set": &bson.M{"tmpinfos": &pushArray, "ratemode": k.Ratemode, "info": myInfo}}
|
|
|
+ } else { //保留最新的50条
|
|
|
+ infos, _ := (*inofsdata)["tmpinfos"].([]interface{})
|
|
|
+ rLength := len(infos)
|
|
|
+ if rLength > 0 && rLength+pLength > MaxPushSize {
|
|
|
+ start := rLength + pLength - MaxPushSize
|
|
|
+ var relationinfoTemp []interface{}
|
|
|
+ if start < rLength {
|
|
|
+ relationinfoTemp = append(relationinfoTemp, infos[start:]...)
|
|
|
+ }
|
|
|
+ relationinfoTemp = append(relationinfoTemp, pushArray...)
|
|
|
+ upSet = &bson.M{"$set": &bson.M{"tmpinfos": &relationinfoTemp, "ratemode": k.Ratemode, "info": myInfo}}
|
|
|
+ } else { //追加
|
|
|
+ upSet = &bson.M{"$set": &bson.M{"ratemode": k.Ratemode, "info": myInfo}, "$pushAll": &bson.M{"tmpinfos": &pushArray}}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mongodb.Update("tempush", &bson.M{"s_openid": openid}, upSet, true, false)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ rtflog = true
|
|
|
+ }
|
|
|
+ if rtflog && isPush {
|
|
|
+ DealSend(k, listInfos, Now, MaxPushSize, k.Ratemode, InfoMap[1])
|
|
|
+ }
|
|
|
+ }(temp_k, temp_list)
|
|
|
+
|
|
|
+ // for temp_k, temp_list := range *userMap {
|
|
|
+ // arrayids := []interface{}{}
|
|
|
+ // for e := temp_list.Front(); e != nil; e = e.Next() {
|
|
|
+ // k2 := *(e.Value.(*map[string]interface{}))
|
|
|
+ // arrayids = append(arrayids, k2["_id"])
|
|
|
+ // }
|
|
|
+ // log.Println(temp_k.Openid, arrayids)
|
|
|
+ // if len(arrayids) < 5 {
|
|
|
+ // log.Println("==========", temp_k.Openid, arrayids)
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//遍历数据并执行推送操作
|
|
|
+func EachAllBidInfo(a_p *Pjob, s_p *Pjob) *map[*push.MemberInterest]*list.List {
|
|
|
+ defer util.Catch()
|
|
|
+ userMap := &map[*push.MemberInterest]*list.List{}
|
|
|
+ var count int
|
|
|
+ for _, temp := range *biddingDatas {
|
|
|
+ eachpool <- true
|
|
|
+ eachInfoWaitGroup.Add(1)
|
|
|
+ count++
|
|
|
+ go func(info map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ eachInfoWaitGroup.Done()
|
|
|
+ <-eachpool
|
|
|
+ }()
|
|
|
+ title := util.ObjToString(info["title"])
|
|
|
+ if title == "" {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ area := util.ObjToString(info["area"])
|
|
|
+ toptype := util.ObjToString(info["toptype"])
|
|
|
+ //订阅词
|
|
|
+ keys := a_p.InterestDfa.Analy(title)
|
|
|
+ //排除词
|
|
|
+ notkeys := a_p.NotInterestDfa.Analy(title)
|
|
|
+ users := GetFinalUser(keys, notkeys, a_p.Key_user, area, toptype, true)
|
|
|
+ //开启智能匹配的用户,匹配projectscope
|
|
|
+ if s_p != nil {
|
|
|
+ projectscope := util.ObjToString(info["projectscope"])
|
|
|
+ if projectscope != "" {
|
|
|
+ keys = s_p.InterestDfa.Analy(projectscope)
|
|
|
+ notkeys = s_p.NotInterestDfa.Analy(projectscope)
|
|
|
+ s_users := GetFinalUser(keys, notkeys, s_p.Key_user, area, toptype, false)
|
|
|
+ for _, s_u := range *s_users {
|
|
|
+ if (*users)[s_u.Openid] != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ (*users)[s_u.Openid] = s_u
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(*users) > 0 {
|
|
|
+ province := util.ObjToString(info["area"])
|
|
|
+ info["otitle"] = info["title"]
|
|
|
+ if "A" != province {
|
|
|
+ info["title"] = fmt.Sprintf(PushTitle, province, info["title"])
|
|
|
+ }
|
|
|
+ EachInfoToUser(users, &info, userMap)
|
|
|
+ }
|
|
|
+ }(temp)
|
|
|
+ if count%500 == 0 {
|
|
|
+ log.Println("当前信息索引:", count)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ eachInfoWaitGroup.Wait()
|
|
|
+ log.Println("数据遍历完成!")
|
|
|
+ return userMap
|
|
|
+}
|
|
|
+
|
|
|
+//获取最终的用户,排除词、信息范围、信息类型之后的
|
|
|
+//返回匹配上的用户和没有匹配到的用户
|
|
|
+func GetFinalUser(keys, notkeys []string, key_user *map[string]*[]*push.MemberInterest, area, toptype string, flag bool) *map[string]*push.MemberInterest {
|
|
|
+ keyMap := map[string]bool{}
|
|
|
+ for _, v := range keys {
|
|
|
+ keyMap[v] = true
|
|
|
+ }
|
|
|
+ notkeyMap := map[string]bool{}
|
|
|
+ for _, v := range notkeys {
|
|
|
+ notkeyMap[v] = true
|
|
|
+ }
|
|
|
+ y_users := map[string]*push.MemberInterest{} //匹配到用户
|
|
|
+ //遍历所有用户
|
|
|
+ for k, us := range *key_user {
|
|
|
+ if !keyMap[k] { //改关键词没有匹配到的用户
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ for _, u := range *us {
|
|
|
+ //获取该词下面所有的用户
|
|
|
+ //遍历我的排除词,如果存在的话,排除自己
|
|
|
+ if notkeyMap[u.Key_notkey[k]] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ //遍历我的信息范围,看该信息是不是在我的信息范围中
|
|
|
+ if len(u.Key_area[k]) > 0 && !u.Key_area[k][area] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ //遍历我的信息类型,看该信息是不是在我的信息类型中
|
|
|
+ if len(u.Key_infotype[k]) > 0 && !u.Key_infotype[k][toptype] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ y_users[u.Openid] = u
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //获取最终没有匹配到的用户,进行正文或者范围匹配
|
|
|
+ users := map[string]*push.MemberInterest{}
|
|
|
+ for k, v := range *allUsers {
|
|
|
+ if y_users[k] == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ users[v.Openid] = v
|
|
|
+ }
|
|
|
+ return &users
|
|
|
+}
|
|
|
+
|
|
|
+//遍历用户加入到此条信息上
|
|
|
+func EachInfoToUser(users *map[string]*push.MemberInterest, info *map[string]interface{}, userMap *map[*push.MemberInterest]*list.List) {
|
|
|
+ defer userMapLock.Unlock()
|
|
|
+ userMapLock.Lock()
|
|
|
+ for _, v := range *users {
|
|
|
+ s := (*userMap)[v]
|
|
|
+ if s == nil {
|
|
|
+ s = list.New()
|
|
|
+ (*userMap)[v] = s
|
|
|
+ }
|
|
|
+ s.PushBack(info)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//九点推送补漏
|
|
|
+func RepairNinePush(startTime, endTime int64) bool {
|
|
|
+ log.Println("开始修补数据...")
|
|
|
+ a_p, s_p := InitUserCache(2)
|
|
|
+ if a_p == nil {
|
|
|
+ log.Println("修补数据,未查找到需要的用户...")
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ if !InitBiddingCache("", "", 0, startTime, endTime, true) {
|
|
|
+ log.Println("修补数据,加载数据到内存中的时候,未查找到数据...")
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ //
|
|
|
+ DoPush(a_p, s_p, 50, time.Now(), false)
|
|
|
+ log.Println("修补数据结束...")
|
|
|
+ biddingDatas = nil
|
|
|
+ allUsers = nil
|
|
|
return true
|
|
|
}
|