main.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package main
  2. import (
  3. "context"
  4. _ "github.com/gogf/gf/contrib/nosql/redis/v2"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/os/gcron"
  7. "github.com/gogf/gf/v2/os/gctx"
  8. "github.com/gogf/gf/v2/util/gconv"
  9. "go.mongodb.org/mongo-driver/bson"
  10. "time"
  11. "workTasks/common"
  12. "workTasks/wxSign/wxSignGroup"
  13. )
  14. type (
  15. UserSubAction struct {
  16. S_m_openid string `bson:"s_m_openid"`
  17. LastData int64 `bson:"latest_l_date"`
  18. }
  19. )
  20. func main() {
  21. ctx := gctx.New()
  22. _, err := gcron.Add(gctx.New(), g.Cfg().MustGet(ctx, "runCron", "# 0 2 * * *").String(), func(ctx context.Context) {
  23. for code, mConfig := range g.Cfg().MustGet(ctx, "groupConfig").Map() {
  24. var (
  25. mConfig = gconv.Map(mConfig)
  26. appid = gconv.String(mConfig["wxAppId"])
  27. days = gconv.Ints(mConfig["subDaySign"])
  28. )
  29. if appid == "" {
  30. g.Log().Errorf(ctx, "获取%s 微信appId异常", code)
  31. continue
  32. }
  33. nwsm, err := wxSignGroup.NewWxSignManager(appid)
  34. if err != nil {
  35. panic(err)
  36. }
  37. nwsm.LoadGroupUser(ctx, appid)
  38. nwsm.ClearGroupUser(ctx, appid)
  39. //同步最新标签
  40. for i, arr := range logNewSign(code, days) {
  41. if err := nwsm.AddNewUsers(i, arr...); err != nil {
  42. panic(err)
  43. }
  44. }
  45. nwsm.UpdateNewGroupUser(ctx, appid)
  46. nwsm.LoadGroupUser(ctx, appid)
  47. }
  48. }, "userWxSign")
  49. if err != nil {
  50. panic(err)
  51. }
  52. select {}
  53. }
  54. func logNewSign(code string, days []int) map[int][]string {
  55. var (
  56. dayMaps = map[int]bool{}
  57. signList = map[int][]string{}
  58. now = time.Now()
  59. signData = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Unix()
  60. maxDay int = 0
  61. )
  62. for _, dayNum := range days {
  63. if dayNum > maxDay {
  64. maxDay = dayNum
  65. }
  66. dayMaps[dayNum] = true
  67. }
  68. g.Dump(dayMaps)
  69. subActionList, err := GetUserSubList(code, -maxDay)
  70. if err != nil {
  71. panic(err)
  72. }
  73. for _, action := range subActionList {
  74. sign := int((signData-action.LastData)/(60*60*24)) + 1
  75. if dayMaps[sign] {
  76. signList[sign] = append(signList[sign], action.S_m_openid)
  77. }
  78. }
  79. //加载分组完成,开始更新
  80. g.Dump(signList)
  81. return signList
  82. }
  83. func GetUserSubList(code string, day int) ([]*UserSubAction, error) {
  84. //获取30天时间戳
  85. now := time.Now()
  86. endTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  87. startTime := endTime.AddDate(0, 0, day) // 30 天前
  88. g.Log().Infof(context.Background(), "开始加载 %d-%d关注数据", startTime.Unix(), endTime.Unix())
  89. client := common.MG.DB().C
  90. collection := client.Database("qfw").Collection("jy_subscribe")
  91. // 创建聚合管道
  92. var codeLimit interface{}
  93. if code != "default" {
  94. codeLimit = code
  95. } else {
  96. codeLimit = bson.M{"$exists": 0}
  97. }
  98. pipeline := []bson.M{
  99. {
  100. // 第一个阶段:筛选符合条件的文档
  101. "$match": bson.M{"l_date": bson.M{"$gt": startTime.Unix(), "$lt": endTime.Unix()}, "s_event": "subscribe", "s_code": codeLimit},
  102. },
  103. {
  104. "$group": bson.M{
  105. "_id": "$s_m_openid", // 按 s_m_openid 分组
  106. "latest_l_date": bson.M{"$max": "$l_date"}, // 计算每个 s_m_openid 出现的次数
  107. },
  108. },
  109. {
  110. "$project": bson.M{
  111. "_id": 0,
  112. "s_m_openid": "$_id", // 显示 s_m_openid
  113. "latest_l_date": "$latest_l_date", // 显示出现次数
  114. },
  115. },
  116. {
  117. "$sort": bson.M{
  118. "latest_l_date": -1, // 按 count 降序排序
  119. },
  120. },
  121. }
  122. // 执行聚合查询
  123. cursor, err := collection.Aggregate(context.Background(), pipeline)
  124. if err != nil {
  125. g.Log().Fatalf(context.Background(), "Failed to execute aggregation: %v", err)
  126. return nil, err
  127. }
  128. defer cursor.Close(context.Background())
  129. var list []*UserSubAction
  130. // 遍历结果并打印
  131. for cursor.Next(context.Background()) {
  132. var result UserSubAction
  133. if err := cursor.Decode(&result); err != nil {
  134. g.Log().Fatalf(context.Background(), "Failed to decode result: %v", err)
  135. }
  136. list = append(list, &result)
  137. }
  138. // 检查遍历过程中是否出错
  139. if err := cursor.Err(); err != nil {
  140. g.Log().Fatalf(context.Background(), "Cursor error: %v", err)
  141. return nil, err
  142. }
  143. return list, nil
  144. }