push.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package pusher
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. "app.yhyue.com/moapp/jybase/mail"
  7. util "app.yhyue.com/moapp/jybase/common"
  8. . "app.yhyue.com/moapp/jybase/mongodb"
  9. . "bp.jydev.jianyu360.cn/pushpkg/db"
  10. . "bp.jydev.jianyu360.cn/pushpkg/p"
  11. "github.com/donnie4w/go-logger/logger"
  12. )
  13. var VarPush = &Push{}
  14. type Push struct {
  15. Mgo *MongodbSim
  16. DbName string
  17. CollName string
  18. }
  19. func (p *Push) AfterPush(ids []interface{}) {
  20. if err := MyMgo.DelBulk(p.Mgo, nil, p.DbName, p.CollName, &ids); err != nil {
  21. logger.Error("推送任务", ids, "删除出错", err)
  22. }
  23. }
  24. /*获取一批次推送的数据
  25. *batchIndex < 0 不走分批次加载
  26. */
  27. func (p *Push) GetPushDatas(taskType, batchIndex, pushBatch int, startId *string, unique string, query map[string]interface{}) (bool, *map[string]*PushInfo) {
  28. logger.Info(taskType, p.CollName, "开始加载第", batchIndex, "批用户", query)
  29. users := map[string]*PushInfo{}
  30. i := 0
  31. sess := p.Mgo.GetMgoConn()
  32. defer p.Mgo.DestoryMongoConn(sess)
  33. it := sess.DB(p.DbName).C(p.CollName).Find(query).Sort(unique).Iter()
  34. prevUnique := ""
  35. for temp := make(map[string]interface{}); it.Next(&temp); {
  36. thisUnique := fmt.Sprint(temp[unique])
  37. if batchIndex > 0 && i >= pushBatch && prevUnique != thisUnique {
  38. break
  39. }
  40. i++
  41. *startId = thisUnique
  42. pushInfo := users[thisUnique]
  43. if pushInfo == nil {
  44. pushInfo = &PushInfo{Info: temp}
  45. } else {
  46. oldList, _ := pushInfo.Info["list"].([]interface{})
  47. newList, _ := temp["list"].([]interface{})
  48. newList = append(newList, oldList...)
  49. pushInfo.Info["list"] = newList
  50. }
  51. pushInfo.Ids = append(pushInfo.Ids, temp["_id"])
  52. users[thisUnique] = pushInfo
  53. prevUnique = thisUnique
  54. temp = make(map[string]interface{})
  55. }
  56. logger.Info(taskType, p.CollName, "第", batchIndex, "批用户加载结束", *startId)
  57. return batchIndex < 0 || i < pushBatch, &users
  58. }
  59. //获取当前有效的推送模式
  60. func (p *Push) ValieRateModes(pushWeek string, pushDay int) []int {
  61. now := time.Now()
  62. rateMode := []int{}
  63. if strings.ToLower(now.Weekday().String()) == strings.ToLower(pushWeek) {
  64. rateMode = append(rateMode, 3)
  65. }
  66. if now.Day() == pushDay {
  67. rateMode = append(rateMode, 4)
  68. }
  69. return rateMode
  70. }
  71. //推送邮件
  72. func (p *Push) SendMail(isPushMail bool, mailSleep int, gmails []*mail.GmailAuth, email, title, html string) bool {
  73. defer util.Catch()
  74. if !isPushMail || len(gmails) == 0 {
  75. return true
  76. }
  77. if mailSleep > 0 {
  78. time.Sleep(time.Duration(mailSleep) * time.Millisecond)
  79. }
  80. return mail.PollingMail(email, gmails, func(gmail *mail.GmailAuth) bool {
  81. return mail.GSendMail("剑鱼标讯", email, "", "", title, html, "", "", gmail)
  82. })
  83. }