mamager.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package job
  2. import (
  3. "context"
  4. "doFreeClueSign/db"
  5. "doFreeClueSign/public"
  6. "fmt"
  7. "time"
  8. "github.com/gogf/gf/v2/container/gmap"
  9. "github.com/gogf/gf/v2/frame/g"
  10. "github.com/gogf/gf/v2/os/gcron"
  11. "github.com/gogf/gf/v2/os/gctx"
  12. "github.com/gogf/gf/v2/os/gfile"
  13. "github.com/gogf/gf/v2/util/gconv"
  14. )
// JobManager holds the in-memory state shared by the scheduled jobs in this
// package. Both maps are created with gmap.New(true) in InitJobManager.
type JobManager struct {
	payUser  *gmap.Map // paying accounts; rebuilt by LoadPayUser and consulted by FilterPayUserAndSaveDb
	register *gmap.Map // cache of formatted registration times keyed by mgoUserId (see GetAndCacheRegisterDate)
	// lastRun is the state loaded from and saved to runSign.json. Each field
	// is a time string; LoadLastRun fills empty fields with the current time.
	// NOTE(review): presumably each field is the last run time of the
	// correspondingly named job — confirm against the job implementations.
	lastRun *struct {
		BindPhone   string `json:"bindPhone"`
		AgainSub    string `json:"againSub"`
		NewActivity string `json:"newActivity"`
	}
}
var (
	// activityUserJobRunning is only referenced by the commented-out
	// activityUser job in InitJobManager.
	activityUserJobRunning bool
	// bindPhoneAndSubAgainJobRunning is set while the bindPhoneAndSubAgain
	// cron job is executing so that a new tick returns early instead of
	// starting a second run.
	bindPhoneAndSubAgainJobRunning bool
	// ctx is the package-level context used for configuration reads and cron
	// registration.
	ctx = gctx.New()
	// LastId is the "lastId" configuration value converted to int, read once
	// during package initialisation. Its consumers are not visible here.
	LastId = g.Cfg().MustGet(ctx, "lastId").Int()
)
  30. func InitJobManager() *JobManager {
  31. var (
  32. err error
  33. bindPhoneAndSubAgainCronStr = g.Cfg().MustGet(ctx, "cron.bindPhoneAndSubAgain").String()
  34. //activityUserCronStr = g.Cfg().MustGet(ctx, "cron.activityUser").String()
  35. newRegisterCronStr = g.Cfg().MustGet(ctx, "cron.newRegisterAndBind").String()
  36. )
  37. job := &JobManager{
  38. payUser: gmap.New(true),
  39. register: gmap.New(true),
  40. }
  41. job.LoadLastRun()
  42. if bindPhoneAndSubAgainCronStr != "" {
  43. _, err = gcron.Add(ctx, bindPhoneAndSubAgainCronStr, func(ctx context.Context) {
  44. g.Log().Infof(ctx, "bindPhoneAndSubAgain start %v", bindPhoneAndSubAgainJobRunning)
  45. if bindPhoneAndSubAgainJobRunning {
  46. return
  47. }
  48. bindPhoneAndSubAgainJobRunning = true
  49. defer func() {
  50. bindPhoneAndSubAgainJobRunning = false
  51. }()
  52. job.LoadPayUser()
  53. job.LoadAgainSubUser()
  54. //job.LoadBindPhoneUser()
  55. }, "bindPhoneAndSubAgain")
  56. if err != nil {
  57. panic(err)
  58. }
  59. gcron.Start("bindPhoneAndSubAgain")
  60. }
  61. // if activityUserCronStr != "" {
  62. // _, err = gcron.Add(ctx, activityUserCronStr, func(ctx context.Context) {
  63. // g.Log().Infof(ctx, "activityUser start %v", activityUserJobRunning)
  64. // if activityUserJobRunning {
  65. // return
  66. // }
  67. // activityUserJobRunning = true
  68. // defer func() {
  69. // activityUserJobRunning = false
  70. // }()
  71. // job.LoadPayUser()
  72. // job.LoadActivityUser()
  73. // }, "activityUser")
  74. // if err != nil {
  75. // panic(err)
  76. // }
  77. // gcron.Start("activityUser")
  78. // }
  79. if newRegisterCronStr != "" {
  80. _, err = gcron.Add(ctx, newRegisterCronStr, func(ctx context.Context) {
  81. g.Log().Infof(ctx, "newister start")
  82. loadOrder()
  83. LoadOrderOther() // 未来一天内到期的新注册用户
  84. }, "newister")
  85. if err != nil {
  86. panic(err)
  87. }
  88. gcron.Start("newister")
  89. }
  90. return job
  91. }
  92. func (jm *JobManager) LoadLastRun() {
  93. err := gconv.Struct(gfile.GetContents("runSign.json"), &jm.lastRun)
  94. if err != nil {
  95. g.Log().Errorf(context.TODO(), "加载上次运行配置日常")
  96. }
  97. now := time.Now().Format(time.DateTime)
  98. if jm.lastRun.BindPhone == "" {
  99. jm.lastRun.BindPhone = now
  100. }
  101. if jm.lastRun.AgainSub == "" {
  102. jm.lastRun.AgainSub = now
  103. }
  104. if jm.lastRun.NewActivity == "" {
  105. jm.lastRun.NewActivity = now
  106. }
  107. return
  108. }
  109. func (jm *JobManager) SaveLastRun() error {
  110. return gfile.PutContents("runSign.json", gconv.String(jm.lastRun))
  111. }
  112. func (jm *JobManager) LoadPayUser() {
  113. g.Log().Infof(context.TODO(), "加载付费用户")
  114. newMap := gmap.New(true)
  115. for i, v := range public.GetPayUser() {
  116. newMap.Set(i, v)
  117. }
  118. jm.payUser = newMap
  119. }
  120. func (jm *JobManager) FilterPayUserAndSaveDb(ctx context.Context, value interface{}) error {
  121. var (
  122. sql = `INSERT INTO freeClubSign `
  123. val []interface{}
  124. now = time.Now().Format(time.DateTime)
  125. operationTime string
  126. )
  127. if msg, ok := value.(*public.AgainSubUserMsg); ok {
  128. if jm.payUser.Contains(msg.MgoUserID) {
  129. return nil
  130. }
  131. operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
  132. sql += `(mogUserId,register_time,sub_again_date,create_time)VALUES (?,?,?,?)ON DUPLICATE KEY UPDATE sub_again_date=?`
  133. val = append(val, msg.MgoUserID, jm.GetAndCacheRegisterDate(msg.MgoUserID), operationTime, now, operationTime)
  134. } else if msg, ok := value.(*public.BindMsg); ok {
  135. if jm.payUser.Contains(msg.MgoUserID) {
  136. return nil
  137. }
  138. operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
  139. sql += `(mogUserId,register_time,bind_phone_date,create_time)VALUES (?,?,?,?,?)ON DUPLICATE KEY UPDATE bind_phone_date=?`
  140. val = append(val, msg.MgoUserID, jm.GetAndCacheRegisterDate(msg.MgoUserID), msg.Phone, operationTime, now, msg.Phone, operationTime)
  141. } else if msg, ok := value.(*public.NewActiveMsg); ok {
  142. if jm.payUser.Contains(msg.MgoUserID) {
  143. return nil
  144. }
  145. operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
  146. registerTime := jm.GetAndCacheRegisterDate(msg.MgoUserID)
  147. if registerTime > time.Unix(msg.TimeStamp, 0).AddDate(0, 0, -30).Format(time.DateTime) {
  148. return nil
  149. }
  150. sql += `(mogUserId,register_time,act_again_date,create_time)VALUES (?,?,?,?)ON DUPLICATE KEY UPDATE act_again_date=?`
  151. val = append(val, msg.MgoUserID, registerTime, operationTime, now, operationTime)
  152. } else {
  153. return fmt.Errorf("未知类型")
  154. }
  155. _, err := g.DB("bi_service").Exec(context.TODO(), sql, val...)
  156. if err != nil {
  157. return err
  158. }
  159. return nil
  160. }
  161. // GetAndCacheRegisterDate 获取缓存用户注册时间
  162. func (jm *JobManager) GetAndCacheRegisterDate(mgoUserId string) string {
  163. if registerDate := gconv.String(jm.register.Get(mgoUserId)); registerDate != "" {
  164. return registerDate
  165. }
  166. rMap, _ := db.MG.DB().FindById("user", mgoUserId, `{"l_registedate":1}`)
  167. if rMap == nil || len(*rMap) == 0 {
  168. return ""
  169. }
  170. date := time.Unix(gconv.Int64((*rMap)["l_registedate"]), 0).Format(time.DateTime)
  171. jm.register.Set(mgoUserId, date)
  172. return date
  173. }