package main import ( "context" _ "github.com/gogf/gf/contrib/nosql/redis/v2" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gcron" "github.com/gogf/gf/v2/os/gctx" "go.mongodb.org/mongo-driver/bson" "time" "workTasks/common" "workTasks/wxSign/wxSignGroup" ) type ( UserSubAction struct { S_m_openid string `bson:"s_m_openid"` LastData int64 `bson:"latest_l_date"` } ) func main() { ctx := gctx.New() _, err := gcron.Add(gctx.New(), g.Cfg().MustGet(ctx, "runCron", "# 0 2 * * *").String(), func(ctx context.Context) { //ctx := context.Background() nwsm, err := wxSignGroup.NewWxSignManager() if err != nil { panic(err) } nwsm.LoadGroupUser(ctx) nwsm.ClearGroupUser(ctx) //同步最新标签 for i, arr := range logNewSign() { if err := nwsm.AddNewUsers(i, arr...); err != nil { panic(err) } } nwsm.LoadGroupUser(ctx) }, "userWxSign") if err != nil { panic(err) } select {} } func logNewSign() map[int][]string { var ( dayMaps = map[int]bool{} signList = map[int][]string{} now = time.Now() signData = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Unix() maxDay int = 0 ) for _, dayNum := range g.Cfg().MustGet(context.Background(), "subDaySign").Ints() { if dayNum > maxDay { maxDay = dayNum } dayMaps[dayNum] = true } g.Dump(dayMaps) subActionList, err := GetUserSubList(-maxDay) if err != nil { panic(err) } for _, action := range subActionList { sign := int((signData-action.LastData)/(60*60*24)) + 1 if dayMaps[sign] { signList[sign] = append(signList[sign], action.S_m_openid) } } //加载分组完成,开始更新 g.Dump(signList) return signList } func GetUserSubList(day int) ([]*UserSubAction, error) { //获取30天时间戳 now := time.Now() endTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) startTime := endTime.AddDate(0, 0, day) // 30 天前 g.Log().Infof(context.Background(), "开始加载 %d-%d关注数据", startTime.Unix(), endTime.Unix()) client := common.MG.DB().C collection := client.Database("qfw").Collection("jy_subscribe") // 创建聚合管道 pipeline := []bson.M{ { // 第一个阶段:筛选符合条件的文档 "$match": bson.M{"l_date": bson.M{"$gt": startTime.Unix(), "$lt": endTime.Unix()}, "s_event": "subscribe"}, }, { "$group": bson.M{ "_id": "$s_m_openid", // 按 s_m_openid 分组 "latest_l_date": bson.M{"$max": "$l_date"}, // 计算每个 s_m_openid 出现的次数 }, }, { "$project": bson.M{ "_id": 0, "s_m_openid": "$_id", // 显示 s_m_openid "latest_l_date": "$latest_l_date", // 显示出现次数 }, }, { "$sort": bson.M{ "latest_l_date": -1, // 按 count 降序排序 }, }, } // 执行聚合查询 cursor, err := collection.Aggregate(context.Background(), pipeline) if err != nil { g.Log().Fatalf(context.Background(), "Failed to execute aggregation: %v", err) return nil, err } defer cursor.Close(context.Background()) var list []*UserSubAction // 遍历结果并打印 for cursor.Next(context.Background()) { var result UserSubAction if err := cursor.Decode(&result); err != nil { g.Log().Fatalf(context.Background(), "Failed to decode result: %v", err) } list = append(list, &result) } // 检查遍历过程中是否出错 if err := cursor.Err(); err != nil { g.Log().Fatalf(context.Background(), "Cursor error: %v", err) return nil, err } return list, nil }