Browse Source

wip:提交

wangkaiyue 1 year ago
parent
commit
fcd1fecc38

+ 14 - 7
doFreeClueSign/config.yaml

@@ -1,28 +1,35 @@
 mongodb:
-  default:
+  default: #qfw
     address: "192.168.3.149:27180"
     size: 5
     dbName: qfw
     replSet: ""
     userName: ""
     password: ""
-  log:
-    address: "192.168.3.206:27090"
+  log: #日志库
+    address: "192.168.3.149:27190"
     size: 5
     dbName: "qfw"
     replSet: ""
     userName: "admin"
     password: "123456"
 
+logger:
+  level: "all"
+  path: "logs" # 日志文件路径。默认为空,表示关闭,仅输出到终端
+  file: "{Y-m-d}.log" # 日志文件格式。默认为"{Y-m-d}.log"
+
 database:
   default:
     link: "mysql:root:=PDT49#80Z!RVv52_z@tcp(192.168.3.14:4000)/debris_product"
     debug: true
-
-runCron: "# 0 2 * * *" #每天凌晨2点执行
-payTidb: "jianyu_subjectdb_test.dwd_f_data_equity_info"
+  bi_service:
+    link: "mysql:root:=PDT49#80Z!RVv52_z@tcp(192.168.3.14:4000)/bi_service"
+    debug: true
+  subjectdb: #正式环境库名为Jianyu_subjectdb
+    link: "mysql:root:=PDT49#80Z!RVv52_z@tcp(192.168.3.14:4000)/jianyu_subjectdb_test"
+    debug: true
 
 cron:
-  updatePayUser: "# */30 * * * *" #更新付费用户
   activityUser: "# */30 * * * *" #更新新活跃用户
   bindPhoneAndSubAgain: "# */5 * * * *" #再次关注&绑定手机号用户

+ 2 - 0
doFreeClueSign/go.mod

@@ -4,6 +4,7 @@ go 1.20
 
 require (
 	app.yhyue.com/moapp/jybase v0.0.0-20240626030750-115a3c0929fb
+	github.com/gogf/gf/contrib/drivers/mysql/v2 v2.7.2
 	github.com/gogf/gf/v2 v2.7.2
 )
 
