package main import ( "context" _ "github.com/gogf/gf/contrib/nosql/redis/v2" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gcron" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/util/gconv" "go.mongodb.org/mongo-driver/bson" "time" "workTasks/common" "workTasks/wxSign/wxSignGroup" ) type ( UserSubAction struct { S_m_openid string `bson:"s_m_openid"` LastData int64 `bson:"latest_l_date"` } ) func main() { ctx := gctx.New() _, err := gcron.Add(gctx.New(), g.Cfg().MustGet(ctx, "runCron", "# 0 2 * * *").String(), func(ctx context.Context) { for code, mConfig := range g.Cfg().MustGet(ctx, "groupConfig").Map() { var ( mConfig = gconv.Map(mConfig) appid = gconv.String(mConfig["wxAppId"]) days = gconv.Ints(mConfig["subDaySign"]) ) if appid == "" { g.Log().Errorf(ctx, "获取%s 微信appId异常", code) continue } nwsm, err := wxSignGroup.NewWxSignManager(appid) if err != nil { panic(err) } nwsm.LoadGroupUser(ctx, appid) nwsm.ClearGroupUser(ctx, appid) //同步最新标签 for i, arr := range logNewSign(code, days) { if err := nwsm.AddNewUsers(i, arr...); err != nil { panic(err) } } nwsm.UpdateNewGroupUser(ctx, appid) nwsm.LoadGroupUser(ctx, appid) } }, "userWxSign") if err != nil { panic(err) } select {} } func logNewSign(code string, days []int) map[int][]string { var ( dayMaps = map[int]bool{} signList = map[int][]string{} now = time.Now() signData = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Unix() maxDay int = 0 ) for _, dayNum := range days { if dayNum > maxDay { maxDay = dayNum } dayMaps[dayNum] = true } g.Dump(dayMaps) subActionList, err := GetUserSubList(code, -maxDay) if err != nil { panic(err) } for _, action := range subActionList { sign := int((signData-action.LastData)/(60*60*24)) + 1 if dayMaps[sign] { signList[sign] = append(signList[sign], action.S_m_openid) } } //加载分组完成,开始更新 g.Dump(signList) return signList } func GetUserSubList(code string, day int) ([]*UserSubAction, error) { //获取30天时间戳 now := time.Now() endTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) startTime := endTime.AddDate(0, 0, day) // 30 天前 g.Log().Infof(context.Background(), "开始加载 %d-%d关注数据", startTime.Unix(), endTime.Unix()) client := common.MG.DB().C collection := client.Database("qfw").Collection("jy_subscribe") // 创建聚合管道 var codeLimit interface{} if code != "default" { codeLimit = code } else { codeLimit = bson.M{"$exists": 0} } pipeline := []bson.M{ { // 第一个阶段:筛选符合条件的文档 "$match": bson.M{"l_date": bson.M{"$gt": startTime.Unix(), "$lt": endTime.Unix()}, "s_event": "subscribe", "s_code": codeLimit}, }, { "$group": bson.M{ "_id": "$s_m_openid", // 按 s_m_openid 分组 "latest_l_date": bson.M{"$max": "$l_date"}, // 计算每个 s_m_openid 出现的次数 }, }, { "$project": bson.M{ "_id": 0, "s_m_openid": "$_id", // 显示 s_m_openid "latest_l_date": "$latest_l_date", // 显示出现次数 }, }, { "$sort": bson.M{ "latest_l_date": -1, // 按 count 降序排序 }, }, } // 执行聚合查询 cursor, err := collection.Aggregate(context.Background(), pipeline) if err != nil { g.Log().Fatalf(context.Background(), "Failed to execute aggregation: %v", err) return nil, err } defer cursor.Close(context.Background()) var list []*UserSubAction // 遍历结果并打印 for cursor.Next(context.Background()) { var result UserSubAction if err := cursor.Decode(&result); err != nil { g.Log().Fatalf(context.Background(), "Failed to decode result: %v", err) } list = append(list, &result) } // 检查遍历过程中是否出错 if err := cursor.Err(); err != nil { g.Log().Fatalf(context.Background(), "Cursor error: %v", err) return nil, err } return list, nil }