|
@@ -0,0 +1,804 @@
|
|
|
+package job
|
|
|
+
|
|
|
+import (
|
|
|
+ "encoding/json"
|
|
|
+ "fmt"
|
|
|
+ "os"
|
|
|
+ . "public"
|
|
|
+ . "push/config"
|
|
|
+ putil "push/util"
|
|
|
+ "qfw/util"
|
|
|
+ "qfw/util/mail"
|
|
|
+ "qfw/util/mongodb"
|
|
|
+ "regexp"
|
|
|
+ "sort"
|
|
|
+ "strconv"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/donnie4w/go-logger/logger"
|
|
|
+ mgo "gopkg.in/mgo.v2"
|
|
|
+ "gopkg.in/mgo.v2/bson"
|
|
|
+)
|
|
|
+
|
|
|
+const (
|
|
|
+ BulkSize = 200
|
|
|
+ DbName = "qfw"
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ messyCodeEmailReg = regexp.MustCompile(SysConfig.MessyCodeEmailReg)
|
|
|
+ MoveFields = []string{"s_m_openid", "a_m_openid", "phone", "usertype", "jpushid", "opushid", "words", "ratemode", "wxpush", "apppush", "mailpush", "smartset", "timestamp", "subscribe", "applystatus", "appphonetype", "email", "size", "modifydate", "mergeorder"}
|
|
|
+)
|
|
|
+
|
|
|
+func init() {
|
|
|
+ mySelf := Jobs.Push.MySelf().(*PushJob)
|
|
|
+ //推送1分钟限制
|
|
|
+ mySelf.minutePushPool = make(chan bool, SysConfig.MinutePushSize)
|
|
|
+ mySelf.fastigiumMinutePushPool = make(chan bool, SysConfig.FastigiumMinutePushSize)
|
|
|
+ for i := 0; i < SysConfig.MinutePushSize; i++ {
|
|
|
+ mySelf.minutePushPool <- true
|
|
|
+ }
|
|
|
+ for i := 0; i < SysConfig.FastigiumMinutePushSize; i++ {
|
|
|
+ mySelf.fastigiumMinutePushPool <- true
|
|
|
+ }
|
|
|
+ go func() {
|
|
|
+ t := time.NewTicker(time.Minute)
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-t.C:
|
|
|
+ for i := 0; i < SysConfig.MinutePushSize-len(mySelf.minutePushPool); i++ {
|
|
|
+ mySelf.minutePushPool <- true
|
|
|
+ }
|
|
|
+ for i := 0; i < SysConfig.FastigiumMinutePushSize-len(mySelf.fastigiumMinutePushPool); i++ {
|
|
|
+ mySelf.fastigiumMinutePushPool <- true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+}
|
|
|
+
|
|
|
+type PushJob struct {
|
|
|
+ pushPool chan bool
|
|
|
+ minutePushPool chan bool
|
|
|
+ fastigiumMinutePushPool chan bool
|
|
|
+ pushWait *sync.WaitGroup
|
|
|
+ pushJobLock *sync.Mutex
|
|
|
+ lastId string
|
|
|
+ pushDatas *[]map[string]interface{}
|
|
|
+}
|
|
|
+
|
|
|
+func (p *PushJob) MySelf() interface{} {
|
|
|
+ return p
|
|
|
+}
|
|
|
+
|
|
|
+//taskType 1--实时推送 2--实时推送+一天三次的8点推送 3--一天三次推送 4--九点推送
|
|
|
+func (p *PushJob) Execute(taskType int, isMoveDatas bool) {
|
|
|
+ p.pushJobLock.Lock()
|
|
|
+ defer p.pushJobLock.Unlock()
|
|
|
+ logger.Info("开始推送任务。。。", taskType)
|
|
|
+ if isMoveDatas {
|
|
|
+ p.Move()
|
|
|
+ }
|
|
|
+ p.Push(taskType)
|
|
|
+}
|
|
|
+func (p *PushJob) Move() {
|
|
|
+ logger.Info("推送任务,开始迁移数据。。。")
|
|
|
+ sess := mongodb.GetMgoConn()
|
|
|
+ defer mongodb.DestoryMongoConn(sess)
|
|
|
+ nowUnix := time.Now().Unix()
|
|
|
+ it := sess.DB(DbName).C("pushspace_temp").Find(map[string]interface{}{
|
|
|
+ "timestamp": map[string]interface{}{
|
|
|
+ "$lt": nowUnix,
|
|
|
+ },
|
|
|
+ }).Sort("_id").Iter()
|
|
|
+ _index := 0
|
|
|
+ pushspace_temp := map[string]map[string]interface{}{}
|
|
|
+ logger.Info("推送任务,开始把pushspace_temp表的数据加载到内存中")
|
|
|
+ for _temp := make(map[string]interface{}); it.Next(&_temp); {
|
|
|
+ userId := util.ObjToString(_temp["userid"])
|
|
|
+ if pushspace_temp[userId] != nil {
|
|
|
+ sl := make(SortList, 0)
|
|
|
+ if newList := p.ToSortList(_temp["list"], userId); newList != nil {
|
|
|
+ sl = append(sl, newList...)
|
|
|
+ }
|
|
|
+ oldList, _ := pushspace_temp[userId]["list"].(SortList)
|
|
|
+ sl = append(sl, oldList...)
|
|
|
+ sort.Sort(sl)
|
|
|
+ if len(sl) > SysConfig.MaxPushSize {
|
|
|
+ sl = sl[:SysConfig.MaxPushSize]
|
|
|
+ }
|
|
|
+ _temp["list"] = sl
|
|
|
+ } else {
|
|
|
+ _temp["list"] = p.ToSortList(_temp["list"], userId)
|
|
|
+ }
|
|
|
+ pushspace_temp[userId] = _temp
|
|
|
+ _temp = make(map[string]interface{})
|
|
|
+ _index++
|
|
|
+ if _index%500 == 0 {
|
|
|
+ logger.Info("推送任务,pushspace_temp表的数据加载到内存中:", _index)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ logger.Info("推送任务,pushspace_temp表的数据加载完成")
|
|
|
+ index := 0
|
|
|
+ saveArray := []map[string]interface{}{}
|
|
|
+ updateArray_d := []map[string]interface{}{}
|
|
|
+ updateArray_q := []map[string]interface{}{}
|
|
|
+ updateArray_s := []map[string]interface{}{}
|
|
|
+ for userId, temp := range pushspace_temp {
|
|
|
+ var data map[string]interface{}
|
|
|
+ sess.DB(DbName).C("pushspace").Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"list": 1, "templist": 1}).One(&data)
|
|
|
+ if data == nil { //批量新增
|
|
|
+ saveArray = append(saveArray, temp)
|
|
|
+ if len(saveArray) == BulkSize {
|
|
|
+ p.SaveBulk(sess, &saveArray, nowUnix)
|
|
|
+ }
|
|
|
+ } else { //批量更新
|
|
|
+ setMap := map[string]interface{}{}
|
|
|
+ for _, field := range MoveFields {
|
|
|
+ if temp[field] == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ setMap[field] = temp[field]
|
|
|
+ }
|
|
|
+ //
|
|
|
+ newList, _ := temp["list"].(SortList)
|
|
|
+ if newList == nil || len(newList) == 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ pLength := len(newList)
|
|
|
+ pushAll := make(map[string]interface{})
|
|
|
+ for _, v := range []string{"", "temp"} {
|
|
|
+ oldList := p.ToSortList(data[v+"list"], userId)
|
|
|
+ if v == "temp" && oldList == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ rLength := len(oldList)
|
|
|
+ if rLength+pLength > SysConfig.MaxPushSize {
|
|
|
+ newList = append(newList, oldList...)
|
|
|
+ if len(newList) > SysConfig.MaxPushSize {
|
|
|
+ newList = newList[:SysConfig.MaxPushSize]
|
|
|
+ }
|
|
|
+ setMap[v+"list"] = newList
|
|
|
+ setMap[v+"size"] = SysConfig.MaxPushSize
|
|
|
+ } else { //追加
|
|
|
+ setMap[v+"size"] = rLength + pLength
|
|
|
+ pushAll[v+"list"] = newList
|
|
|
+ }
|
|
|
+ }
|
|
|
+ upSet := map[string]interface{}{
|
|
|
+ "$set": setMap,
|
|
|
+ }
|
|
|
+ if len(pushAll) > 0 {
|
|
|
+ upSet["$pushAll"] = pushAll
|
|
|
+ }
|
|
|
+ updateArray_d = append(updateArray_d, map[string]interface{}{"userid": userId})
|
|
|
+ updateArray_q = append(updateArray_q, map[string]interface{}{"_id": data["_id"]})
|
|
|
+ updateArray_s = append(updateArray_s, upSet)
|
|
|
+ if len(updateArray_q) == BulkSize {
|
|
|
+ p.UpdateBulk(sess, &updateArray_d, &updateArray_q, &updateArray_s, nowUnix)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ index++
|
|
|
+ if index%500 == 0 {
|
|
|
+ logger.Info("推送任务,迁移数据:", index)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(saveArray) > 0 {
|
|
|
+ p.SaveBulk(sess, &saveArray, nowUnix)
|
|
|
+ }
|
|
|
+ if len(updateArray_q) > 0 {
|
|
|
+ p.UpdateBulk(sess, &updateArray_d, &updateArray_q, &updateArray_s, nowUnix)
|
|
|
+ }
|
|
|
+ logger.Info("推送任务,迁移数据结束。。。", index)
|
|
|
+}
|
|
|
+func (p *PushJob) SaveBulk(sess *mgo.Session, array *[]map[string]interface{}, nowUnix int64) {
|
|
|
+ coll := sess.DB(DbName).C("pushspace")
|
|
|
+ bulk := coll.Bulk()
|
|
|
+ for _, v := range *array {
|
|
|
+ bulk.Insert(v)
|
|
|
+ }
|
|
|
+ _, err := bulk.Run()
|
|
|
+ if nil != err {
|
|
|
+ logger.Info("推送任务,BulkError", err)
|
|
|
+ } else {
|
|
|
+ p.DelBulk(sess, array, nowUnix)
|
|
|
+ }
|
|
|
+ *array = []map[string]interface{}{}
|
|
|
+}
|
|
|
+func (p *PushJob) UpdateBulk(sess *mgo.Session, array_d, array_q, array_s *[]map[string]interface{}, nowUnix int64) {
|
|
|
+ coll := sess.DB(DbName).C("pushspace")
|
|
|
+ bulk := coll.Bulk()
|
|
|
+ for k, v := range *array_q {
|
|
|
+ bulk.Update(v, (*array_s)[k])
|
|
|
+ }
|
|
|
+ _, err := bulk.Run()
|
|
|
+ if nil != err {
|
|
|
+ logger.Info("推送任务,UpdateBulkError", err)
|
|
|
+ } else {
|
|
|
+ p.DelBulk(sess, array_d, nowUnix)
|
|
|
+ }
|
|
|
+ *array_d = []map[string]interface{}{}
|
|
|
+ *array_q = []map[string]interface{}{}
|
|
|
+ *array_s = []map[string]interface{}{}
|
|
|
+}
|
|
|
+func (p *PushJob) DelBulk(sess *mgo.Session, array *[]map[string]interface{}, nowUnix int64) {
|
|
|
+ coll := sess.DB(DbName).C("pushspace_temp")
|
|
|
+ bulk := coll.Bulk()
|
|
|
+ for _, v := range *array {
|
|
|
+ bulk.RemoveAll(map[string]interface{}{
|
|
|
+ "userid": v["userid"],
|
|
|
+ "timestamp": map[string]interface{}{
|
|
|
+ "$lt": nowUnix,
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+ _, err := bulk.Run()
|
|
|
+ if nil != err {
|
|
|
+ logger.Info("推送任务,DelBulkError", err)
|
|
|
+ }
|
|
|
+}
|
|
|
+func (p *PushJob) Push(taskType int) {
|
|
|
+ logger.Info("推送任务,开始推送。。。")
|
|
|
+ p.lastId = ""
|
|
|
+ batch_index := 0
|
|
|
+ for {
|
|
|
+ batch_index++
|
|
|
+ batch_size := p.OncePushBatch(batch_index, taskType)
|
|
|
+ for _, temp := range *p.pushDatas {
|
|
|
+ p.pushPool <- true
|
|
|
+ p.pushWait.Add(1)
|
|
|
+ go func(v map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-p.pushPool
|
|
|
+ p.pushWait.Done()
|
|
|
+ }()
|
|
|
+ words, _ := v["words"].([]interface{})
|
|
|
+ u := &UserInfo{
|
|
|
+ Id: util.ObjToString(v["userid"]),
|
|
|
+ OriginalKeys: util.ObjArrToStringArr(words),
|
|
|
+ WxPush: util.IntAll(v["wxpush"]),
|
|
|
+ AppPush: util.IntAll(v["apppush"]),
|
|
|
+ MailPush: util.IntAll(v["mailpush"]),
|
|
|
+ Email: util.ObjToString(v["email"]),
|
|
|
+ S_m_openid: util.ObjToString(v["s_m_openid"]),
|
|
|
+ A_m_openid: util.ObjToString(v["a_m_openid"]),
|
|
|
+ Phone: util.ObjToString(v["phone"]),
|
|
|
+ Jpushid: util.ObjToString(v["jpushid"]),
|
|
|
+ Opushid: util.ObjToString(v["opushid"]),
|
|
|
+ UserType: util.IntAll(v["usertype"]),
|
|
|
+ RateMode: util.IntAllDef(v["ratemode"], 1),
|
|
|
+ SmartSet: util.IntAllDef(v["smartset"], 1),
|
|
|
+ DataExport: util.IntAll(v["dataexport"]),
|
|
|
+ AppPhoneType: util.ObjToString(v["appphonetype"]),
|
|
|
+ ApplyStatus: util.IntAll(v["applystatus"]),
|
|
|
+ Subscribe: util.IntAllDef(v["subscribe"], 1),
|
|
|
+ ModifyDate: util.ObjToString(v["modifydate"]),
|
|
|
+ MergeOrder: v["mergeorder"],
|
|
|
+ }
|
|
|
+ logger.Info("推送任务,开始推送用户,userid", u.Id, "s_m_openid", u.S_m_openid, "a_m_openid", u.A_m_openid, "phone", u.Phone, "jpushid", u.Jpushid, "opushid", u.Opushid, "applystatus", u.ApplyStatus)
|
|
|
+ wxPush, appPush, mailPush := 0, 0, 0
|
|
|
+ list := p.ToSortList(v["list"], u.Id)
|
|
|
+ templist := p.ToSortList(v["templist"], u.Id)
|
|
|
+ if taskType == 1 {
|
|
|
+ if u.WxPush == 1 {
|
|
|
+ if u.ApplyStatus == 1 {
|
|
|
+ wxPush = -1
|
|
|
+ } else {
|
|
|
+ wxPush = 1
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if u.AppPush == 1 {
|
|
|
+ appPush = 1
|
|
|
+ }
|
|
|
+ if u.MailPush == 1 {
|
|
|
+ mailPush = -1
|
|
|
+ }
|
|
|
+ } else if taskType == 2 || taskType == 4 {
|
|
|
+ if u.WxPush == 1 {
|
|
|
+ wxPush = 1
|
|
|
+ }
|
|
|
+ if u.AppPush == 1 {
|
|
|
+ appPush = 1
|
|
|
+ }
|
|
|
+ if u.MailPush == 1 {
|
|
|
+ mailPush = 1
|
|
|
+ }
|
|
|
+ } else if taskType == 3 {
|
|
|
+ if u.WxPush == 1 && u.ApplyStatus == 1 {
|
|
|
+ wxPush = 1
|
|
|
+ }
|
|
|
+ if u.MailPush == 1 {
|
|
|
+ mailPush = 1
|
|
|
+ }
|
|
|
+ } else if taskType == 4 {
|
|
|
+ if u.WxPush == 1 {
|
|
|
+ wxPush = 1
|
|
|
+ }
|
|
|
+ if u.AppPush == 1 {
|
|
|
+ appPush = 1
|
|
|
+ }
|
|
|
+ if u.MailPush == 1 {
|
|
|
+ mailPush = 1
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //再对取消关注以及app没有登录的用户进行过滤
|
|
|
+ if u.Subscribe == 0 {
|
|
|
+ wxPush = 0
|
|
|
+ }
|
|
|
+ if u.Jpushid == "" && u.Opushid == "" {
|
|
|
+ appPush = 0
|
|
|
+ }
|
|
|
+ if mailPush != 0 {
|
|
|
+ if (u.UserType == 0 || u.UserType == 3) && u.Subscribe == 0 {
|
|
|
+ mailPush = 0
|
|
|
+ } else if (u.UserType == 1 || u.UserType == 2 || u.UserType == 4) && u.Jpushid == "" && u.Opushid == "" {
|
|
|
+ mailPush = 0
|
|
|
+ } else if u.UserType == 5 && u.Subscribe == 0 && u.Jpushid == "" && u.Opushid == "" {
|
|
|
+ mailPush = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ t_wxpush, t_mailpush := util.IntAll(v["tempwxpush"]), util.IntAll(v["tempmailpush"])
|
|
|
+ if templist != nil {
|
|
|
+ if wxPush == 1 && t_wxpush == 0 {
|
|
|
+ wxPush = 0
|
|
|
+ }
|
|
|
+ if mailPush == 1 && t_mailpush == 0 {
|
|
|
+ mailPush = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ logger.Info("推送任务,本次推送任务类型", taskType, "用户的接收方式", u.Id, wxPush, appPush, mailPush, t_wxpush, t_mailpush)
|
|
|
+ if wxPush == 1 || appPush == 1 || mailPush == 1 {
|
|
|
+ isSaveSuccess := false
|
|
|
+ //wxPushStatus, appPushStatus, mailPushStatus := 0, 0, 0
|
|
|
+ //开通订阅推送或邮箱推送的用户,由实时推送修改成九点推送,app推送用list字段,微信和邮箱推送用templist字段
|
|
|
+ if list != nil && templist != nil {
|
|
|
+ isSaveSuccess, _, _, _ = p.DealSend(taskType, true, 0, appPush, 0, u, &list)
|
|
|
+ if !isSaveSuccess {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ p.DealSend(taskType, false, wxPush, 0, mailPush, u, &templist)
|
|
|
+ } else if list != nil {
|
|
|
+ isSaveSuccess, _, _, _ = p.DealSend(taskType, true, wxPush, appPush, mailPush, u, &list)
|
|
|
+ if !isSaveSuccess {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ } else if templist != nil {
|
|
|
+ p.DealSend(taskType, false, wxPush, 0, mailPush, u, &templist)
|
|
|
+ }
|
|
|
+ /*if wxPush == 1 {
|
|
|
+ if wxPushStatus == -1 {
|
|
|
+ wxPush = -1
|
|
|
+ } else {
|
|
|
+ wxPush = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if mailPush == 1 {
|
|
|
+ if mailPushStatus == -1 {
|
|
|
+ mailPush = -1
|
|
|
+ } else {
|
|
|
+ mailPush = 0
|
|
|
+ }
|
|
|
+ }*/
|
|
|
+ //判断是否要删除数据
|
|
|
+ _sess := mongodb.GetMgoConn()
|
|
|
+ defer mongodb.DestoryMongoConn(_sess)
|
|
|
+ if wxPush == -1 || mailPush == -1 {
|
|
|
+ //如果该用户还有微信或者邮箱推送,把list字段的值挪到templist
|
|
|
+ update := map[string]interface{}{}
|
|
|
+ set := map[string]interface{}{
|
|
|
+ "tempwxpush": wxPush,
|
|
|
+ "tempmailpush": mailPush,
|
|
|
+ }
|
|
|
+ if templist == nil {
|
|
|
+ update["$rename"] = map[string]interface{}{"list": "templist"}
|
|
|
+ } else {
|
|
|
+ update["$unset"] = map[string]interface{}{"list": ""}
|
|
|
+ }
|
|
|
+ update["$set"] = set
|
|
|
+ _sess.DB(DbName).C("pushspace").UpdateId(v["_id"], update)
|
|
|
+ } else {
|
|
|
+ _sess.DB(DbName).C("pushspace").RemoveId(v["_id"])
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }(temp)
|
|
|
+ }
|
|
|
+ if batch_size < SysConfig.PushBatch {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ p.pushWait.Wait()
|
|
|
+ logger.Info("推送任务结束。。。", taskType)
|
|
|
+}
|
|
|
+func (p *PushJob) DealSend(taskType int, isSave bool, wxPush, appPush, mailPush int, k *UserInfo, sl *SortList) (isSaveSuccess bool, wxPushStatus, appPushStatus, mailPushStatus int) {
|
|
|
+ defer util.Catch()
|
|
|
+ str := fmt.Sprintf("<div>根据您设置的关键词(%s),给您推送以下信息:</div>", strings.Join(k.OriginalKeys, ";"))
|
|
|
+ mailContent := ""
|
|
|
+ //发送内容组合
|
|
|
+ i := 0
|
|
|
+ jpushtitle := ""
|
|
|
+ lastInfoDate := int64(0)
|
|
|
+ TitleArray := []string{}
|
|
|
+ o_pushinfo := map[string]map[string]interface{}{}
|
|
|
+ matchKey_infoIndex := map[string]string{}
|
|
|
+ publishTitle := map[string]bool{}
|
|
|
+ //邮件附件
|
|
|
+ var fmdatas = []map[string]interface{}{}
|
|
|
+ for _, ks := range *sl {
|
|
|
+ k2 := *ks.Info
|
|
|
+ title := strings.Replace(k2["title"].(string), "\n", "", -1)
|
|
|
+ area := util.ObjToString(k2["area"])
|
|
|
+ if area == "A" {
|
|
|
+ area = "全国"
|
|
|
+ }
|
|
|
+ newTitle := fmt.Sprintf("[%s]%s", area, title)
|
|
|
+ if publishTitle[newTitle] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ publishTitle[title] = true
|
|
|
+ i++
|
|
|
+ TitleArray = append(TitleArray, Re.ReplaceAllString(newTitle, "$1"))
|
|
|
+ if i == 1 {
|
|
|
+ jpushtitle = title
|
|
|
+ lastInfoDate = util.Int64All(k2["publishtime"])
|
|
|
+ }
|
|
|
+ //_sid := util.EncodeArticleId(util.BsonIdToSId(k2["_id"]))
|
|
|
+ //url := fmt.Sprintf("%s/pcdetail/%s.html", Domain, _sid)
|
|
|
+ _sid := util.EncodeArticleId2ByCheck(util.ObjToString(k2["_id"]))
|
|
|
+ //增加行业的处理
|
|
|
+ industry := ""
|
|
|
+ industryclass := "industry"
|
|
|
+ if k2["s_subscopeclass"] != nil {
|
|
|
+ k2sub := strings.Split(util.ObjToString(k2["s_subscopeclass"]), ",")
|
|
|
+ if len(k2sub) > 0 {
|
|
|
+ industry = k2sub[0]
|
|
|
+ if industry != "" {
|
|
|
+ ss := strings.Split(industry, "_")
|
|
|
+ if len(ss) > 1 {
|
|
|
+ industry = ss[0]
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if mailPush == 1 { //关于邮件的处理
|
|
|
+ mailSid := util.CommonEncodeArticle("mailprivate", util.ObjToString(k2["_id"]))
|
|
|
+ url := fmt.Sprintf("%s/article/mailprivate/%s.html", SysConfig.JianyuDomain, mailSid)
|
|
|
+ classArea := "area"
|
|
|
+ classType := "type"
|
|
|
+ types := util.ObjToString(k2["subtype"])
|
|
|
+ if types == "" {
|
|
|
+ types = util.ObjToString(k2["toptype"])
|
|
|
+ if types == "" {
|
|
|
+ types = "其他"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ dates := util.LongToDate(k2["publishtime"], false)
|
|
|
+ //标题替换
|
|
|
+ otitle := title
|
|
|
+ for _, kw := range k.OriginalKeys {
|
|
|
+ kws := strings.Split(kw, "+")
|
|
|
+ n := 0
|
|
|
+ otitle2 := otitle
|
|
|
+ for _, kwn := range kws {
|
|
|
+ ot := strings.Replace(otitle2, kwn, "<span class='keys'>"+kwn+"</span>", 1)
|
|
|
+ if ot != otitle {
|
|
|
+ n++
|
|
|
+ otitle2 = ot
|
|
|
+ } else {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if n == len(kws) {
|
|
|
+ otitle = otitle2
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if industry == "" {
|
|
|
+ industryclass = ""
|
|
|
+ }
|
|
|
+ mailContent += fmt.Sprintf(SysConfig.Mail_content, i, url, otitle, classArea, area, classType, types, industryclass, industry, dates)
|
|
|
+ }
|
|
|
+ str += "<div class='tslist'><span class='xh'>" + fmt.Sprintf("%d", i) + ".</span><a class='bt' target='_blank' eid='" + _sid + "' href='" + util.ObjToString(k2["href"]) + "'>[<span class='area'>" + area + "</span>]" + title + "</a></div>"
|
|
|
+ o_pushinfo[strconv.Itoa(i)] = map[string]interface{}{
|
|
|
+ "publishtime": k2["publishtime"],
|
|
|
+ "stype": util.ObjToString(k2["type"]),
|
|
|
+ "topstype": util.ObjToString(k2["toptype"]),
|
|
|
+ "substype": util.ObjToString(k2["subtype"]),
|
|
|
+ "subscopeclass": industry,
|
|
|
+ "buyer": k2["buyer"],
|
|
|
+ "projectname": k2["projectname"],
|
|
|
+ "budget": k2["budget"],
|
|
|
+ "bidopentime": k2["bidopentime"],
|
|
|
+ "winner": k2["winner"],
|
|
|
+ "bidamount": k2["bidamount"],
|
|
|
+ }
|
|
|
+ //附件数据
|
|
|
+ fmdata := map[string]interface{}{
|
|
|
+ "publishtime": k2["publishtime"],
|
|
|
+ "subtype": k2["subtype"],
|
|
|
+ "buyer": k2["buyer"],
|
|
|
+ "projectname": k2["projectname"],
|
|
|
+ "budget": k2["budget"],
|
|
|
+ "bidopentime": k2["bidopentime"],
|
|
|
+ "winner": k2["winner"],
|
|
|
+ "bidamount": k2["bidamount"],
|
|
|
+ }
|
|
|
+ fmdatas = append(fmdatas, fmdata)
|
|
|
+ //匹配到的关键词
|
|
|
+ for _, key := range (*ks).Keys {
|
|
|
+ if matchKey_infoIndex[key] != "" {
|
|
|
+ matchKey_infoIndex[key] = matchKey_infoIndex[key] + ","
|
|
|
+ }
|
|
|
+ matchKey_infoIndex[key] = matchKey_infoIndex[key] + strconv.Itoa(i)
|
|
|
+ }
|
|
|
+ if i >= SysConfig.MaxPushSize {
|
|
|
+ //限制最大信息条数
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if i == 0 {
|
|
|
+ logger.Info("推送任务,没有要推送的数据!", k.S_m_openid, k.A_m_openid, k.Phone)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if isSave {
|
|
|
+ //推送记录id
|
|
|
+ pushId := putil.SaveSendInfo(taskType, k, str, o_pushinfo, matchKey_infoIndex)
|
|
|
+ if pushId == "" {
|
|
|
+ logger.Info("推送任务,保存到cassandra出错", k.Id, k.S_m_openid, k.A_m_openid, k.Phone)
|
|
|
+ return
|
|
|
+ } else {
|
|
|
+ logger.Info("推送任务,成功保存到cassandra", pushId, k.Id, k.S_m_openid, k.A_m_openid, k.Phone)
|
|
|
+ }
|
|
|
+ isSaveSuccess = true
|
|
|
+ }
|
|
|
+ //九点推送的时候,限制一分钟最大的推送数量
|
|
|
+ if taskType == 4 && (wxPush == 1 || appPush == 1) {
|
|
|
+ hour := time.Now().Hour()
|
|
|
+ fastigiumStart, fastigiumEnd := 0, 0
|
|
|
+ fastigiumTimes := strings.Split(SysConfig.FastigiumTime, "-")
|
|
|
+ if len(fastigiumTimes) == 2 {
|
|
|
+ fastigiumStart = util.IntAll(fastigiumTimes[0])
|
|
|
+ fastigiumEnd = util.IntAll(fastigiumTimes[1])
|
|
|
+ }
|
|
|
+ if hour >= fastigiumStart && hour <= fastigiumEnd {
|
|
|
+ <-p.fastigiumMinutePushPool //高峰期
|
|
|
+ } else {
|
|
|
+ <-p.minutePushPool //正常期
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if wxPush == 1 {
|
|
|
+ isPushOk := true
|
|
|
+ if k.ApplyStatus == 1 {
|
|
|
+ TmpTip := ""
|
|
|
+ minute := time.Now().Unix() - lastInfoDate
|
|
|
+ if minute > -1 && minute < 61 {
|
|
|
+ TmpTip = fmt.Sprintf("%d秒前发布的", minute)
|
|
|
+ } else {
|
|
|
+ minute = minute / 60
|
|
|
+ if minute < 121 {
|
|
|
+ if minute < 1 {
|
|
|
+ minute = 1
|
|
|
+ }
|
|
|
+ TmpTip = fmt.Sprintf("%d分钟前发布的", minute)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Tip1 := util.If(TmpTip == "", "", TmpTip+":\n").(string)
|
|
|
+ LastTip := ""
|
|
|
+ if i > 1 {
|
|
|
+ LastTip = fmt.Sprintf("...(共%d条)", i)
|
|
|
+ }
|
|
|
+ LastTipLen := len([]rune(LastTip))
|
|
|
+ wxTitleKeys := strings.Join(k.OriginalKeys, ";")
|
|
|
+ if len([]rune(wxTitleKeys)) > 8 {
|
|
|
+ wxTitleKeys = string([]rune(wxTitleKeys)[:8]) + "..."
|
|
|
+ }
|
|
|
+ wxtitle := fmt.Sprintf(SysConfig.WxTitle, wxTitleKeys)
|
|
|
+ TitleLen := len([]rune(wxtitle))
|
|
|
+ GroupLen := len([]rune(k.ModifyDate))
|
|
|
+ reLen := 200 - TitleLen - GroupLen - WxContentLen - len([]rune(Tip1))
|
|
|
+ //if infoType == 2 {
|
|
|
+ // reLen = reLen - 4
|
|
|
+ //}
|
|
|
+ WXTitle := ""
|
|
|
+ bshow := false
|
|
|
+ for n := 1; n < len(TitleArray)+1; n++ {
|
|
|
+ curTitle := TitleArray[n-1]
|
|
|
+ tmptitle := WXTitle + fmt.Sprintf("%d %s\n", n, curTitle)
|
|
|
+ ch := reLen - len([]rune(tmptitle))
|
|
|
+ if ch < LastTipLen { //加上后大于后辍,则没有完全显示
|
|
|
+ if ch == 0 && n == len(TitleArray) {
|
|
|
+ WXTitle = tmptitle
|
|
|
+ bshow = true
|
|
|
+ } else {
|
|
|
+ ch_1 := reLen - len([]rune(WXTitle)) - LastTipLen
|
|
|
+ if ch_1 > 8 {
|
|
|
+ curLen := len([]rune(curTitle))
|
|
|
+ if ch_1 > curLen {
|
|
|
+ ch_1 = curLen
|
|
|
+ }
|
|
|
+ WXTitle += fmt.Sprintf("%d %s\n", n, string([]rune(curTitle)[:ch_1-3]))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else if ch == LastTipLen {
|
|
|
+ WXTitle = tmptitle
|
|
|
+ if n == len(TitleArray) {
|
|
|
+ bshow = true
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ WXTitle = tmptitle
|
|
|
+ if n == len(TitleArray) {
|
|
|
+ bshow = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if bshow {
|
|
|
+ LastTip = ""
|
|
|
+ }
|
|
|
+ //推送微信
|
|
|
+ isPushOk = putil.SendWeixin(k, Tip1+WXTitle+LastTip, o_pushinfo, matchKey_infoIndex, wxtitle)
|
|
|
+ if isPushOk {
|
|
|
+ wxPushStatus = 1
|
|
|
+ } else {
|
|
|
+ wxPushStatus = -1
|
|
|
+ }
|
|
|
+ }
|
|
|
+ logger.Info("推送任务,微信推送", isPushOk, k.Id, k.S_m_openid, k.RateMode, k.ApplyStatus)
|
|
|
+ }
|
|
|
+ if appPush == 1 {
|
|
|
+ if len([]rune(jpushtitle)) > 80 {
|
|
|
+ jpushtitle = string([]rune(jpushtitle)[:80]) + "..."
|
|
|
+ }
|
|
|
+ if i > 1 {
|
|
|
+ jpushtitle = fmt.Sprintf("1. %s\n...(共%d条)", jpushtitle, i)
|
|
|
+ }
|
|
|
+ go mongodb.Update("user", map[string]interface{}{
|
|
|
+ "_id": bson.ObjectIdHex(k.Id),
|
|
|
+ }, map[string]interface{}{
|
|
|
+ "$inc": map[string]interface{}{
|
|
|
+ "i_apppushunread": 1,
|
|
|
+ },
|
|
|
+ }, false, false)
|
|
|
+ sess_openid := k.A_m_openid
|
|
|
+ if sess_openid == "" {
|
|
|
+ sess_openid = k.Phone
|
|
|
+ }
|
|
|
+ if sess_openid == "" {
|
|
|
+ sess_openid = k.S_m_openid
|
|
|
+ }
|
|
|
+ isPushOk := putil.SendApp(map[string]interface{}{
|
|
|
+ "phoneType": k.AppPhoneType,
|
|
|
+ "descript": jpushtitle,
|
|
|
+ "type": "bid",
|
|
|
+ "userId": k.Id,
|
|
|
+ "openId": sess_openid,
|
|
|
+ "url": "/jyapp/free/sess/" + Se.EncodeString(sess_openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",historypush"),
|
|
|
+ //"url": "/jyapp/free/sess/" + push.Se.EncodeString(k.Openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",wxpushlist") + "__" + pushid,
|
|
|
+ "otherPushId": k.Opushid,
|
|
|
+ "jgPushId": k.Jpushid, //极光-推送id
|
|
|
+ })
|
|
|
+ if isPushOk {
|
|
|
+ appPushStatus = 1
|
|
|
+ } else {
|
|
|
+ appPushStatus = -1
|
|
|
+ }
|
|
|
+ logger.Info("推送任务,app推送", isPushOk, k.Id, k.S_m_openid, k.A_m_openid, k.Phone, k.AppPhoneType, k.Jpushid, k.Opushid, k.RateMode)
|
|
|
+ }
|
|
|
+ //发送邮件
|
|
|
+ if mailPush == 1 {
|
|
|
+ html := fmt.Sprintf(SysConfig.Mail_html, strings.Replace(strings.Join(k.OriginalKeys, ";"), "+", " ", -1), mailContent)
|
|
|
+ subject := fmt.Sprintf(SysConfig.Mail_title, "招标")
|
|
|
+ isPushOk := p.SendMail(k.Email, subject, html, fmdatas)
|
|
|
+ if isPushOk {
|
|
|
+ mailPushStatus = 1
|
|
|
+ } else {
|
|
|
+ mailPushStatus = -1
|
|
|
+ }
|
|
|
+ logger.Info("推送任务,发送邮件", isPushOk, k.Id, k.S_m_openid, k.A_m_openid, k.Phone, k.Email, k.DataExport)
|
|
|
+ }
|
|
|
+ if wxPush == 1 || appPush == 1 || (mailPush == 1 && wxPush == 0 && appPush == 0) {
|
|
|
+ //pc端助手推送
|
|
|
+ openid := k.S_m_openid
|
|
|
+ if openid == "" {
|
|
|
+ openid = k.Phone
|
|
|
+ }
|
|
|
+ if openid != "" {
|
|
|
+ putil.SendPcHelper(map[string]interface{}{"clientCode": openid})
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+//推送邮件(含附件)
|
|
|
+func (p *PushJob) SendMail(email, subject, html string, fmdatas []map[string]interface{}) bool {
|
|
|
+ if !SysConfig.IsPushMail {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ defer util.Catch()
|
|
|
+ //生成附件
|
|
|
+ var fnamepath, rename string
|
|
|
+ if len(fmdatas) > 0 { //开启导出
|
|
|
+ fnamepath, rename = putil.GetBidInfoXlsx(fmdatas)
|
|
|
+ if messyCodeEmailReg.MatchString(email) {
|
|
|
+ rename = time.Now().Format("2006-01-02") + ".xlsx"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //gmail := <-Gmails
|
|
|
+ //defer func() {
|
|
|
+ //Gmails <- gmail
|
|
|
+ //}()
|
|
|
+ status := mail.GSendMail("剑鱼招标订阅", email, "", "", subject, html, fnamepath, rename, Gmail)
|
|
|
+ if fnamepath != "" {
|
|
|
+ os.Remove(fnamepath)
|
|
|
+ }
|
|
|
+ if SysConfig.MailSleep > 0 {
|
|
|
+ time.Sleep(time.Duration(SysConfig.MailSleep) * time.Millisecond)
|
|
|
+ }
|
|
|
+ return status
|
|
|
+}
|
|
|
+
|
|
|
+/************************************************/
|
|
|
+func (p *PushJob) OncePushBatch(batch_index, taskType int) int {
|
|
|
+ p.pushDatas = &[]map[string]interface{}{}
|
|
|
+ i := 0
|
|
|
+ sess := mongodb.GetMgoConn()
|
|
|
+ defer mongodb.DestoryMongoConn(sess)
|
|
|
+ var query map[string]interface{}
|
|
|
+ //实时推送,只查找ratemode==1的用户
|
|
|
+ if taskType == 1 || taskType == 2 {
|
|
|
+ query = map[string]interface{}{
|
|
|
+ "ratemode": 1,
|
|
|
+ }
|
|
|
+ } else if taskType == 3 {
|
|
|
+ query = map[string]interface{}{
|
|
|
+ "ratemode": 1,
|
|
|
+ "applystatus": 1,
|
|
|
+ }
|
|
|
+ } else if taskType == 4 {
|
|
|
+ query = map[string]interface{}{
|
|
|
+ "ratemode": 2,
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ logger.Error("taskType error", taskType)
|
|
|
+ return i
|
|
|
+ }
|
|
|
+ if len(SysConfig.TestIds) > 0 {
|
|
|
+ query["userid"] = map[string]interface{}{
|
|
|
+ "$in": SysConfig.TestIds,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if p.lastId != "" {
|
|
|
+ query["_id"] = map[string]interface{}{
|
|
|
+ "$gt": bson.ObjectIdHex(p.lastId),
|
|
|
+ }
|
|
|
+ }
|
|
|
+ logger.Info("推送任务,开始加载第", batch_index, "批用户", query)
|
|
|
+ it := sess.DB(DbName).C("pushspace").Find(query).Sort("_id").Iter()
|
|
|
+ for temp := make(map[string]interface{}); it.Next(&temp); {
|
|
|
+ i++
|
|
|
+ p.lastId = util.BsonIdToSId(temp["_id"])
|
|
|
+ *p.pushDatas = append(*p.pushDatas, temp)
|
|
|
+ temp = make(map[string]interface{})
|
|
|
+ if i == SysConfig.PushBatch {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ logger.Info("推送任务,第", batch_index, "批用户加载结束", p.lastId)
|
|
|
+ return i
|
|
|
+}
|
|
|
+func (p *PushJob) ToSortList(list interface{}, userId string) SortList {
|
|
|
+ if list == nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ b, err := json.Marshal(list)
|
|
|
+ if err != nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ sl := make(SortList, 0)
|
|
|
+ err = json.Unmarshal(b, &sl)
|
|
|
+ if err != nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ sort.Sort(sl)
|
|
|
+ return sl
|
|
|
+}
|