|
@@ -0,0 +1,176 @@
|
|
|
+package job
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "doFreeClueSign/db"
|
|
|
+ "doFreeClueSign/public"
|
|
|
+ "fmt"
|
|
|
+ "github.com/gogf/gf/v2/container/gmap"
|
|
|
+ "github.com/gogf/gf/v2/frame/g"
|
|
|
+ "github.com/gogf/gf/v2/os/gcron"
|
|
|
+ "github.com/gogf/gf/v2/os/gctx"
|
|
|
+ "github.com/gogf/gf/v2/os/gfile"
|
|
|
+ "github.com/gogf/gf/v2/util/gconv"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+type JobManager struct {
|
|
|
+ payUser *gmap.Map //付费账户
|
|
|
+ register *gmap.Map
|
|
|
+ lastRun *struct {
|
|
|
+ BindPhone string `json:"bindPhone"`
|
|
|
+ AgainSub string `json:"againSub"`
|
|
|
+ NewActivity string `json:"newActivity"`
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+var (
|
|
|
+ activityUserJobRunning bool
|
|
|
+ bindPhoneAndSubAgainJobRunning bool
|
|
|
+)
|
|
|
+
|
|
|
+func InitJobManager() *JobManager {
|
|
|
+ var (
|
|
|
+ err error
|
|
|
+ ctx = gctx.New()
|
|
|
+ bindPhoneAndSubAgainCronStr = g.Cfg().MustGet(ctx, "cron.bindPhoneAndSubAgain").String()
|
|
|
+ activityUserCronStr = g.Cfg().MustGet(ctx, "cron.activityUser").String()
|
|
|
+ )
|
|
|
+
|
|
|
+ job := &JobManager{
|
|
|
+ payUser: gmap.New(true),
|
|
|
+ register: gmap.New(true),
|
|
|
+ }
|
|
|
+ job.LoadLastRun()
|
|
|
+
|
|
|
+ if bindPhoneAndSubAgainCronStr != "" {
|
|
|
+ _, err = gcron.Add(ctx, bindPhoneAndSubAgainCronStr, func(ctx context.Context) {
|
|
|
+ g.Log().Infof(ctx, "bindPhoneAndSubAgain start %v", bindPhoneAndSubAgainJobRunning)
|
|
|
+ if bindPhoneAndSubAgainJobRunning {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ bindPhoneAndSubAgainJobRunning = true
|
|
|
+ defer func() {
|
|
|
+ bindPhoneAndSubAgainJobRunning = false
|
|
|
+ }()
|
|
|
+
|
|
|
+ job.LoadPayUser()
|
|
|
+ job.LoadAgainSubUser()
|
|
|
+ //job.LoadBindPhoneUser()
|
|
|
+ }, "bindPhoneAndSubAgain")
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+ gcron.Start("bindPhoneAndSubAgain")
|
|
|
+ }
|
|
|
+
|
|
|
+ if activityUserCronStr != "" {
|
|
|
+ _, err = gcron.Add(ctx, activityUserCronStr, func(ctx context.Context) {
|
|
|
+ g.Log().Infof(ctx, "activityUser start %v", activityUserJobRunning)
|
|
|
+ if activityUserJobRunning {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ activityUserJobRunning = true
|
|
|
+ defer func() {
|
|
|
+ activityUserJobRunning = false
|
|
|
+ }()
|
|
|
+
|
|
|
+ job.LoadPayUser()
|
|
|
+ job.LoadActivityUser()
|
|
|
+
|
|
|
+ }, "activityUser")
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+ gcron.Start("activityUser")
|
|
|
+ }
|
|
|
+
|
|
|
+ return job
|
|
|
+}
|
|
|
+
|
|
|
+func (jm *JobManager) LoadLastRun() {
|
|
|
+ err := gconv.Struct(gfile.GetContents("runSign.json"), &jm.lastRun)
|
|
|
+ if err != nil {
|
|
|
+ g.Log().Errorf(context.TODO(), "加载上次运行配置日常")
|
|
|
+ }
|
|
|
+ now := time.Now().Format(time.DateTime)
|
|
|
+ if jm.lastRun.BindPhone == "" {
|
|
|
+ jm.lastRun.BindPhone = now
|
|
|
+ }
|
|
|
+ if jm.lastRun.AgainSub == "" {
|
|
|
+ jm.lastRun.AgainSub = now
|
|
|
+ }
|
|
|
+ if jm.lastRun.NewActivity == "" {
|
|
|
+ jm.lastRun.NewActivity = now
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func (jm *JobManager) SaveLastRun() error {
|
|
|
+ return gfile.PutContents("runSign.json", gconv.String(jm.lastRun))
|
|
|
+}
|
|
|
+
|
|
|
+func (jm *JobManager) LoadPayUser() {
|
|
|
+ g.Log().Infof(context.TODO(), "加载付费用户")
|
|
|
+ newMap := gmap.New(true)
|
|
|
+ for i, v := range public.GetPayUser() {
|
|
|
+ newMap.Set(i, v)
|
|
|
+ }
|
|
|
+ jm.payUser = newMap
|
|
|
+}
|
|
|
+
|
|
|
+func (jm *JobManager) FilterPayUserAndSaveDb(ctx context.Context, value interface{}) error {
|
|
|
+ var (
|
|
|
+ sql = `INSERT INTO freeClubSign `
|
|
|
+ val []interface{}
|
|
|
+ now = time.Now().Format(time.DateTime)
|
|
|
+ operationTime string
|
|
|
+ )
|
|
|
+ if msg, ok := value.(*public.AgainSubUserMsg); ok {
|
|
|
+ if jm.payUser.Contains(msg.MgoUserID) {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
|
|
|
+ sql += `(mogUserId,register_time,sub_again_date,create_time)VALUES (?,?,?,?)ON DUPLICATE KEY UPDATE sub_again_date=?`
|
|
|
+ val = append(val, msg.MgoUserID, jm.GetAndCacheRegisterDate(msg.MgoUserID), operationTime, now, operationTime)
|
|
|
+ } else if msg, ok := value.(*public.BindMsg); ok {
|
|
|
+ if jm.payUser.Contains(msg.MgoUserID) {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
|
|
|
+ sql += `(mogUserId,register_time,bind_phone_date,create_time)VALUES (?,?,?,?,?)ON DUPLICATE KEY UPDATE bind_phone_date=?`
|
|
|
+ val = append(val, msg.MgoUserID, jm.GetAndCacheRegisterDate(msg.MgoUserID), msg.Phone, operationTime, now, msg.Phone, operationTime)
|
|
|
+ } else if msg, ok := value.(*public.NewActiveMsg); ok {
|
|
|
+ if jm.payUser.Contains(msg.MgoUserID) {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
|
|
|
+ registerTime := jm.GetAndCacheRegisterDate(msg.MgoUserID)
|
|
|
+ if registerTime > time.Unix(msg.TimeStamp, 0).AddDate(0, 0, -30).Format(time.DateTime) {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ sql += `(mogUserId,register_time,act_again_date,create_time)VALUES (?,?,?,?)ON DUPLICATE KEY UPDATE act_again_date=?`
|
|
|
+ val = append(val, msg.MgoUserID, registerTime, operationTime, now, operationTime)
|
|
|
+ } else {
|
|
|
+ return fmt.Errorf("未知类型")
|
|
|
+ }
|
|
|
+ _, err := g.DB("bi_service").Exec(context.TODO(), sql, val...)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// GetAndCacheRegisterDate 获取缓存用户注册时间
|
|
|
+func (jm *JobManager) GetAndCacheRegisterDate(mgoUserId string) string {
|
|
|
+ if registerDate := gconv.String(jm.register.Get(mgoUserId)); registerDate != "" {
|
|
|
+ return registerDate
|
|
|
+ }
|
|
|
+ rMap, _ := db.MG.DB().FindById("user", mgoUserId, `{"l_registedate":1}`)
|
|
|
+ if rMap == nil || len(*rMap) == 0 {
|
|
|
+ return ""
|
|
|
+ }
|
|
|
+ date := time.Unix(gconv.Int64((*rMap)["l_registedate"]), 0).Format(time.DateTime)
|
|
|
+ jm.register.Set(mgoUserId, date)
|
|
|
+ return date
|
|
|
+}
|