|
@@ -1,20 +1,12 @@
|
|
|
package job
|
|
|
|
|
|
import (
|
|
|
- "encoding/json"
|
|
|
- "fmt"
|
|
|
- "math"
|
|
|
- "os"
|
|
|
. "public"
|
|
|
. "push/config"
|
|
|
putil "push/util"
|
|
|
"qfw/util"
|
|
|
- "qfw/util/mail"
|
|
|
"qfw/util/mongodb"
|
|
|
- "regexp"
|
|
|
"sort"
|
|
|
- "strconv"
|
|
|
- "strings"
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
@@ -23,74 +15,37 @@ import (
|
|
|
"gopkg.in/mgo.v2/bson"
|
|
|
)
|
|
|
|
|
|
-const (
|
|
|
- BulkSize = 200
|
|
|
- DbName = "qfw"
|
|
|
-)
|
|
|
-
|
|
|
var (
|
|
|
- messyCodeEmailReg = regexp.MustCompile(SysConfig.MessyCodeEmailReg)
|
|
|
- MoveFields = []string{"s_m_openid", "a_m_openid", "phone", "usertype", "jpushid", "opushid", "words", "ratemode", "wxpush", "apppush", "mailpush", "smartset", "timestamp", "subscribe", "applystatus", "appphonetype", "email", "size", "modifydate", "mergeorder"}
|
|
|
+ MoveFields = []string{"s_m_openid", "a_m_openid", "phone", "usertype", "jpushid", "opushid", "words", "ratemode", "wxpush", "apppush", "mailpush", "pchelperpush", "smartset", "timestamp", "subscribe", "applystatus", "appphonetype", "email", "size", "modifydate", "mergeorder", "nickname", "firstpushtime"}
|
|
|
)
|
|
|
|
|
|
-func init() {
|
|
|
- mySelf := Jobs.Push.MySelf().(*PushJob)
|
|
|
- //推送1分钟限制
|
|
|
- mySelf.minutePushPool = make(chan bool, SysConfig.MinutePushSize)
|
|
|
- mySelf.fastigiumMinutePushPool = make(chan bool, SysConfig.FastigiumMinutePushSize)
|
|
|
- for i := 0; i < SysConfig.MinutePushSize; i++ {
|
|
|
- mySelf.minutePushPool <- true
|
|
|
- }
|
|
|
- for i := 0; i < SysConfig.FastigiumMinutePushSize; i++ {
|
|
|
- mySelf.fastigiumMinutePushPool <- true
|
|
|
- }
|
|
|
- go func() {
|
|
|
- t := time.NewTicker(time.Minute)
|
|
|
- for {
|
|
|
- select {
|
|
|
- case <-t.C:
|
|
|
- for i := 0; i < SysConfig.MinutePushSize-len(mySelf.minutePushPool); i++ {
|
|
|
- mySelf.minutePushPool <- true
|
|
|
- }
|
|
|
- for i := 0; i < SysConfig.FastigiumMinutePushSize-len(mySelf.fastigiumMinutePushPool); i++ {
|
|
|
- mySelf.fastigiumMinutePushPool <- true
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }()
|
|
|
-}
|
|
|
-
|
|
|
type Move struct {
|
|
|
Info map[string]interface{}
|
|
|
Ids []interface{}
|
|
|
}
|
|
|
|
|
|
-type PushJob struct {
|
|
|
- pushPool chan bool
|
|
|
- minutePushPool chan bool
|
|
|
- fastigiumMinutePushPool chan bool
|
|
|
- pushWait *sync.WaitGroup
|
|
|
- pushJobLock *sync.Mutex
|
|
|
- lastId string
|
|
|
- pushDatas *[]map[string]interface{}
|
|
|
-}
|
|
|
-
|
|
|
-func (p *PushJob) MySelf() interface{} {
|
|
|
- return p
|
|
|
+type pushJob struct {
|
|
|
+ taskType int
|
|
|
+ pool chan bool
|
|
|
+ wait *sync.WaitGroup
|
|
|
+ lock *sync.Mutex
|
|
|
+ lastId string
|
|
|
+ users *[]map[string]interface{}
|
|
|
}
|
|
|
|
|
|
//taskType 1--实时推送 2--实时推送+一天三次的8点推送 3--一天三次推送 4--九点推送
|
|
|
-func (p *PushJob) Execute(taskType int, isMoveDatas bool) {
|
|
|
- p.pushJobLock.Lock()
|
|
|
- defer p.pushJobLock.Unlock()
|
|
|
- logger.Info("开始推送任务。。。", taskType)
|
|
|
+func (p *pushJob) Execute(taskType int, isMoveDatas bool) {
|
|
|
+ p.lock.Lock()
|
|
|
+ defer p.lock.Unlock()
|
|
|
+ p.taskType = taskType
|
|
|
+ logger.Info("开始推送任务。。。", p.taskType)
|
|
|
if isMoveDatas {
|
|
|
- p.Move(taskType)
|
|
|
+ p.Move()
|
|
|
}
|
|
|
- p.Push(taskType)
|
|
|
+ p.Push()
|
|
|
}
|
|
|
-func (p *PushJob) Move(taskType int) {
|
|
|
- logger.Info("推送任务", taskType, "开始迁移数据。。。")
|
|
|
+func (p *pushJob) Move() {
|
|
|
+ logger.Info("推送任务", p.taskType, "开始迁移数据。。。")
|
|
|
nowUnix := time.Now().Unix()
|
|
|
sess := mongodb.GetMgoConn()
|
|
|
defer mongodb.DestoryMongoConn(sess)
|
|
@@ -100,7 +55,7 @@ func (p *PushJob) Move(taskType int) {
|
|
|
},
|
|
|
}).Sort("userid").Iter()
|
|
|
moves := map[string]*Move{}
|
|
|
- logger.Info("推送任务", taskType, "开始遍历pushspace_temp")
|
|
|
+ logger.Info("推送任务", p.taskType, "开始遍历pushspace_temp")
|
|
|
index, number, length := 0, 0, 0
|
|
|
for temp := make(map[string]interface{}); it.Next(&temp); {
|
|
|
userId := util.ObjToString(temp["userid"])
|
|
@@ -111,7 +66,7 @@ func (p *PushJob) Move(taskType int) {
|
|
|
for _, v := range list {
|
|
|
idMap[util.ObjToString((*v.Info)["_id"])] = true
|
|
|
}
|
|
|
- newList := p.ToSortList(temp["list"])
|
|
|
+ newList := putil.ToSortList(temp["list"])
|
|
|
for _, v := range newList {
|
|
|
if idMap[util.ObjToString((*v.Info)["_id"])] {
|
|
|
continue
|
|
@@ -126,7 +81,7 @@ func (p *PushJob) Move(taskType int) {
|
|
|
move.Info = temp
|
|
|
move.Ids = append(move.Ids, temp["_id"])
|
|
|
} else {
|
|
|
- temp["list"] = p.ToSortList(temp["list"])
|
|
|
+ temp["list"] = putil.ToSortList(temp["list"])
|
|
|
move = &Move{
|
|
|
Info: temp,
|
|
|
Ids: []interface{}{temp["_id"]},
|
|
@@ -137,23 +92,23 @@ func (p *PushJob) Move(taskType int) {
|
|
|
index++
|
|
|
length++
|
|
|
if index%500 == 0 {
|
|
|
- logger.Info("推送任务", taskType, "pushspace_temp加载到内存:", index)
|
|
|
+ logger.Info("推送任务", p.taskType, "pushspace_temp加载到内存:", index)
|
|
|
}
|
|
|
if length == SysConfig.MoveBatch {
|
|
|
length = 0
|
|
|
- p.Merge(taskType, &number, nowUnix, moves)
|
|
|
+ p.Merge(&number, nowUnix, moves)
|
|
|
moves = map[string]*Move{}
|
|
|
}
|
|
|
}
|
|
|
if length > 0 {
|
|
|
- p.Merge(taskType, &number, nowUnix, moves)
|
|
|
+ p.Merge(&number, nowUnix, moves)
|
|
|
}
|
|
|
moves = nil
|
|
|
- logger.Info("推送任务", taskType, "迁移数据结束。。。", index)
|
|
|
+ logger.Info("推送任务", p.taskType, "迁移数据结束。。。", index)
|
|
|
}
|
|
|
-func (p *PushJob) Merge(taskType int, number *int, nowUnix int64, moves map[string]*Move) {
|
|
|
+func (p *pushJob) Merge(number *int, nowUnix int64, moves map[string]*Move) {
|
|
|
*number++
|
|
|
- logger.Info("推送任务", taskType, "第", *number, "次开始合并数据")
|
|
|
+ logger.Info("推送任务", p.taskType, "第", *number, "次开始合并数据")
|
|
|
index := 0
|
|
|
saveArray := []map[string]interface{}{}
|
|
|
saveArray_delete := []interface{}{}
|
|
@@ -186,7 +141,7 @@ func (p *PushJob) Merge(taskType int, number *int, nowUnix int64, moves map[stri
|
|
|
}
|
|
|
pushAll := make(map[string]interface{})
|
|
|
for _, v := range []string{"", "temp"} {
|
|
|
- oldList := p.ToSortList(data[v+"list"])
|
|
|
+ oldList := putil.ToSortList(data[v+"list"])
|
|
|
if v == "temp" && oldList == nil {
|
|
|
continue
|
|
|
}
|
|
@@ -232,7 +187,7 @@ func (p *PushJob) Merge(taskType int, number *int, nowUnix int64, moves map[stri
|
|
|
}
|
|
|
index++
|
|
|
if index%500 == 0 {
|
|
|
- logger.Info("推送任务", taskType, "第", *number, "次合并数据:", index)
|
|
|
+ logger.Info("推送任务", p.taskType, "第", *number, "次合并数据:", index)
|
|
|
}
|
|
|
}
|
|
|
if len(saveArray) > 0 {
|
|
@@ -241,9 +196,9 @@ func (p *PushJob) Merge(taskType int, number *int, nowUnix int64, moves map[stri
|
|
|
if len(updateArray_query) > 0 {
|
|
|
p.UpdateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
|
|
|
}
|
|
|
- logger.Info("推送任务", taskType, "第", *number, "次合并数据结束。。。", index)
|
|
|
+ logger.Info("推送任务", p.taskType, "第", *number, "次合并数据结束。。。", index)
|
|
|
}
|
|
|
-func (p *PushJob) SaveBulk(sess *mgo.Session, saves *[]map[string]interface{}, deletes *[]interface{}) {
|
|
|
+func (p *pushJob) SaveBulk(sess *mgo.Session, saves *[]map[string]interface{}, deletes *[]interface{}) {
|
|
|
coll := sess.DB(DbName).C("pushspace")
|
|
|
bulk := coll.Bulk()
|
|
|
for _, v := range *saves {
|
|
@@ -257,7 +212,7 @@ func (p *PushJob) SaveBulk(sess *mgo.Session, saves *[]map[string]interface{}, d
|
|
|
}
|
|
|
*saves = []map[string]interface{}{}
|
|
|
}
|
|
|
-func (p *PushJob) UpdateBulk(sess *mgo.Session, array_q, array_s *[]map[string]interface{}, array_d *[]interface{}) {
|
|
|
+func (p *pushJob) UpdateBulk(sess *mgo.Session, array_q, array_s *[]map[string]interface{}, array_d *[]interface{}) {
|
|
|
coll := sess.DB(DbName).C("pushspace")
|
|
|
bulk := coll.Bulk()
|
|
|
for k, v := range *array_q {
|
|
@@ -272,7 +227,7 @@ func (p *PushJob) UpdateBulk(sess *mgo.Session, array_q, array_s *[]map[string]i
|
|
|
*array_q = []map[string]interface{}{}
|
|
|
*array_s = []map[string]interface{}{}
|
|
|
}
|
|
|
-func (p *PushJob) DelBulk(sess *mgo.Session, array *[]interface{}) {
|
|
|
+func (p *pushJob) DelBulk(sess *mgo.Session, array *[]interface{}) {
|
|
|
coll := sess.DB(DbName).C("pushspace_temp")
|
|
|
count := 0
|
|
|
bulk := coll.Bulk()
|
|
@@ -298,49 +253,48 @@ func (p *PushJob) DelBulk(sess *mgo.Session, array *[]interface{}) {
|
|
|
}
|
|
|
*array = []interface{}{}
|
|
|
}
|
|
|
-func (p *PushJob) Push(taskType int) {
|
|
|
- logger.Info("推送任务", taskType, "开始推送。。。")
|
|
|
- p.lastId = ""
|
|
|
+func (p *pushJob) Push() {
|
|
|
+ logger.Info("推送任务", p.taskType, "开始推送。。。")
|
|
|
batch_index := 0
|
|
|
for {
|
|
|
batch_index++
|
|
|
- batch_size := p.OncePushBatch(batch_index, taskType)
|
|
|
- for _, temp := range *p.pushDatas {
|
|
|
- p.pushPool <- true
|
|
|
- p.pushWait.Add(1)
|
|
|
+ batch_size := p.OncePushBatch(batch_index)
|
|
|
+ for _, temp := range *p.users {
|
|
|
+ p.pool <- true
|
|
|
+ p.wait.Add(1)
|
|
|
go func(v map[string]interface{}) {
|
|
|
defer func() {
|
|
|
- <-p.pushPool
|
|
|
- p.pushWait.Done()
|
|
|
+ <-p.pool
|
|
|
+ p.wait.Done()
|
|
|
}()
|
|
|
words, _ := v["words"].([]interface{})
|
|
|
u := &UserInfo{
|
|
|
- Id: util.ObjToString(v["userid"]),
|
|
|
- OriginalKeys: util.ObjArrToStringArr(words),
|
|
|
- WxPush: util.IntAll(v["wxpush"]),
|
|
|
- AppPush: util.IntAll(v["apppush"]),
|
|
|
- MailPush: util.IntAll(v["mailpush"]),
|
|
|
- Email: util.ObjToString(v["email"]),
|
|
|
- S_m_openid: util.ObjToString(v["s_m_openid"]),
|
|
|
- A_m_openid: util.ObjToString(v["a_m_openid"]),
|
|
|
- Phone: util.ObjToString(v["phone"]),
|
|
|
- Jpushid: util.ObjToString(v["jpushid"]),
|
|
|
- Opushid: util.ObjToString(v["opushid"]),
|
|
|
- UserType: util.IntAll(v["usertype"]),
|
|
|
- RateMode: util.IntAllDef(v["ratemode"], 1),
|
|
|
- SmartSet: util.IntAllDef(v["smartset"], 1),
|
|
|
- DataExport: util.IntAll(v["dataexport"]),
|
|
|
- AppPhoneType: util.ObjToString(v["appphonetype"]),
|
|
|
- ApplyStatus: util.IntAll(v["applystatus"]),
|
|
|
- Subscribe: util.IntAllDef(v["subscribe"], 1),
|
|
|
- ModifyDate: util.ObjToString(v["modifydate"]),
|
|
|
- MergeOrder: v["mergeorder"],
|
|
|
+ Id: util.ObjToString(v["userid"]),
|
|
|
+ OriginalKeys: util.ObjArrToStringArr(words),
|
|
|
+ WxPush: util.IntAll(v["wxpush"]),
|
|
|
+ AppPush: util.IntAll(v["apppush"]),
|
|
|
+ MailPush: util.IntAll(v["mailpush"]),
|
|
|
+ PchelperPush: util.IntAll(v["pchelperpush"]),
|
|
|
+ Email: util.ObjToString(v["email"]),
|
|
|
+ S_m_openid: util.ObjToString(v["s_m_openid"]),
|
|
|
+ A_m_openid: util.ObjToString(v["a_m_openid"]),
|
|
|
+ Phone: util.ObjToString(v["phone"]),
|
|
|
+ Jpushid: util.ObjToString(v["jpushid"]),
|
|
|
+ Opushid: util.ObjToString(v["opushid"]),
|
|
|
+ UserType: util.IntAll(v["usertype"]),
|
|
|
+ RateMode: util.IntAllDef(v["ratemode"], 1),
|
|
|
+ SmartSet: util.IntAllDef(v["smartset"], 1),
|
|
|
+ DataExport: util.IntAll(v["dataexport"]),
|
|
|
+ AppPhoneType: util.ObjToString(v["appphonetype"]),
|
|
|
+ ApplyStatus: util.IntAll(v["applystatus"]),
|
|
|
+ Subscribe: util.IntAllDef(v["subscribe"], 1),
|
|
|
+ ModifyDate: util.ObjToString(v["modifydate"]),
|
|
|
+ MergeOrder: v["mergeorder"],
|
|
|
+ FirstPushTime: util.Int64All(v["firstpushtime"]),
|
|
|
}
|
|
|
- logger.Info("推送任务", taskType, "开始推送用户,userid", u.Id, "s_m_openid", u.S_m_openid, "a_m_openid", u.A_m_openid, "phone", u.Phone, "subscribe", u.Subscribe, "applystatus", u.ApplyStatus, "jpushid", u.Jpushid, "opushid", u.Opushid)
|
|
|
+ logger.Info("推送任务", p.taskType, "开始推送用户,userid", u.Id, "s_m_openid", u.S_m_openid, "a_m_openid", u.A_m_openid, "phone", u.Phone, "subscribe", u.Subscribe, "applystatus", u.ApplyStatus, "jpushid", u.Jpushid, "opushid", u.Opushid)
|
|
|
wxPush, appPush, mailPush := 0, 0, 0
|
|
|
- list := p.ToSortList(v["list"])
|
|
|
- templist := p.ToSortList(v["templist"])
|
|
|
- if taskType == 1 {
|
|
|
+ if p.taskType == 1 {
|
|
|
if u.WxPush == 1 {
|
|
|
if u.ApplyStatus == 1 {
|
|
|
wxPush = -1
|
|
@@ -352,9 +306,15 @@ func (p *PushJob) Push(taskType int) {
|
|
|
appPush = 1
|
|
|
}
|
|
|
if u.MailPush == 1 {
|
|
|
- mailPush = -1
|
|
|
+ if u.UserType == 0 && u.ApplyStatus == 1 {
|
|
|
+ mailPush = -1
|
|
|
+ } else if u.UserType == 5 && u.ApplyStatus == 1 && u.AppPush == 0 {
|
|
|
+ mailPush = -1
|
|
|
+ } else {
|
|
|
+ mailPush = 1
|
|
|
+ }
|
|
|
}
|
|
|
- } else if taskType == 2 || taskType == 4 {
|
|
|
+ } else if p.taskType == 2 || p.taskType == 4 {
|
|
|
if u.WxPush == 1 {
|
|
|
wxPush = 1
|
|
|
}
|
|
@@ -364,26 +324,18 @@ func (p *PushJob) Push(taskType int) {
|
|
|
if u.MailPush == 1 {
|
|
|
mailPush = 1
|
|
|
}
|
|
|
- } else if taskType == 3 {
|
|
|
+ } else if p.taskType == 3 {
|
|
|
if u.WxPush == 1 && u.ApplyStatus == 1 {
|
|
|
wxPush = 1
|
|
|
}
|
|
|
if u.MailPush == 1 {
|
|
|
mailPush = 1
|
|
|
}
|
|
|
- } else if taskType == 4 {
|
|
|
- if u.WxPush == 1 {
|
|
|
- wxPush = 1
|
|
|
- }
|
|
|
- if u.AppPush == 1 {
|
|
|
- appPush = 1
|
|
|
- }
|
|
|
- if u.MailPush == 1 {
|
|
|
- mailPush = 1
|
|
|
- }
|
|
|
}
|
|
|
+ list := putil.ToSortList(v["list"])
|
|
|
+ tempList := putil.ToSortList(v["templist"])
|
|
|
t_wxPush, t_mailPush := util.IntAll(v["tempwxpush"]), util.IntAll(v["tempmailpush"])
|
|
|
- if templist != nil {
|
|
|
+ if tempList != nil {
|
|
|
if wxPush == 1 && t_wxPush == 0 {
|
|
|
wxPush = 0
|
|
|
}
|
|
@@ -391,7 +343,7 @@ func (p *PushJob) Push(taskType int) {
|
|
|
mailPush = 0
|
|
|
}
|
|
|
}
|
|
|
- logger.Info("推送任务", taskType, "用户接收方式,userid", u.Id, "wxPush", wxPush, "appPush", appPush, "mailPush", mailPush, "t_wxPush", t_wxPush, "t_mailPush", t_mailPush)
|
|
|
+ logger.Info("推送任务", p.taskType, "用户接收方式,userid", u.Id, "wxPush", wxPush, "appPush", appPush, "mailPush", mailPush, "t_wxPush", t_wxPush, "t_mailPush", t_mailPush)
|
|
|
if wxPush != 1 && appPush != 1 && mailPush != 1 {
|
|
|
return
|
|
|
}
|
|
@@ -403,50 +355,31 @@ func (p *PushJob) Push(taskType int) {
|
|
|
appPush = 0
|
|
|
}
|
|
|
if mailPush != 0 {
|
|
|
- if (u.UserType == 0 || u.UserType == 3) && u.Subscribe == 0 {
|
|
|
+ if u.UserType == 0 && u.Subscribe == 0 {
|
|
|
mailPush = 0
|
|
|
- } else if (u.UserType == 1 || u.UserType == 2 || u.UserType == 4) && u.Jpushid == "" && u.Opushid == "" {
|
|
|
+ } else if (u.UserType == 1 || u.UserType == 2) && u.Jpushid == "" && u.Opushid == "" {
|
|
|
mailPush = 0
|
|
|
} else if u.UserType == 5 && u.Subscribe == 0 && u.Jpushid == "" && u.Opushid == "" {
|
|
|
mailPush = 0
|
|
|
}
|
|
|
}
|
|
|
- if wxPush == 1 || appPush == 1 || mailPush == 1 {
|
|
|
- isSaveSuccess := false
|
|
|
- //wxPushStatus, appPushStatus, mailPushStatus := 0, 0, 0
|
|
|
- //开通订阅推送或邮箱推送的用户,由实时推送修改成九点推送,app推送用list字段,微信和邮箱推送用templist字段
|
|
|
- if list != nil && templist != nil {
|
|
|
- isSaveSuccess, _, _, _ = p.DealSend(taskType, true, 0, appPush, 0, u, &list)
|
|
|
- if !isSaveSuccess {
|
|
|
- return
|
|
|
- }
|
|
|
- p.DealSend(taskType, false, wxPush, 0, mailPush, u, &templist)
|
|
|
- } else if list != nil {
|
|
|
- isSaveSuccess, _, _, _ = p.DealSend(taskType, true, wxPush, appPush, mailPush, u, &list)
|
|
|
- if !isSaveSuccess {
|
|
|
- return
|
|
|
- }
|
|
|
- } else if templist != nil {
|
|
|
- p.DealSend(taskType, false, wxPush, 0, mailPush, u, &templist)
|
|
|
- }
|
|
|
- }
|
|
|
- /*if wxPush == 1 {
|
|
|
- if wxPushStatus == -1 {
|
|
|
- wxPush = -1
|
|
|
- } else {
|
|
|
- wxPush = 0
|
|
|
+ isSaveSuccess, wxStatus, appStatus, mailStatus := DoPush.Execute(p.taskType, wxPush, appPush, mailPush, u, list, tempList)
|
|
|
+ if isSaveSuccess {
|
|
|
+ if u.FirstPushTime == 0 {
|
|
|
+ go mongodb.Update("user", map[string]interface{}{
|
|
|
+ "_id": bson.ObjectIdHex(u.Id),
|
|
|
+ }, map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "l_firstpushtime": time.Now().Unix(),
|
|
|
+ },
|
|
|
+ }, false, false)
|
|
|
}
|
|
|
+ } else {
|
|
|
+ return
|
|
|
}
|
|
|
- if mailPush == 1 {
|
|
|
- if mailPushStatus == -1 {
|
|
|
- mailPush = -1
|
|
|
- } else {
|
|
|
- mailPush = 0
|
|
|
- }
|
|
|
- }*/
|
|
|
//判断是否要删除数据
|
|
|
- _sess := mongodb.GetMgoConn()
|
|
|
- defer mongodb.DestoryMongoConn(_sess)
|
|
|
+ sess := mongodb.GetMgoConn()
|
|
|
+ defer mongodb.DestoryMongoConn(sess)
|
|
|
if wxPush == -1 || mailPush == -1 {
|
|
|
//如果该用户还有微信或者邮箱推送,把list字段的值挪到templist
|
|
|
update := map[string]interface{}{}
|
|
@@ -454,384 +387,93 @@ func (p *PushJob) Push(taskType int) {
|
|
|
"tempwxpush": wxPush,
|
|
|
"tempmailpush": mailPush,
|
|
|
}
|
|
|
- if templist == nil {
|
|
|
+ if tempList == nil {
|
|
|
update["$rename"] = map[string]interface{}{"list": "templist"}
|
|
|
} else {
|
|
|
update["$unset"] = map[string]interface{}{"list": ""}
|
|
|
}
|
|
|
update["$set"] = set
|
|
|
- _sess.DB(DbName).C("pushspace").UpdateId(v["_id"], update)
|
|
|
- } else {
|
|
|
- _sess.DB(DbName).C("pushspace").RemoveId(v["_id"])
|
|
|
- }
|
|
|
- }(temp)
|
|
|
- }
|
|
|
- if batch_size < SysConfig.PushBatch {
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- p.pushWait.Wait()
|
|
|
- logger.Info("推送任务结束。。。", taskType)
|
|
|
-}
|
|
|
-func (p *PushJob) DealSend(taskType int, isSave bool, wxPush, appPush, mailPush int, k *UserInfo, sl *SortList) (isSaveSuccess bool, wxPushStatus, appPushStatus, mailPushStatus int) {
|
|
|
- defer util.Catch()
|
|
|
- str := fmt.Sprintf("<div>根据您设置的关键词(%s),给您推送以下信息:</div>", strings.Join(k.OriginalKeys, ";"))
|
|
|
- mailContent := ""
|
|
|
- //发送内容组合
|
|
|
- i := 0
|
|
|
- jpushtitle := ""
|
|
|
- lastInfoDate := int64(0)
|
|
|
- TitleArray := []string{}
|
|
|
- o_pushinfo := map[string]map[string]interface{}{}
|
|
|
- matchKey_infoIndex := map[string]string{}
|
|
|
- publishTitle := map[string]bool{}
|
|
|
- //邮件附件
|
|
|
- var fmdatas = []map[string]interface{}{}
|
|
|
- for _, ks := range *sl {
|
|
|
- k2 := *ks.Info
|
|
|
- title := strings.Replace(k2["title"].(string), "\n", "", -1)
|
|
|
- area := util.ObjToString(k2["area"])
|
|
|
- if area == "A" {
|
|
|
- area = "全国"
|
|
|
- }
|
|
|
- newTitle := fmt.Sprintf("[%s]%s", area, title)
|
|
|
- if publishTitle[newTitle] {
|
|
|
- continue
|
|
|
- }
|
|
|
- publishTitle[title] = true
|
|
|
- i++
|
|
|
- TitleArray = append(TitleArray, Re.ReplaceAllString(newTitle, "$1"))
|
|
|
- if i == 1 {
|
|
|
- jpushtitle = title
|
|
|
- lastInfoDate = util.Int64All(k2["publishtime"])
|
|
|
- }
|
|
|
- //_sid := util.EncodeArticleId(util.BsonIdToSId(k2["_id"]))
|
|
|
- //url := fmt.Sprintf("%s/pcdetail/%s.html", Domain, _sid)
|
|
|
- _sid := util.EncodeArticleId2ByCheck(util.ObjToString(k2["_id"]))
|
|
|
- //增加行业的处理
|
|
|
- industry := ""
|
|
|
- industryclass := "industry"
|
|
|
- if k2["s_subscopeclass"] != nil {
|
|
|
- k2sub := strings.Split(util.ObjToString(k2["s_subscopeclass"]), ",")
|
|
|
- if len(k2sub) > 0 {
|
|
|
- industry = k2sub[0]
|
|
|
- if industry != "" {
|
|
|
- ss := strings.Split(industry, "_")
|
|
|
- if len(ss) > 1 {
|
|
|
- industry = ss[0]
|
|
|
+ err := sess.DB(DbName).C("pushspace").UpdateId(v["_id"], update)
|
|
|
+ if err != nil {
|
|
|
+ logger.Error("推送任务", p.taskType, "update error", err)
|
|
|
}
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if mailPush == 1 { //关于邮件的处理
|
|
|
- mailSid := util.CommonEncodeArticle("mailprivate", util.ObjToString(k2["_id"]))
|
|
|
- url := fmt.Sprintf("%s/article/mailprivate/%s.html", SysConfig.JianyuDomain, mailSid)
|
|
|
- classArea := "area"
|
|
|
- classType := "type"
|
|
|
- types := util.ObjToString(k2["subtype"])
|
|
|
- if types == "" {
|
|
|
- types = util.ObjToString(k2["toptype"])
|
|
|
- if types == "" {
|
|
|
- types = "其他"
|
|
|
- }
|
|
|
- }
|
|
|
- dates := util.LongToDate(k2["publishtime"], false)
|
|
|
- //标题替换
|
|
|
- otitle := title
|
|
|
- for _, kw := range k.OriginalKeys {
|
|
|
- kws := strings.Split(kw, "+")
|
|
|
- n := 0
|
|
|
- otitle2 := otitle
|
|
|
- for _, kwn := range kws {
|
|
|
- ot := strings.Replace(otitle2, kwn, "<span class='keys'>"+kwn+"</span>", 1)
|
|
|
- if ot != otitle {
|
|
|
- n++
|
|
|
- otitle2 = ot
|
|
|
- } else {
|
|
|
- break
|
|
|
+ } else {
|
|
|
+ err := sess.DB(DbName).C("pushspace").RemoveId(v["_id"])
|
|
|
+ if err != nil {
|
|
|
+ logger.Error("推送任务", p.taskType, "update error", err)
|
|
|
}
|
|
|
}
|
|
|
- if n == len(kws) {
|
|
|
- otitle = otitle2
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- if industry == "" {
|
|
|
- industryclass = ""
|
|
|
- }
|
|
|
- mailContent += fmt.Sprintf(SysConfig.Mail_content, i, url, otitle, classArea, area, classType, types, industryclass, industry, dates)
|
|
|
- }
|
|
|
- str += "<div class='tslist'><span class='xh'>" + fmt.Sprintf("%d", i) + ".</span><a class='bt' target='_blank' eid='" + _sid + "' href='" + util.ObjToString(k2["href"]) + "'>[<span class='area'>" + area + "</span>]" + title + "</a></div>"
|
|
|
- o_pushinfo[strconv.Itoa(i)] = map[string]interface{}{
|
|
|
- "publishtime": k2["publishtime"],
|
|
|
- "stype": util.ObjToString(k2["type"]),
|
|
|
- "topstype": util.ObjToString(k2["toptype"]),
|
|
|
- "substype": util.ObjToString(k2["subtype"]),
|
|
|
- "subscopeclass": industry,
|
|
|
- "buyer": k2["buyer"],
|
|
|
- "projectname": k2["projectname"],
|
|
|
- "budget": k2["budget"],
|
|
|
- "bidopentime": k2["bidopentime"],
|
|
|
- "winner": k2["winner"],
|
|
|
- "bidamount": k2["bidamount"],
|
|
|
- }
|
|
|
- //附件数据
|
|
|
- fmdata := map[string]interface{}{
|
|
|
- "publishtime": k2["publishtime"],
|
|
|
- "subtype": k2["subtype"],
|
|
|
- "buyer": k2["buyer"],
|
|
|
- "projectname": k2["projectname"],
|
|
|
- "budget": k2["budget"],
|
|
|
- "bidopentime": k2["bidopentime"],
|
|
|
- "winner": k2["winner"],
|
|
|
- "bidamount": k2["bidamount"],
|
|
|
- }
|
|
|
- fmdatas = append(fmdatas, fmdata)
|
|
|
- //匹配到的关键词
|
|
|
- for _, key := range (*ks).Keys {
|
|
|
- if matchKey_infoIndex[key] != "" {
|
|
|
- matchKey_infoIndex[key] = matchKey_infoIndex[key] + ","
|
|
|
- }
|
|
|
- matchKey_infoIndex[key] = matchKey_infoIndex[key] + strconv.Itoa(i)
|
|
|
- }
|
|
|
- if i >= SysConfig.MaxPushSize {
|
|
|
- //限制最大信息条数
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- if i == 0 {
|
|
|
- logger.Info("推送任务", taskType, "没有要推送的数据!", k.S_m_openid, k.A_m_openid, k.Phone)
|
|
|
- return
|
|
|
- }
|
|
|
- if isSave {
|
|
|
- //推送记录id
|
|
|
- pushId := putil.SaveSendInfo(taskType, k, str, o_pushinfo, matchKey_infoIndex)
|
|
|
- if pushId == "" {
|
|
|
- logger.Info("推送任务", taskType, "保存到cassandra出错", k.Id, k.S_m_openid, k.A_m_openid, k.Phone)
|
|
|
- return
|
|
|
- } else {
|
|
|
- logger.Info("推送任务", taskType, "成功保存到cassandra", pushId, k.Id, k.S_m_openid, k.A_m_openid, k.Phone)
|
|
|
- }
|
|
|
- isSaveSuccess = true
|
|
|
- }
|
|
|
- //九点推送的时候,限制一分钟最大的推送数量
|
|
|
- if taskType == 4 && (wxPush == 1 || appPush == 1) {
|
|
|
- hour := time.Now().Hour()
|
|
|
- fastigiumStart, fastigiumEnd := 0, 0
|
|
|
- fastigiumTimes := strings.Split(SysConfig.FastigiumTime, "-")
|
|
|
- if len(fastigiumTimes) == 2 {
|
|
|
- fastigiumStart = util.IntAll(fastigiumTimes[0])
|
|
|
- fastigiumEnd = util.IntAll(fastigiumTimes[1])
|
|
|
- }
|
|
|
- if hour >= fastigiumStart && hour <= fastigiumEnd {
|
|
|
- <-p.fastigiumMinutePushPool //高峰期
|
|
|
- } else {
|
|
|
- <-p.minutePushPool //正常期
|
|
|
- }
|
|
|
- }
|
|
|
- if wxPush == 1 {
|
|
|
- isPushOk := true
|
|
|
- if k.ApplyStatus == 1 {
|
|
|
- TmpTip := ""
|
|
|
- minute := time.Now().Unix() - lastInfoDate
|
|
|
- if minute > -1 && minute < 61 {
|
|
|
- TmpTip = fmt.Sprintf("%d秒前发布的", minute)
|
|
|
- } else {
|
|
|
- minute = minute / 60
|
|
|
- if minute < 121 {
|
|
|
- if minute < 1 {
|
|
|
- minute = 1
|
|
|
+ if wxStatus == -1 || appStatus == -1 || mailStatus == -1 {
|
|
|
+ f_count, err := sess.DB(DbName).C("pushspace").FindId(v["_id"]).Count()
|
|
|
+ if err != nil {
|
|
|
+ logger.Error("推送任务", p.taskType, "find count error", err)
|
|
|
+ return
|
|
|
}
|
|
|
- TmpTip = fmt.Sprintf("%d分钟前发布的", minute)
|
|
|
- }
|
|
|
- }
|
|
|
- Tip1 := util.If(TmpTip == "", "", TmpTip+":\n").(string)
|
|
|
- LastTip := ""
|
|
|
- if i > 1 {
|
|
|
- LastTip = fmt.Sprintf("...(共%d条)", i)
|
|
|
- }
|
|
|
- LastTipLen := len([]rune(LastTip))
|
|
|
- wxTitleKeys := strings.Join(k.OriginalKeys, ";")
|
|
|
- if len([]rune(wxTitleKeys)) > 8 {
|
|
|
- wxTitleKeys = string([]rune(wxTitleKeys)[:8]) + "..."
|
|
|
- }
|
|
|
- wxtitle := fmt.Sprintf(SysConfig.WxTitle, wxTitleKeys)
|
|
|
- TitleLen := len([]rune(wxtitle))
|
|
|
- GroupLen := len([]rune(k.ModifyDate))
|
|
|
- reLen := 200 - TitleLen - GroupLen - WxContentLen - len([]rune(Tip1))
|
|
|
- //if infoType == 2 {
|
|
|
- // reLen = reLen - 4
|
|
|
- //}
|
|
|
- WXTitle := ""
|
|
|
- bshow := false
|
|
|
- for n := 1; n < len(TitleArray)+1; n++ {
|
|
|
- curTitle := TitleArray[n-1]
|
|
|
- tmptitle := WXTitle + fmt.Sprintf("%d %s\n", n, curTitle)
|
|
|
- ch := reLen - len([]rune(tmptitle))
|
|
|
- if ch < LastTipLen { //加上后大于后辍,则没有完全显示
|
|
|
- if ch == 0 && n == len(TitleArray) {
|
|
|
- WXTitle = tmptitle
|
|
|
- bshow = true
|
|
|
+ if f_count == 0 {
|
|
|
+ v["failtime"] = time.Now().Unix()
|
|
|
+ if wxStatus == -1 {
|
|
|
+ v["wxfail"] = 1
|
|
|
+ }
|
|
|
+ if appStatus == -1 {
|
|
|
+ v["appfail"] = 1
|
|
|
+ }
|
|
|
+ if mailStatus == -1 {
|
|
|
+ v["mailfail"] = 1
|
|
|
+ }
|
|
|
+ err := sess.DB(DbName).C("pushspace_fail").Insert(v)
|
|
|
+ if err != nil {
|
|
|
+ logger.Error("推送任务", p.taskType, "update error", err)
|
|
|
+ }
|
|
|
} else {
|
|
|
- ch_1 := reLen - len([]rune(WXTitle)) - LastTipLen
|
|
|
- if ch_1 > 8 {
|
|
|
- curLen := len([]rune(curTitle))
|
|
|
- if ch_1 > curLen {
|
|
|
- ch_1 = curLen
|
|
|
- }
|
|
|
- WXTitle += fmt.Sprintf("%d %s\n", n, string([]rune(curTitle)[:ch_1-3]))
|
|
|
+ f_update := map[string]interface{}{}
|
|
|
+ if wxStatus == -1 {
|
|
|
+ f_update["wxfail"] = 1
|
|
|
+ }
|
|
|
+ if appStatus == -1 {
|
|
|
+ f_update["appfail"] = 1
|
|
|
+ }
|
|
|
+ if mailStatus == -1 {
|
|
|
+ f_update["mailfail"] = 1
|
|
|
+ }
|
|
|
+ err := sess.DB(DbName).C("pushspace_fail").UpdateId(v["_id"], map[string]interface{}{"$set": f_update})
|
|
|
+ if err != nil {
|
|
|
+ logger.Error("推送任务", p.taskType, "update error", err)
|
|
|
}
|
|
|
- }
|
|
|
- } else if ch == LastTipLen {
|
|
|
- WXTitle = tmptitle
|
|
|
- if n == len(TitleArray) {
|
|
|
- bshow = true
|
|
|
- }
|
|
|
- } else {
|
|
|
- WXTitle = tmptitle
|
|
|
- if n == len(TitleArray) {
|
|
|
- bshow = true
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
- if bshow {
|
|
|
- LastTip = ""
|
|
|
- }
|
|
|
- //推送微信
|
|
|
- isPushOk = putil.SendWeixin(k, Tip1+WXTitle+LastTip, o_pushinfo, matchKey_infoIndex, wxtitle)
|
|
|
- if isPushOk {
|
|
|
- wxPushStatus = 1
|
|
|
- } else {
|
|
|
- wxPushStatus = -1
|
|
|
- }
|
|
|
- }
|
|
|
- logger.Info("推送任务", taskType, "微信推送", isPushOk, k.Id, k.S_m_openid, k.RateMode, k.ApplyStatus)
|
|
|
- }
|
|
|
- if appPush == 1 {
|
|
|
- if len([]rune(jpushtitle)) > 80 {
|
|
|
- jpushtitle = string([]rune(jpushtitle)[:80]) + "..."
|
|
|
- }
|
|
|
- if i > 1 {
|
|
|
- jpushtitle = fmt.Sprintf("1. %s\n...(共%d条)", jpushtitle, i)
|
|
|
- }
|
|
|
- go mongodb.Update("user", map[string]interface{}{
|
|
|
- "_id": bson.ObjectIdHex(k.Id),
|
|
|
- }, map[string]interface{}{
|
|
|
- "$inc": map[string]interface{}{
|
|
|
- "i_apppushunread": 1,
|
|
|
- },
|
|
|
- }, false, false)
|
|
|
- sess_openid := k.A_m_openid
|
|
|
- if sess_openid == "" {
|
|
|
- sess_openid = k.Phone
|
|
|
- }
|
|
|
- if sess_openid == "" {
|
|
|
- sess_openid = k.S_m_openid
|
|
|
- }
|
|
|
- isPushOk := putil.SendApp(map[string]interface{}{
|
|
|
- "phoneType": k.AppPhoneType,
|
|
|
- "descript": jpushtitle,
|
|
|
- "type": "bid",
|
|
|
- "userId": k.Id,
|
|
|
- "openId": sess_openid,
|
|
|
- "url": "/jyapp/free/sess/" + Se.EncodeString(sess_openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",historypush"),
|
|
|
- //"url": "/jyapp/free/sess/" + push.Se.EncodeString(k.Openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",wxpushlist") + "__" + pushid,
|
|
|
- "otherPushId": k.Opushid,
|
|
|
- "jgPushId": k.Jpushid, //极光-推送id
|
|
|
- })
|
|
|
- if isPushOk {
|
|
|
- appPushStatus = 1
|
|
|
- } else {
|
|
|
- appPushStatus = -1
|
|
|
- }
|
|
|
- logger.Info("推送任务", taskType, "app推送", isPushOk, k.Id, k.S_m_openid, k.A_m_openid, k.Phone, k.AppPhoneType, k.Jpushid, k.Opushid, k.RateMode)
|
|
|
- }
|
|
|
- //发送邮件
|
|
|
- if mailPush == 1 {
|
|
|
- html := fmt.Sprintf(SysConfig.Mail_html, strings.Replace(strings.Join(k.OriginalKeys, ";"), "+", " ", -1), mailContent)
|
|
|
- subject := fmt.Sprintf(SysConfig.Mail_title, "招标")
|
|
|
- isPushOk := p.SendMail(k.Email, subject, html, fmdatas)
|
|
|
- if isPushOk {
|
|
|
- mailPushStatus = 1
|
|
|
- } else {
|
|
|
- mailPushStatus = -1
|
|
|
- }
|
|
|
- logger.Info("推送任务", taskType, "发送邮件", isPushOk, k.Id, k.S_m_openid, k.A_m_openid, k.Phone, k.Email, k.DataExport)
|
|
|
- }
|
|
|
- if wxPush == 1 || appPush == 1 || (mailPush == 1 && wxPush == 0 && appPush == 0) {
|
|
|
- //pc端助手推送
|
|
|
- openid := k.S_m_openid
|
|
|
- if openid == "" {
|
|
|
- openid = k.Phone
|
|
|
- }
|
|
|
- if openid != "" {
|
|
|
- putil.SendPcHelper(map[string]interface{}{"clientCode": openid})
|
|
|
- }
|
|
|
- }
|
|
|
- return
|
|
|
-}
|
|
|
-
|
|
|
-//推送邮件(含附件)
|
|
|
-func (p *PushJob) SendMail(email, subject, html string, fmdatas []map[string]interface{}) bool {
|
|
|
- if !SysConfig.IsPushMail || len(Gmails) == 0 {
|
|
|
- return true
|
|
|
- }
|
|
|
- if SysConfig.MailSleep > 0 {
|
|
|
- time.Sleep(time.Duration(SysConfig.MailSleep) * time.Millisecond)
|
|
|
- }
|
|
|
- defer util.Catch()
|
|
|
- //生成附件
|
|
|
- var fnamepath, rename string
|
|
|
- if len(fmdatas) > 0 { //开启导出
|
|
|
- fnamepath, rename = putil.GetBidInfoXlsx(fmdatas)
|
|
|
- if messyCodeEmailReg.MatchString(email) {
|
|
|
- rename = time.Now().Format("2006-01-02") + ".xlsx"
|
|
|
+ }(temp)
|
|
|
}
|
|
|
- }
|
|
|
- status := false
|
|
|
- index := len(email) % 2
|
|
|
- var gmail *mail.GmailAuth
|
|
|
- if index >= len(Gmails) {
|
|
|
- index = 0
|
|
|
- }
|
|
|
- gmail = Gmails[index]
|
|
|
- for i := 0; i < len(Gmails); i++ {
|
|
|
- status = mail.GSendMail("剑鱼标讯", email, "", "", subject, html, fnamepath, rename, gmail)
|
|
|
- if status {
|
|
|
+ if batch_size < SysConfig.PushBatch {
|
|
|
break
|
|
|
- } else {
|
|
|
- gmail = Gmails[int(math.Abs(float64(index-1)))]
|
|
|
}
|
|
|
}
|
|
|
- if fnamepath != "" {
|
|
|
- os.Remove(fnamepath)
|
|
|
- }
|
|
|
- return status
|
|
|
+ p.wait.Wait()
|
|
|
+ p.lastId = ""
|
|
|
+ p.users = nil
|
|
|
+ logger.Info("推送任务结束。。。", p.taskType)
|
|
|
}
|
|
|
-
|
|
|
-/************************************************/
|
|
|
-func (p *PushJob) OncePushBatch(batch_index, taskType int) int {
|
|
|
- p.pushDatas = &[]map[string]interface{}{}
|
|
|
+func (p *pushJob) OncePushBatch(batch_index int) int {
|
|
|
+ p.users = &[]map[string]interface{}{}
|
|
|
i := 0
|
|
|
sess := mongodb.GetMgoConn()
|
|
|
defer mongodb.DestoryMongoConn(sess)
|
|
|
var query map[string]interface{}
|
|
|
- //实时推送,只查找ratemode==1的用户
|
|
|
- if taskType == 1 || taskType == 2 {
|
|
|
+ //根据任务类型,查找ratemode
|
|
|
+ if p.taskType == 1 || p.taskType == 2 {
|
|
|
query = map[string]interface{}{
|
|
|
"ratemode": 1,
|
|
|
}
|
|
|
- } else if taskType == 3 {
|
|
|
+ } else if p.taskType == 3 {
|
|
|
query = map[string]interface{}{
|
|
|
"ratemode": 1,
|
|
|
"applystatus": 1,
|
|
|
}
|
|
|
- } else if taskType == 4 {
|
|
|
+ } else if p.taskType == 4 {
|
|
|
query = map[string]interface{}{
|
|
|
"ratemode": 2,
|
|
|
}
|
|
|
} else {
|
|
|
- logger.Error("taskType error", taskType)
|
|
|
+ logger.Error("taskType error", p.taskType)
|
|
|
return i
|
|
|
}
|
|
|
if len(SysConfig.TestIds) > 0 {
|
|
@@ -844,33 +486,17 @@ func (p *PushJob) OncePushBatch(batch_index, taskType int) int {
|
|
|
"$gt": bson.ObjectIdHex(p.lastId),
|
|
|
}
|
|
|
}
|
|
|
- logger.Info("推送任务", taskType, "开始加载第", batch_index, "批用户", query)
|
|
|
+ logger.Info("推送任务", p.taskType, "开始加载第", batch_index, "批用户", query)
|
|
|
it := sess.DB(DbName).C("pushspace").Find(query).Sort("_id").Iter()
|
|
|
for temp := make(map[string]interface{}); it.Next(&temp); {
|
|
|
i++
|
|
|
p.lastId = util.BsonIdToSId(temp["_id"])
|
|
|
- *p.pushDatas = append(*p.pushDatas, temp)
|
|
|
+ *p.users = append(*p.users, temp)
|
|
|
temp = make(map[string]interface{})
|
|
|
if i == SysConfig.PushBatch {
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
- logger.Info("推送任务", taskType, "第", batch_index, "批用户加载结束", p.lastId)
|
|
|
+ logger.Info("推送任务", p.taskType, "第", batch_index, "批用户加载结束", p.lastId)
|
|
|
return i
|
|
|
}
|
|
|
-func (p *PushJob) ToSortList(list interface{}) SortList {
|
|
|
- if list == nil {
|
|
|
- return nil
|
|
|
- }
|
|
|
- b, err := json.Marshal(list)
|
|
|
- if err != nil {
|
|
|
- return nil
|
|
|
- }
|
|
|
- sl := make(SortList, 0)
|
|
|
- err = json.Unmarshal(b, &sl)
|
|
|
- if err != nil {
|
|
|
- return nil
|
|
|
- }
|
|
|
- sort.Sort(sl)
|
|
|
- return sl
|
|
|
-}
|