job.go 8.6 KB


  1. package job
  2. import (
  3. "context"
  4. "doFreeClueSign/db"
  5. "doFreeClueSign/public"
  6. "github.com/gogf/gf/v2/encoding/gjson"
  7. "github.com/gogf/gf/v2/frame/g"
  8. "github.com/gogf/gf/v2/util/gconv"
  9. "time"
  10. )
  11. // LoadActivityUser 加载新活跃用户
  12. func (jm *JobManager) LoadActivityUser() {
  13. var (
  14. ctx = context.TODO()
  15. )
  16. start, _ := time.ParseInLocation(time.DateTime, jm.lastRun.NewActivity, time.Local)
  17. val, ed := public.GetNewActiveUser(start)
  18. if len(val) > 0 {
  19. for i, msg := range val {
  20. if i%10 == 0 {
  21. g.Log().Infof(ctx, "JobManager.LoadActivityUser %d/%d", i, len(val))
  22. }
  23. if err := jm.FilterPayUserAndSaveDb(ctx, msg); err != nil {
  24. g.Log().Errorf(ctx, "JobManager.LoadActivityUser.FilterPayUserAndSaveDb %v", gconv.String(msg))
  25. }
  26. }
  27. }
  28. //更新
  29. jm.lastRun.NewActivity = ed.Format(time.DateTime)
  30. if err := jm.SaveLastRun(); err != nil {
  31. g.Log().Errorf(ctx, "LoadActivityUser error %v", err)
  32. }
  33. }
  34. // LoadBindPhoneUser 加载绑定手机号用户
  35. func (jm *JobManager) LoadBindPhoneUser() {
  36. var (
  37. runNow = time.Now()
  38. )
  39. start, _ := time.ParseInLocation(time.DateTime, jm.lastRun.BindPhone, time.Local)
  40. val := public.GetBidPhoneUser(start, runNow)
  41. if len(val) > 0 {
  42. for i, msg := range val {
  43. if i%10 == 0 {
  44. g.Log().Infof(ctx, "JobManager.LoadBindPhoneUser %d/%d", i, len(val))
  45. }
  46. if err := jm.FilterPayUserAndSaveDb(ctx, msg); err != nil {
  47. g.Log().Errorf(ctx, "JobManager.LoadBindPhoneUser.FilterPayUserAndSaveDb %v", gconv.String(msg))
  48. }
  49. }
  50. }
  51. //更新
  52. jm.lastRun.BindPhone = runNow.Format(time.DateTime)
  53. if err := jm.SaveLastRun(); err != nil {
  54. g.Log().Errorf(ctx, "LoadBindPhoneUser error %v", err)
  55. }
  56. }
  57. // LoadAgainSubUser 加载再次关注用户
  58. func (jm *JobManager) LoadAgainSubUser() {
  59. var (
  60. runNow = time.Now()
  61. ctx = context.TODO()
  62. )
  63. start, _ := time.ParseInLocation(time.DateTime, jm.lastRun.AgainSub, time.Local)
  64. val := public.GetAgainSubUser(start, runNow)
  65. if len(val) > 0 {
  66. for i, msg := range val {
  67. if i%10 == 0 {
  68. g.Log().Infof(ctx, "JobManager.LoadAgainSubUser %d/%d", i, len(val))
  69. }
  70. if err := jm.FilterPayUserAndSaveDb(ctx, msg); err != nil {
  71. g.Log().Errorf(ctx, "JobManager.LoadAgainSubUser.FilterPayUserAndSaveDb %v", gconv.String(msg))
  72. }
  73. }
  74. }
  75. //更新
  76. jm.lastRun.AgainSub = runNow.Format(time.DateTime)
  77. if err := jm.SaveLastRun(); err != nil {
  78. g.Log().Errorf(ctx, "LoadAgainSubUser error %v", err)
  79. }
  80. }
  81. // @Author jianghan
  82. // @Description 过滤订单 筛选活动注册用户
  83. // @Date 2024/11/18
  84. func loadOrder() {
  85. sql := `SELECT id, filter, order_code, product_type, user_id, user_phone, vip_starttime, vip_endtime FROM dataexport_order WHERE id > ? AND order_status = 1 ORDER BY id ASC`
  86. res, err := g.DB("jianyu").Query(ctx, sql, LastId)
  87. if err == nil && !res.IsEmpty() {
  88. for _, m := range res.List() {
  89. id := gconv.Int(m["id"])
  90. if id > LastId {
  91. LastId = id
  92. }
  93. jsonObj, err := gjson.DecodeToJson([]byte(gconv.String(m["filter"])))
  94. if err != nil {
  95. g.Log().Errorf(ctx, "loadOrder error %v", err)
  96. continue
  97. }
  98. filter := jsonObj.Map()
  99. userid := gconv.String(m["user_id"])
  100. now := time.Now().Format(time.DateTime)
  101. //vipEndTime := gconv.Time(m["vip_endtime"]).Unix()
  102. if remark := gconv.String(filter["remark"]); remark != "" {
  103. switch remark {
  104. case "邀请好友成功":
  105. g.DB("bi_service").Exec(ctx, `INSERT INTO user_statistics (userId, createTime, event) SELECT ?, ?, 1 WHERE NOT EXISTS ( SELECT 1 FROM user_statistics WHERE userId = ? AND event = 1)`, userid, now, userid)
  106. case "被邀请注册成功":
  107. g.DB("bi_service").Exec(ctx, `INSERT INTO user_statistics (userId, createTime, event) SELECT ?, ?, 2 WHERE NOT EXISTS ( SELECT 1 FROM user_statistics WHERE userId = ? AND event = 2)`, userid, now, userid)
  108. // 被邀请成功注册 送7天超级订阅订单
  109. //res, _ := db.MG.DB().FindById("user", userid, `{"l_vip_endtime":1}`)
  110. //userVEndtime := gconv.Int64((*res)["l_vip_endtime"])
  111. //if userVEndtime == vipEndTime && verifyDate(vipEndTime) {
  112. // g.DB("bi_service").Exec(ctx, `INSERT INTO user_statistics (userId, createTime, event) SELECT ?, ?, 3 WHERE NOT EXISTS ( SELECT 1 FROM user_statistics WHERE userId = ? AND event = 3)`, userid, now, userid)
  113. //}
  114. case "2024年新用户注册赠送7天超级订阅":
  115. //res, _ := db.MG.DB().FindById("user", userid, `{"l_vip_endtime":1}`)
  116. //userVEndtime := gconv.Int64((*res)["l_vip_endtime"])
  117. //if userVEndtime == vipEndTime && verifyDate(vipEndTime) {
  118. // g.DB("bi_service").Exec(ctx, `INSERT INTO user_statistics (userId, createTime, event) SELECT ?, ?, 3 WHERE NOT EXISTS ( SELECT 1 FROM user_statistics WHERE userId = ? AND event = 3)`, userid, now, userid)
  119. //}
  120. }
  121. }
  122. }
  123. }
  124. g.Log().Infof(ctx, "loadOrder end: %d", LastId)
  125. }
  126. func LoadOrderOther() {
  127. sql := `SELECT id, filter, order_code, product_type, user_id, user_phone, vip_starttime, vip_endtime FROM dataexport_order WHERE vip_endtime > ? AND vip_endtime < ? AND order_status = 1 AND (filter LIKE '%邀请好友成功%' OR filter LIKE '%被邀请注册成功%' OR filter LIKE '%2024年新用户注册赠送7天超级订阅%') ORDER BY id ASC`
  128. sql1 := `SELECT count(1) FROM dataexport_order WHERE vip_endtime > ? AND vip_endtime < ? AND order_status = 1 AND (filter LIKE '%邀请好友成功%' OR filter LIKE '%被邀请注册成功%' OR filter LIKE '%2024年新用户注册赠送7天超级订阅%')`
  129. sql2 := `SELECT id, vip_endtime FROM jianyu.dataexport_order WHERE (product_type = 'VIP订阅' OR product_type = '大会员') AND id > ? AND user_id = ? AND order_status = 1`
  130. now := time.Now().Format(time.DateTime)
  131. end := time.Now().AddDate(0, 0, 1).Format(time.DateTime)
  132. c, _ := g.DB("jianyu").GetCount(ctx, sql1, now, end)
  133. g.Log().Info(ctx, "LoadOrderOther count: ", c)
  134. res, err := g.DB("jianyu").Query(ctx, sql, now, end)
  135. if err == nil && !res.IsEmpty() {
  136. for _, m := range res.List() {
  137. id := gconv.Int64(m["id"])
  138. userid := gconv.String(m["user_id"])
  139. res1, err := g.DB("jianyu").Query(ctx, sql2, id, userid)
  140. if err == nil && res1.IsEmpty() {
  141. g.DB("bi_service").Exec(ctx, `INSERT INTO user_statistics (userId, createTime, event) SELECT ?, ?, 3 WHERE NOT EXISTS ( SELECT 1 FROM user_statistics WHERE userId = ? AND event = 3)`, userid, now, userid)
  142. }
  143. }
  144. }
  145. }
  146. // 验证vip到期时间一天内到期
  147. func verifyDate(v int64) bool {
  148. if (v - time.Now().Unix()) < 24*60*60 {
  149. return true
  150. }
  151. return false
  152. }
  153. // @Author jianghan
  154. // @Description 历史数据
  155. // @Date 2024/11/19
  156. func LoadOrderHis() {
  157. g.Log().Infof(ctx, "LoadOrderHis start")
  158. sql := `SELECT id, filter, order_code, product_type, user_id, user_phone, vip_starttime, vip_endtime FROM dataexport_order WHERE id <= ? AND order_status = 1 AND filter LIKE '%邀请好友成功%' ORDER BY id DESC`
  159. res, err := g.DB("jianyu").Query(ctx, sql, LastId)
  160. if err == nil && !res.IsEmpty() {
  161. for _, m := range res.List() {
  162. id := gconv.Int(m["id"])
  163. if id > LastId {
  164. LastId = id
  165. }
  166. userid := gconv.String(m["user_id"])
  167. now := time.Now().Format(time.DateTime)
  168. user, _ := db.MG.DB().FindById("user", userid, `{"i_vip_status":1, "i_member_status": 1}`)
  169. if user != nil && len(*user) > 0 {
  170. if gconv.Int((*user)["i_vip_status"]) <= 0 && gconv.Int((*user)["i_member_status"]) <= 0 {
  171. g.DB("bi_service").Exec(ctx, `INSERT INTO user_statistics (userId, createTime, event) SELECT ?, ?, 1 WHERE NOT EXISTS ( SELECT 1 FROM user_statistics WHERE userId = ? AND event = 1)`, userid, now, userid)
  172. }
  173. }
  174. }
  175. }
  176. g.Log().Info(ctx, "LoadOrderHis end")
  177. }
  178. // @Author jianghan
  179. // @Description 11月9日注册成功
  180. // @Date 2024/11/20
  181. func LoadOrderHisMore() {
  182. g.Log().Info(ctx, "LoadOrderHisMore start")
  183. now := time.Now().Format(time.DateTime)
  184. sql := `SELECT id, filter, order_code, product_type, user_id, user_phone, vip_starttime, vip_endtime FROM dataexport_order WHERE order_status = 1 AND vip_endtime > '2024-11-16 00:00:00' AND vip_endtime < ? AND filter LIKE '%2024年新用户注册赠送7天超级订阅%' ORDER BY id DESC`
  185. sql1 := `SELECT id, vip_endtime FROM jianyu.dataexport_order WHERE (product_type = 'VIP订阅' OR product_type = '大会员') AND id > ? AND user_id = ? AND order_status = 1`
  186. res, err := g.DB("jianyu").Query(ctx, sql, now)
  187. if err == nil && !res.IsEmpty() {
  188. for _, m := range res.List() {
  189. id := gconv.Int64(m["id"])
  190. userid := gconv.String(m["user_id"])
  191. res1, err := g.DB("jianyu").Query(ctx, sql1, id, userid)
  192. if err == nil && res1.IsEmpty() {
  193. g.DB("bi_service").Exec(ctx, `INSERT INTO user_statistics (userId, createTime, event) SELECT ?, ?, 3 WHERE NOT EXISTS ( SELECT 1 FROM user_statistics WHERE userId = ? AND event = 3)`, userid, now, userid)
  194. }
  195. }
  196. }
  197. g.Log().Info(ctx, "LoadOrderHisMore end")
  198. }