@@ -15,6 +16,7 @@ require (
 	github.com/fsnotify/fsnotify v1.7.0 // indirect
 	github.com/go-logr/logr v1.2.4 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
+	github.com/go-sql-driver/mysql v1.7.1 // indirect
 	github.com/go-stack/stack v1.8.0 // indirect
 	github.com/golang/snappy v0.0.4 // indirect
 	github.com/gorilla/websocket v1.5.1 // indirect

+ 4 - 0
doFreeClueSign/go.sum

@@ -69,9 +69,13 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
 github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
 github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
 github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
+github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
+github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
 github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
+github.com/gogf/gf/contrib/drivers/mysql/v2 v2.7.2 h1:GpE2JuHVoNJD4lNP1omC1+TKXNCSvXr5oil1bNULYd0=
+github.com/gogf/gf/contrib/drivers/mysql/v2 v2.7.2/go.mod h1:0h3UmNAmA8hnjvTyozZelSWWxiAjGDQttzZqMhkCkJo=
 github.com/gogf/gf/v2 v2.7.2 h1:uZDfyblasI12lZUtFd1mfxsIr8b14cd/F88DJUTCSDM=
 github.com/gogf/gf/v2 v2.7.2/go.mod h1:EBXneAg/wes86rfeh68XC0a2JBNQylmT7Sp6/8Axk88=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=

+ 0 - 22
doFreeClueSign/job.go

@@ -1,22 +0,0 @@
-package main
-
-import "github.com/gogf/gf/v2/container/gvar"
-
-type JobManager struct {
-	payUser map[string]bool //付费账户
-}
-
-func InitJobManager() *JobManager {
-	job := &JobManager{
-		payUser: map[string]bool{},
-	}
-	job.LoadPayUser()
-	return nil
-}
-
-func (jm *JobManager) LoadPayUser() {
-	v := map[string]bool{"xx": false}
-	//m := gmap.New(true)
-	//
-	gvar.New(v, true).Map()
-}

+ 84 - 0
doFreeClueSign/job/job.go

@@ -0,0 +1,84 @@
+package job
+
+import (
+	"context"
+	"doFreeClueSign/public"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/util/gconv"
+	"time"
+)
+
+// LoadActivityUser 加载新活跃用户
+func (jm *JobManager) LoadActivityUser() {
+	var (
+		runNow = time.Now()
+		ctx    = context.TODO()
+	)
+	start, _ := time.Parse(time.DateTime, jm.lastRun.NewActivity)
+	val := public.GetNewActiveUser(start, runNow)
+	if len(val) > 0 {
+		for i, msg := range val {
+			if i%10 == 0 {
+				g.Log().Infof(ctx, "JobManager.LoadActivityUser %d/%d", i, len(val))
+			}
+			if err := jm.FilterPayUserAndSaveDb(ctx, msg); err != nil {
+				g.Log().Errorf(ctx, "JobManager.LoadActivityUser.FilterPayUserAndSaveDb %v", gconv.String(msg))
+			}
+		}
+	}
+	//更新
+	jm.lastRun.NewActivity = start.Format(time.DateTime)
+	if err := jm.SaveLastRun(); err != nil {
+		g.Log().Errorf(ctx, "LoadActivityUser error %v", err)
+	}
+}
+
+// LoadBindPhoneUser 加载绑定手机号用户
+func (jm *JobManager) LoadBindPhoneUser() {
+	var (
+		runNow = time.Now()
+		ctx    = context.TODO()
+	)
+	start, _ := time.Parse(time.DateTime, jm.lastRun.BindPhone)
+	val := public.GetBidPhoneUser(start, runNow)
+	if len(val) > 0 {
+		for i, msg := range val {
+			if i%10 == 0 {
+				g.Log().Infof(ctx, "JobManager.LoadBindPhoneUser %d/%d", i, len(val))
+			}
+			if err := jm.FilterPayUserAndSaveDb(ctx, msg); err != nil {
+				g.Log().Errorf(ctx, "JobManager.LoadBindPhoneUser.FilterPayUserAndSaveDb %v", gconv.String(msg))
+			}
+		}
+	}
+	//更新
+	jm.lastRun.BindPhone = start.Format(time.DateTime)
+	if err := jm.SaveLastRun(); err != nil {
+		g.Log().Errorf(ctx, "LoadBindPhoneUser error %v", err)
+	}
+}
+
+// LoadAgainSubUser 加载再次关注用户
+func (jm *JobManager) LoadAgainSubUser() {
+	var (
+		runNow = time.Now()
+		ctx    = context.TODO()
+	)
+	start, _ := time.Parse(time.DateTime, jm.lastRun.AgainSub)
+	val := public.GetAgainSubUser(start, runNow)
+	if len(val) > 0 {
+		for i, msg := range val {
+			if i%10 == 0 {
+				g.Log().Infof(ctx, "JobManager.LoadAgainSubUser %d/%d", i, len(val))
+			}
+			if err := jm.FilterPayUserAndSaveDb(ctx, msg); err != nil {
+				g.Log().Errorf(ctx, "JobManager.LoadAgainSubUser.FilterPayUserAndSaveDb %v", gconv.String(msg))
+			}
+		}
+	}
+	//更新
+	jm.lastRun.AgainSub = start.Format(time.DateTime)
+	if err := jm.SaveLastRun(); err != nil {
+		g.Log().Errorf(ctx, "LoadAgainSubUser error %v", err)
+	}
+}

+ 130 - 0
doFreeClueSign/job/mamager.go

@@ -0,0 +1,130 @@
+package job
+
+import (
+	"context"
+	"doFreeClueSign/public"
+	"fmt"
+	"github.com/gogf/gf/v2/container/gmap"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/os/gcron"
+	"github.com/gogf/gf/v2/os/gctx"
+	"github.com/gogf/gf/v2/os/gfile"
+	"github.com/gogf/gf/v2/util/gconv"
+	"time"
+)
+
+type JobManager struct {
+	payUser *gmap.Map //付费账户
+	lastRun *struct {
+		BindPhone   string `json:"bindPhone"`
+		AgainSub    string `json:"againSub"`
+		NewActivity string `json:"newActivity"`
+	}
+}
+
+func InitJobManager() *JobManager {
+	var (
+		err                         error
+		ctx                         = gctx.New()
+		bindPhoneAndSubAgainCronStr = g.Cfg().MustGet(ctx, "bindPhoneAndSubAgain").String()
+		activityUserCronStr         = g.Cfg().MustGet(ctx, "activityUser").String()
+	)
+
+	job := &JobManager{
+		payUser: gmap.New(true),
+	}
+	job.LoadLastRun()
+
+	if bindPhoneAndSubAgainCronStr != "" {
+		_, err = gcron.Add(ctx, bindPhoneAndSubAgainCronStr, func(ctx context.Context) {
+			job.LoadPayUser()
+			job.LoadAgainSubUser()
+			job.LoadBindPhoneUser()
+		}, "bindPhoneAndSubAgain")
+		if err != nil {
+			panic(err)
+		}
+		gcron.Start("bindPhoneAndSubAgain")
+	}
+
+	if activityUserCronStr != "" {
+		_, err = gcron.Add(ctx, activityUserCronStr, func(ctx context.Context) {
+			job.LoadPayUser()
+			job.LoadActivityUser()
+		}, "activityUser")
+		if err != nil {
+			panic(err)
+		}
+		gcron.Start("activityUser")
+	}
+
+	return job
+}
+
+func (jm *JobManager) LoadLastRun() {
+	err := gconv.Struct(gfile.GetContents("runSign.json"), &jm.lastRun)
+	if err != nil {
+		g.Log().Errorf(context.TODO(), "加载上次运行配置日常")
+	}
+	now := time.Now().Format(time.DateTime)
+	if jm.lastRun.BindPhone == "" {
+		jm.lastRun.BindPhone = now
+	}
+	if jm.lastRun.AgainSub == "" {
+		jm.lastRun.AgainSub = now
+	}
+	if jm.lastRun.NewActivity == "" {
+		jm.lastRun.NewActivity = now
+	}
+	return
+}
+
+func (jm *JobManager) SaveLastRun() error {
+	return gfile.PutContents("runSign.json", gconv.String(jm.lastRun))
+}
+
+func (jm *JobManager) LoadPayUser() {
+	newMap := gmap.New(true)
+	for i, v := range public.GetPayUser() {
+		newMap.Set(i, v)
+	}
+	jm.payUser = newMap
+}
+
+func (jm *JobManager) FilterPayUserAndSaveDb(ctx context.Context, value interface{}) error {
+	var (
+		sql           = `INSERT INTO freeClubSign `
+		val           []interface{}
+		now           = time.Now().Format(time.DateTime)
+		operationTime string
+	)
+	if msg, ok := value.(*public.AgainSubUserMsg); ok {
+		if jm.payUser.Contains(msg.MgoUserID) {
+			return nil
+		}
+		operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
+		sql = `(mogUserId,sub_again_date,create_time)VALUES (?, ?, ?)ON DUPLICATE KEY UPDATE sub_again_date=?`
+		val = append(val, msg.MgoUserID, operationTime, now, operationTime)
+	} else if msg, ok := value.(*public.BindMsg); ok {
+		if jm.payUser.Contains(msg.MgoUserID) {
+			return nil
+		}
+		operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
+		sql += `(mogUserId,phone,bind_phone_date,create_time)VALUES (?, ?, ?, ?)ON DUPLICATE KEY UPDATE phone=?, bind_phone_date=?`
+		val = append(val, msg.MgoUserID, msg.Phone, operationTime, now, msg.Phone, operationTime)
+	} else if msg, ok := value.(*public.NewActiveMsg); ok {
+		if jm.payUser.Contains(msg.MgoUserID) {
+			return nil
+		}
+		operationTime = time.Unix(msg.TimeStamp, 0).Format(time.DateTime)
+		sql = `(mogUserId,act_again_date,create_time)VALUES (?, ?, ?)ON DUPLICATE KEY UPDATE act_again_date=?`
+		val = append(val, msg.MgoUserID, operationTime, now, operationTime)
+	} else {
+		return fmt.Errorf("未知类型")
+	}
+	_, err := g.DB("bi_service").Exec(context.TODO(), sql, val...)
+	if err != nil {
+		return err
+	}
+	return nil
+}

+ 2 - 16
doFreeClueSign/main.go

@@ -1,25 +1,11 @@
 package main
 
 import (
-	"context"
+	"doFreeClueSign/job"
 	_ "github.com/gogf/gf/contrib/drivers/mysql/v2"
-	"github.com/gogf/gf/v2/frame/g"
-	"github.com/gogf/gf/v2/os/gcron"
-	"github.com/gogf/gf/v2/os/gctx"
 )
 
 func main() {
-	var (
-		err     error
-		ctx     = gctx.New()
-		cronStr = g.Cfg().MustGet(ctx, "runCron").String()
-	)
-	_, err = gcron.Add(ctx, cronStr, func(ctx context.Context) {
-		g.Log().Print(ctx, "Every second")
-
-	}, "doFreeClueSign")
-	if err != nil {
-		panic(err)
-	}
+	job.InitJobManager()
 	select {}
 }

+ 42 - 2
doFreeClueSign/public/getAgainSubUser.go

@@ -1,6 +1,12 @@
 package public
 
-import "time"
+import (
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"doFreeClueSign/db"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/util/gconv"
+	"time"
+)
 
 type AgainSubUserMsg struct {
 	MgoUserID string
@@ -8,6 +14,40 @@ type AgainSubUserMsg struct {
 }
 
 // GetAgainSubUser 再次关注用户列表
-func GetAgainSubUser(st, ed time.Time) []*AgainSubUserMsg {
+func GetAgainSubUser(st, ed time.Time) (rData []*AgainSubUserMsg) {
+	sess := db.MG.DB().GetMgoConn()
+	if sess != nil {
+		defer db.MG.DB().DestoryMongoConn(sess)
+	}
+	it := sess.DB("qfw").C("jy_subscribe").Find(g.Map{"s_event": "subscribe", "l_date": g.Map{"$gte": st.Unix(), "$lt": ed.Unix()}}).
+		Select(g.Map{"s_m_openid": 1, "l_date": 1}).Iter()
+
+	for _temp := make(map[string]interface{}); it.Next(&_temp); {
+		var (
+			s_m_openid = gconv.String(_temp["s_m_openid"])
+			timeStamp  = gconv.Int64(_temp["l_date"])
+		)
+		//是否有取关记录
+		if count := db.MG.DB().Count("jy_subscribe", g.Map{"s_event": "unsubscribe", "$lt": st.Unix()}); count == 0 {
+			continue
+		}
+		if mgoUserID := getUserIdByOpenid(s_m_openid); mgoUserID != "" {
+			rData = append(rData, &AgainSubUserMsg{
+				MgoUserID: mgoUserID,
+				TimeStamp: timeStamp,
+			})
+		}
+		_temp = make(map[string]interface{})
+	}
+
 	return nil
 }
+
+func getUserIdByOpenid(openId string) (userId string) {
+	res, _ := db.MG.DB().FindOne("user", g.Map{"s_m_openid": openId})
+	if res == nil || len(*res) == 0 {
+		return
+	}
+	userId = mongodb.BsonIdToSId((*res)["_id"])
+	return
+}

+ 41 - 3
doFreeClueSign/public/getBindPhoneUser.go

@@ -1,6 +1,11 @@
 package public
 
-import "time"
+import (
+	"doFreeClueSign/db"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/util/gconv"
+	"time"
+)
 
 type BindMsg struct {
 	MgoUserID string
@@ -9,6 +14,39 @@ type BindMsg struct {
 }
 
 // GetBidPhoneUser 获取绑定手机号
-func GetBidPhoneUser(st, ed time.Time) []*BindMsg {
-	return nil
+func GetBidPhoneUser(st, ed time.Time) (rData []*BindMsg) {
+	sess := db.MG.DB("log").GetMgoConn()
+	if sess != nil {
+		defer db.MG.DB("log").DestoryMongoConn(sess)
+	}
+	it := sess.DB("qfw").C("nsq_logs").Find(g.Map{"body.e_body.types": "bindPhone", "createtime": g.Map{"$gte": st.Unix(), "$lt": ed.Unix()}}).
+		Select(g.Map{"body.e_userid": 1, "createtime": 1}).Iter()
+	for _temp := make(map[string]interface{}); it.Next(&_temp); {
+		var (
+			userId    = gconv.String(_temp["body.e_userid"])
+			phone     = getPhoneByUserId(userId)
+			timeStamp = gconv.Int64(_temp["createtime"])
+		)
+		rData = append(rData, &BindMsg{
+			MgoUserID: userId,
+			TimeStamp: timeStamp,
+			Phone:     phone,
+		})
+		_temp = make(map[string]interface{})
+	}
+	return
+}
+
+func getPhoneByUserId(userId string) (phone string) {
+	res, _ := db.MG.DB().FindById("user", userId, `{"s_phone":1,"s_m_phone":1}`)
+	if res == nil || len(*res) == 0 {
+		return
+	}
+	if phone = gconv.String(gconv.Map((*res)["s_phone"])); phone != "" {
+		return
+	}
+	if phone = gconv.String(gconv.Map((*res)["s_m_phone"])); phone != "" {
+		return
+	}
+	return
 }

+ 6 - 7
doFreeClueSign/public/getPayUser.go

@@ -5,16 +5,15 @@ import (
 	"fmt"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/util/gconv"
-)
-
-var (
-	ctx = context.Background()
+	"time"
 )
 
 func GetPayUser() map[string]bool {
-	payTidb := g.Cfg().MustGet(ctx, "payTidb", "Jianyu_subjectdb.dwd_f_data_equity_info").String()
-	pay := make(map[string]bool)
-	res, err := g.DB().Query(ctx, fmt.Sprintf(`SELECT id,user_id FROM %s`, payTidb))
+	var (
+		pay = make(map[string]bool)
+		ctx = context.Background()
+	)
+	res, err := g.DB("subjectdb").Query(ctx, fmt.Sprintf(`SELECT id,user_id FROM dwd_f_data_equity_info WHERE endtime>?`, time.Now().Format(time.DateTime)))
 	if err == nil && !res.IsEmpty() {
 		for _, m := range res.List() {
 			pay[gconv.String(m["user_id"])] = true