wangkaiyue 1 سال پیش
والد
کامیت
69dc7f1dad

+ 3 - 3
doFreeClueSign/config.yaml

@@ -27,9 +27,9 @@ database:
     link: "mysql:root:=PDT49#80Z!RVv52_z@tcp(192.168.3.14:4000)/bi_service"
     debug: true
   subjectdb: #正式环境库名为Jianyu_subjectdb
-    link: "mysql:root:=PDT49#80Z!RVv52_z@tcp(192.168.3.14:4000)/jianyu_subjectdb_test"
+    link: "mysql:readuser:jyTi_R202403@tcp(192.168.3.71:4003)/jianyu_subjectdb_test"
     debug: true
 
 cron:
-  activityUser: "# */30 * * * *" #更新新活跃用户
-  bindPhoneAndSubAgain: "# */5 * * * *" #再次关注&绑定手机号用户
+  #activityUser: "# */30 * * * *" #更新新活跃用户
+  bindPhoneAndSubAgain: "# */1 * * * *" #再次关注&绑定手机号用户

+ 3 - 3
doFreeClueSign/job/job.go

@@ -27,7 +27,7 @@ func (jm *JobManager) LoadActivityUser() {
 		}
 	}
 	//更新
-	jm.lastRun.NewActivity = start.Format(time.DateTime)
+	jm.lastRun.NewActivity = runNow.Format(time.DateTime)
 	if err := jm.SaveLastRun(); err != nil {
 		g.Log().Errorf(ctx, "LoadActivityUser error %v", err)
 	}
@@ -52,7 +52,7 @@ func (jm *JobManager) LoadBindPhoneUser() {
 		}
 	}
 	//更新
-	jm.lastRun.BindPhone = start.Format(time.DateTime)
+	jm.lastRun.BindPhone = runNow.Format(time.DateTime)
 	if err := jm.SaveLastRun(); err != nil {
 		g.Log().Errorf(ctx, "LoadBindPhoneUser error %v", err)
 	}
@@ -77,7 +77,7 @@ func (jm *JobManager) LoadAgainSubUser() {
 		}
 	}
 	//更新
-	jm.lastRun.AgainSub = start.Format(time.DateTime)
+	jm.lastRun.AgainSub = runNow.Format(time.DateTime)
 	if err := jm.SaveLastRun(); err != nil {
 		g.Log().Errorf(ctx, "LoadAgainSubUser error %v", err)
 	}

+ 39 - 13
doFreeClueSign/job/mamager.go

@@ -22,12 +22,17 @@ type JobManager struct {
 	}
 }
 
