mamager.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package job
  2. import (
  3. "context"
  4. "doFreeClueSign/public"
  5. "fmt"
  6. "github.com/gogf/gf/v2/container/gmap"
  7. "github.com/gogf/gf/v2/frame/g"
  8. "github.com/gogf/gf/v2/os/gcron"
  9. "github.com/gogf/gf/v2/os/gctx"
  10. "github.com/gogf/gf/v2/os/gfile"
  11. "github.com/gogf/gf/v2/util/gconv"
  12. "time"
  13. )
  14. type JobManager struct {
  15. payUser *gmap.Map //付费账户
  16. lastRun *struct {
  17. BindPhone string `json:"bindPhone"`
  18. AgainSub string `json:"againSub"`
  19. NewActivity string `json:"newActivity"`
  20. }
  21. }
  22. func InitJobManager() *JobManager {
  23. var (
  24. err error
  25. ctx = gctx.New()
  26. bindPhoneAndSubAgainCronStr = g.Cfg().MustGet(ctx, "bindPhoneAndSubAgain").String()
  27. activityUserCronStr = g.Cfg().MustGet(ctx, "activityUser").String()
  28. )
  29. job := &JobManager{
  30. payUser: gmap.New(true),
  31. }
  32. job.LoadLastRun()
  33. if bindPhoneAndSubAgainCronStr != "" {
  34. _, err = gcron.Add(ctx, bindPhoneAndSubAgainCronStr, func(ctx context.Context) {
  35. job.LoadPayUser()
  36. job.LoadAgainSubUser()
  37. job.LoadBindPhoneUser()
  38. }, "bindPhoneAndSubAgain")
  39. if err != nil {
  40. panic(err)
  41. }
  42. gcron.Start("bindPhoneAndSubAgain")
  43. }
  44. if activityUserCronStr != "" {
  45. _, err = gcron.Add(ctx, activityUserCronStr, func(ctx context.Context) {
  46. job.LoadPayUser()
  47. job.LoadActivityUser()
  48. }, "activityUser")
  49. if err != nil {
  50. panic(err)
  51. }
  52. gcron.Start("activityUser")
  53. }
  54. return job
  55. }
  56. func (jm *JobManager) LoadLastRun() {
  57. err := gconv.Struct(gfile.GetContents("runSign.json"), &jm.lastRun)
  58. if err != nil {
  59. g.Log().Errorf(context.TODO(), "加载上次运行配置日常")
  60. }
  61. now := time.Now().Format(time.DateTime)
  62. if jm.lastRun.BindPhone == "" {
  63. jm.lastRun.BindPhone = now
  64. }
  65. if jm.lastRun.AgainSub == "" {
  66. jm.lastRun.AgainSub = now
  67. }
  68. if jm.lastRun.NewActivity == "" {
  69. jm.lastRun.NewActivity = now
  70. }
  71. return
  72. }
  73. func (jm *JobManager) SaveLastRun() error {
  74. return gfile.PutContents("runSign.json", gconv.String(jm.lastRun))
  75. }
  76. func (jm *JobManager) LoadPayUser() {
  77. newMap := gmap.New(true)
  78. for i, v := range public.GetPayUser() {
  79. newMap.Set(i, v)
  80. }
  81. jm.payUser = newMap
  82. }
  83. func (jm *JobManager) FilterPayUserAndSaveDb(ctx context.Context, value interface{}) error {
  84. var (
  85. sql = `INSERT INTO freeClubSign `
  86. val []interface{}
  87. now = time.Now().Format(time.DateTime)
  88. operationTime string
  89. )
  90. if msg, ok := value.(*public.AgainSubUserMsg); ok {
  91. if jm.payUser.Contains(msg.MgoUserID) {
  92. return nil
  93. }
  94. operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
  95. sql = `(mogUserId,sub_again_date,create_time)VALUES (?, ?, ?)ON DUPLICATE KEY UPDATE sub_again_date=?`
  96. val = append(val, msg.MgoUserID, operationTime, now, operationTime)
  97. } else if msg, ok := value.(*public.BindMsg); ok {
  98. if jm.payUser.Contains(msg.MgoUserID) {
  99. return nil
  100. }
  101. operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
  102. sql += `(mogUserId,phone,bind_phone_date,create_time)VALUES (?, ?, ?, ?)ON DUPLICATE KEY UPDATE phone=?, bind_phone_date=?`
  103. val = append(val, msg.MgoUserID, msg.Phone, operationTime, now, msg.Phone, operationTime)
  104. } else if msg, ok := value.(*public.NewActiveMsg); ok {
  105. if jm.payUser.Contains(msg.MgoUserID) {
  106. return nil
  107. }
  108. operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
  109. sql = `(mogUserId,act_again_date,create_time)VALUES (?, ?, ?)ON DUPLICATE KEY UPDATE act_again_date=?`
  110. val = append(val, msg.MgoUserID, operationTime, now, operationTime)
  111. } else {
  112. return fmt.Errorf("未知类型")
  113. }
  114. _, err := g.DB("bi_service").Exec(context.TODO(), sql, val...)
  115. if err != nil {
  116. return err
  117. }
  118. return nil
  119. }