123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- package job
- import (
- "context"
- "doFreeClueSign/db"
- "doFreeClueSign/public"
- "fmt"
- "time"
- "github.com/gogf/gf/v2/container/gmap"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/gcron"
- "github.com/gogf/gf/v2/os/gctx"
- "github.com/gogf/gf/v2/os/gfile"
- "github.com/gogf/gf/v2/util/gconv"
- )
- type JobManager struct {
- payUser *gmap.Map //付费账户
- register *gmap.Map
- lastRun *struct {
- BindPhone string `json:"bindPhone"`
- AgainSub string `json:"againSub"`
- NewActivity string `json:"newActivity"`
- }
- }
- var (
- activityUserJobRunning bool
- bindPhoneAndSubAgainJobRunning bool
- ctx = gctx.New()
- LastId = g.Cfg().MustGet(ctx, "lastId").Int()
- )
- func InitJobManager() *JobManager {
- var (
- err error
- bindPhoneAndSubAgainCronStr = g.Cfg().MustGet(ctx, "cron.bindPhoneAndSubAgain").String()
- //activityUserCronStr = g.Cfg().MustGet(ctx, "cron.activityUser").String()
- newRegisterCronStr = g.Cfg().MustGet(ctx, "cron.newRegisterAndBind").String()
- )
- job := &JobManager{
- payUser: gmap.New(true),
- register: gmap.New(true),
- }
- job.LoadLastRun()
- if bindPhoneAndSubAgainCronStr != "" {
- _, err = gcron.Add(ctx, bindPhoneAndSubAgainCronStr, func(ctx context.Context) {
- g.Log().Infof(ctx, "bindPhoneAndSubAgain start %v", bindPhoneAndSubAgainJobRunning)
- if bindPhoneAndSubAgainJobRunning {
- return
- }
- bindPhoneAndSubAgainJobRunning = true
- defer func() {
- bindPhoneAndSubAgainJobRunning = false
- }()
- job.LoadPayUser()
- job.LoadAgainSubUser()
- //job.LoadBindPhoneUser()
- }, "bindPhoneAndSubAgain")
- if err != nil {
- panic(err)
- }
- gcron.Start("bindPhoneAndSubAgain")
- }
- // if activityUserCronStr != "" {
- // _, err = gcron.Add(ctx, activityUserCronStr, func(ctx context.Context) {
- // g.Log().Infof(ctx, "activityUser start %v", activityUserJobRunning)
- // if activityUserJobRunning {
- // return
- // }
- // activityUserJobRunning = true
- // defer func() {
- // activityUserJobRunning = false
- // }()
- // job.LoadPayUser()
- // job.LoadActivityUser()
- // }, "activityUser")
- // if err != nil {
- // panic(err)
- // }
- // gcron.Start("activityUser")
- // }
- if newRegisterCronStr != "" {
- _, err = gcron.Add(ctx, newRegisterCronStr, func(ctx context.Context) {
- g.Log().Infof(ctx, "newister start")
- loadOrder()
- LoadOrderOther() // 未来一天内到期的新注册用户
- }, "newister")
- if err != nil {
- panic(err)
- }
- gcron.Start("newister")
- }
- return job
- }
- func (jm *JobManager) LoadLastRun() {
- err := gconv.Struct(gfile.GetContents("runSign.json"), &jm.lastRun)
- if err != nil {
- g.Log().Errorf(context.TODO(), "加载上次运行配置日常")
- }
- now := time.Now().Format(time.DateTime)
- if jm.lastRun.BindPhone == "" {
- jm.lastRun.BindPhone = now
- }
- if jm.lastRun.AgainSub == "" {
- jm.lastRun.AgainSub = now
- }
- if jm.lastRun.NewActivity == "" {
- jm.lastRun.NewActivity = now
- }
- return
- }
- func (jm *JobManager) SaveLastRun() error {
- return gfile.PutContents("runSign.json", gconv.String(jm.lastRun))
- }
- func (jm *JobManager) LoadPayUser() {
- g.Log().Infof(context.TODO(), "加载付费用户")
- newMap := gmap.New(true)
- for i, v := range public.GetPayUser() {
- newMap.Set(i, v)
- }
- jm.payUser = newMap
- }
- func (jm *JobManager) FilterPayUserAndSaveDb(ctx context.Context, value interface{}) error {
- var (
- sql = `INSERT INTO freeClubSign `
- val []interface{}
- now = time.Now().Format(time.DateTime)
- operationTime string
- )
- if msg, ok := value.(*public.AgainSubUserMsg); ok {
- if jm.payUser.Contains(msg.MgoUserID) {
- return nil
- }
- operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
- sql += `(mogUserId,register_time,sub_again_date,create_time)VALUES (?,?,?,?)ON DUPLICATE KEY UPDATE sub_again_date=?`
- val = append(val, msg.MgoUserID, jm.GetAndCacheRegisterDate(msg.MgoUserID), operationTime, now, operationTime)
- } else if msg, ok := value.(*public.BindMsg); ok {
- if jm.payUser.Contains(msg.MgoUserID) {
- return nil
- }
- operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
- sql += `(mogUserId,register_time,bind_phone_date,create_time)VALUES (?,?,?,?,?)ON DUPLICATE KEY UPDATE bind_phone_date=?`
- val = append(val, msg.MgoUserID, jm.GetAndCacheRegisterDate(msg.MgoUserID), msg.Phone, operationTime, now, msg.Phone, operationTime)
- } else if msg, ok := value.(*public.NewActiveMsg); ok {
- if jm.payUser.Contains(msg.MgoUserID) {
- return nil
- }
- operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
- registerTime := jm.GetAndCacheRegisterDate(msg.MgoUserID)
- if registerTime > time.Unix(msg.TimeStamp, 0).AddDate(0, 0, -30).Format(time.DateTime) {
- return nil
- }
- sql += `(mogUserId,register_time,act_again_date,create_time)VALUES (?,?,?,?)ON DUPLICATE KEY UPDATE act_again_date=?`
- val = append(val, msg.MgoUserID, registerTime, operationTime, now, operationTime)
- } else {
- return fmt.Errorf("未知类型")
- }
- _, err := g.DB("bi_service").Exec(context.TODO(), sql, val...)
- if err != nil {
- return err
- }
- return nil
- }
- // GetAndCacheRegisterDate 获取缓存用户注册时间
- func (jm *JobManager) GetAndCacheRegisterDate(mgoUserId string) string {
- if registerDate := gconv.String(jm.register.Get(mgoUserId)); registerDate != "" {
- return registerDate
- }
- rMap, _ := db.MG.DB().FindById("user", mgoUserId, `{"l_registedate":1}`)
- if rMap == nil || len(*rMap) == 0 {
- return ""
- }
- date := time.Unix(gconv.Int64((*rMap)["l_registedate"]), 0).Format(time.DateTime)
- jm.register.Set(mgoUserId, date)
- return date
- }
|