+var (
+	activityUserJobRunning         bool
+	bindPhoneAndSubAgainJobRunning bool
+)
+
 func InitJobManager() *JobManager {
 	var (
 		err                         error
 		ctx                         = gctx.New()
-		bindPhoneAndSubAgainCronStr = g.Cfg().MustGet(ctx, "bindPhoneAndSubAgain").String()
-		activityUserCronStr         = g.Cfg().MustGet(ctx, "activityUser").String()
+		bindPhoneAndSubAgainCronStr = g.Cfg().MustGet(ctx, "cron.bindPhoneAndSubAgain").String()
+		activityUserCronStr         = g.Cfg().MustGet(ctx, "cron.activityUser").String()
 	)
 
 	job := &JobManager{
@@ -37,6 +42,15 @@ func InitJobManager() *JobManager {
 
 	if bindPhoneAndSubAgainCronStr != "" {
 		_, err = gcron.Add(ctx, bindPhoneAndSubAgainCronStr, func(ctx context.Context) {
+			g.Log().Infof(ctx, "bindPhoneAndSubAgain start", bindPhoneAndSubAgainJobRunning)
+			if bindPhoneAndSubAgainJobRunning {
+				return
+			}
+			bindPhoneAndSubAgainJobRunning = true
+			defer func() {
+				bindPhoneAndSubAgainJobRunning = false
+			}()
+
 			job.LoadPayUser()
 			job.LoadAgainSubUser()
 			job.LoadBindPhoneUser()
@@ -49,8 +63,18 @@ func InitJobManager() *JobManager {
 
 	if activityUserCronStr != "" {
 		_, err = gcron.Add(ctx, activityUserCronStr, func(ctx context.Context) {
+			g.Log().Infof(ctx, "activityUser start", activityUserJobRunning)
+			if activityUserJobRunning {
+				return
+			}
+			activityUserJobRunning = true
+			defer func() {
+				activityUserJobRunning = false
+			}()
+
 			job.LoadPayUser()
 			job.LoadActivityUser()
+
 		}, "activityUser")
 		if err != nil {
 			panic(err)
@@ -84,11 +108,13 @@ func (jm *JobManager) SaveLastRun() error {
 }
 
 func (jm *JobManager) LoadPayUser() {
+	g.Log().Infof(context.TODO(), "加载付费用户")
 	newMap := gmap.New(true)
 	for i, v := range public.GetPayUser() {
 		newMap.Set(i, v)
 	}
 	jm.payUser = newMap
+	g.Dump(newMap)
 }
 
 func (jm *JobManager) FilterPayUserAndSaveDb(ctx context.Context, value interface{}) error {
@@ -99,25 +125,25 @@ func (jm *JobManager) FilterPayUserAndSaveDb(ctx context.Context, value interfac
 		operationTime string
 	)
 	if msg, ok := value.(*public.AgainSubUserMsg); ok {
-		if jm.payUser.Contains(msg.MgoUserID) {
-			return nil
-		}
+		//if jm.payUser.Contains(msg.MgoUserID) {
+		//	return nil
+		//}
 		operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
-		sql = `(mogUserId,sub_again_date,create_time)VALUES (?, ?, ?)ON DUPLICATE KEY UPDATE sub_again_date=?`
+		sql += `(mogUserId,sub_again_date,create_time)VALUES (?, ?, ?)ON DUPLICATE KEY UPDATE sub_again_date=?`
 		val = append(val, msg.MgoUserID, operationTime, now, operationTime)
 	} else if msg, ok := value.(*public.BindMsg); ok {
-		if jm.payUser.Contains(msg.MgoUserID) {
-			return nil
-		}
+		//if jm.payUser.Contains(msg.MgoUserID) {
+		//	return nil
+		//}
 		operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
 		sql += `(mogUserId,phone,bind_phone_date,create_time)VALUES (?, ?, ?, ?)ON DUPLICATE KEY UPDATE phone=?, bind_phone_date=?`
 		val = append(val, msg.MgoUserID, msg.Phone, operationTime, now, msg.Phone, operationTime)
 	} else if msg, ok := value.(*public.NewActiveMsg); ok {
-		if jm.payUser.Contains(msg.MgoUserID) {
-			return nil
-		}
+		//if jm.payUser.Contains(msg.MgoUserID) {
+		//	return nil
+		//}
 		operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
-		sql = `(mogUserId,act_again_date,create_time)VALUES (?, ?, ?)ON DUPLICATE KEY UPDATE act_again_date=?`
+		sql += `(mogUserId,act_again_date,create_time)VALUES (?, ?, ?)ON DUPLICATE KEY UPDATE act_again_date=?`
 		val = append(val, msg.MgoUserID, operationTime, now, operationTime)
 	} else {
 		return fmt.Errorf("未知类型")

+ 8 - 0
doFreeClueSign/main.go

@@ -6,6 +6,14 @@ import (
 )
 
 func main() {
+	//_, err := gcron.Add(context.TODO(), "*/2 * * * * *", func(ctx context.Context) {
+	//	fmt.Println("xxx")
+	//}, "111")
+	//if err != nil {
+	//	panic(err)
+	//}
+	//gcron.Start("111")
+
 	job.InitJobManager()
 	select {}
 }

+ 2 - 2
doFreeClueSign/public/getAgainSubUser.go

@@ -28,7 +28,7 @@ func GetAgainSubUser(st, ed time.Time) (rData []*AgainSubUserMsg) {
 			timeStamp  = gconv.Int64(_temp["l_date"])
 		)
 		//是否有取关记录
-		if count := db.MG.DB().Count("jy_subscribe", g.Map{"s_event": "unsubscribe", "$lt": st.Unix()}); count == 0 {
+		if count := db.MG.DB().Count("jy_subscribe", g.Map{"s_event": "unsubscribe", "l_date": g.Map{"$lt": st.Unix()}}); count == 0 {
 			continue
 		}
 		if mgoUserID := getUserIdByOpenid(s_m_openid); mgoUserID != "" {
@@ -40,7 +40,7 @@ func GetAgainSubUser(st, ed time.Time) (rData []*AgainSubUserMsg) {
 		_temp = make(map[string]interface{})
 	}
 
-	return nil
+	return rData
 }
 
 func getUserIdByOpenid(openId string) (userId string) {

+ 5 - 2
doFreeClueSign/public/getBindPhoneUser.go

@@ -23,10 +23,13 @@ func GetBidPhoneUser(st, ed time.Time) (rData []*BindMsg) {
 		Select(g.Map{"body.e_userid": 1, "createtime": 1}).Iter()
 	for _temp := make(map[string]interface{}); it.Next(&_temp); {
 		var (
-			userId    = gconv.String(_temp["body.e_userid"])
-			phone     = getPhoneByUserId(userId)
+			userId    = gconv.String(gconv.Map(_temp["body"])["e_userid"])
 			timeStamp = gconv.Int64(_temp["createtime"])
 		)
+		if userId == "" {
+			continue
+		}
+		phone := getPhoneByUserId(userId)
 		rData = append(rData, &BindMsg{
 			MgoUserID: userId,
 			TimeStamp: timeStamp,

+ 5 - 1
doFreeClueSign/public/getNewActiveUser.go

@@ -1,6 +1,7 @@
 package public
 
 import (
+	"context"
 	"fmt"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/util/gconv"
@@ -14,7 +15,10 @@ type NewActiveMsg struct {
 }
 
 func GetNewActiveUser(st, ed time.Time) []*NewActiveMsg {
-	var mgoIds []string
+	var (
+		mgoIds []string
+		ctx    context.Context
+	)
 	stYear, stMonth, stDay := st.Date()
 	edYear, edMonth, edDay := ed.Date()
 	nst := st.AddDate(0, -1, 0)

+ 1 - 2
doFreeClueSign/public/getPayUser.go

@@ -2,7 +2,6 @@ package public
 
 import (
 	"context"
-	"fmt"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/util/gconv"
 	"time"
@@ -13,7 +12,7 @@ func GetPayUser() map[string]bool {
 		pay = make(map[string]bool)
 		ctx = context.Background()
 	)
-	res, err := g.DB("subjectdb").Query(ctx, fmt.Sprintf(`SELECT id,user_id FROM dwd_f_data_equity_info WHERE endtime>?`, time.Now().Format(time.DateTime)))
+	res, err := g.DB("subjectdb").Query(ctx, `SELECT id,userid FROM dwd_f_data_equity_info WHERE endtime>?`, time.Now().Format(time.DateTime))
 	if err == nil && !res.IsEmpty() {
 		for _, m := range res.List() {
 			pay[gconv.String(m["user_id"])] = true