package public import ( "fmt" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/gconv" "sync" "time" ) type NewActiveMsg struct { MgoUserID string TimeStamp int64 //活跃时间 } func GetNewActiveUser(st, ed time.Time) []*NewActiveMsg { var mgoIds []string stYear, stMonth, stDay := st.Date() edYear, edMonth, edDay := ed.Date() nst := st.AddDate(0, -1, 0) _, nstMonth, nstDay := nst.Date() var sql, nSql string if stYear == edYear && stMonth == edMonth && stDay == edDay { sql = fmt.Sprintf("month = %d and day = %d", int(stMonth), stDay) } else if stMonth == edMonth { sql = fmt.Sprintf("month = %d and day >= %d and day <= %d", int(stMonth), stDay, edDay) } else { sql = fmt.Sprintf("(month = %d and day >= %d) or (month = %d and day <= %d)", int(stMonth), stDay, int(edMonth), edDay) } //昨天所有活跃用户 res, err := g.DB().Query(ctx, fmt.Sprintf(`SELECT DISTINCT user_mongoid FROM user_countbyhour WHERE %s `, sql)) if err == nil && !res.IsEmpty() { for _, m := range res.List() { mongoid := gconv.String(m["user_mongoid"]) mgoIds = append(mgoIds, mongoid) } } var ( lock sync.Mutex data []*NewActiveMsg ) if nstMonth == stMonth { nSql = fmt.Sprintf("month = %d and day >= %d and day < %d", int(stMonth), nstDay, stDay) } else { nSql = fmt.Sprintf("((month = %d and day >= %d) or (month = %d and day < %d))", int(nstMonth), nstDay, int(stMonth), stDay) } pool := make(chan bool, 5) wait := &sync.WaitGroup{} for _, id := range mgoIds { pool <- true wait.Add(1) go func(mId string) { defer func() { wait.Done() <-pool }() count, _ := g.DB().GetCount(ctx, fmt.Sprintf("user_mongoid = '%s' and %s", mId, nSql)) if count == 0 { //统计昨天之前30天不活跃用户 lock.Lock() data = append(data, &NewActiveMsg{ MgoUserID: mId, TimeStamp: time.Date(stYear, stMonth, stDay, 0, 0, 0, 0, time.Local).Unix(), }) lock.Unlock() } }(id) } wait.Wait() return data }