|
@@ -0,0 +1,349 @@
|
|
|
+package job
|
|
|
+
|
|
|
+import (
|
|
|
+ "fmt"
|
|
|
+ . "public"
|
|
|
+ "qfw/util"
|
|
|
+ "qfw/util/elastic"
|
|
|
+ "qfw/util/mongodb"
|
|
|
+ . "report/config"
|
|
|
+ . "report/util"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/donnie4w/go-logger/logger"
|
|
|
+ mgo "gopkg.in/mgo.v2"
|
|
|
+ "gopkg.in/mgo.v2/bson"
|
|
|
+)
|
|
|
+
|
|
|
+const (
|
|
|
+ ProjectQuery = `{"query":{"filtered":{"filter":{"terms":{"list.infoid":[%s]}}}},"_source":["list.subtype","list.toptype"],"from":0,"size":%d}`
|
|
|
+ batchIdLength = 20
|
|
|
+)
|
|
|
+
|
|
|
+//时间都是,没有
|
|
|
+type StatisticJob struct {
|
|
|
+ month_start int64 //今天所在月份开始时间,精确到年月日
|
|
|
+ month_end int64 //今天所在月份结束时间,精确到年月日
|
|
|
+ week_start int64 //今天所在周开始时间,精确到年月日
|
|
|
+ week_end int64 //今天所在周结束时间,精确到年月日
|
|
|
+ today_start int64 //今天开始时间,精确到年月日
|
|
|
+ today_end int64 //今天结束时间,精确到年月日
|
|
|
+ today_end_ymdhms int64 //今天结束时间,精确到年月日时分秒,查询mysql推送历史用到
|
|
|
+}
|
|
|
+
|
|
|
+func (s *StatisticJob) Execute(now time.Time) {
|
|
|
+ defer util.Catch()
|
|
|
+ s.today_start, s.today_end, s.today_end_ymdhms = s.GetToday(now)
|
|
|
+ s.week_start, s.week_end = s.GetWeek(now)
|
|
|
+ s.month_start, s.month_end = s.GetMonth(now)
|
|
|
+ logger.Info("开始统计。。。", util.FormatDate(&now, util.Date_yyyyMMdd))
|
|
|
+ batchIndex := 0
|
|
|
+ startId := ""
|
|
|
+ //
|
|
|
+ pool := make(chan bool, Config.StatisticPoolSize)
|
|
|
+ wait := &sync.WaitGroup{}
|
|
|
+ for {
|
|
|
+ batchIndex++
|
|
|
+ isBreak, users := s.OnceBatch(batchIndex, &startId)
|
|
|
+ for _, temp := range *users {
|
|
|
+ pool <- true
|
|
|
+ wait.Add(1)
|
|
|
+ go func(v map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ util.Catch()
|
|
|
+ <-pool
|
|
|
+ wait.Done()
|
|
|
+ }()
|
|
|
+ if s.Filter(now, util.Int64All(v["l_vip_starttime"])) {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ userId := util.BsonIdToSId(v["_id"])
|
|
|
+ logger.Info("开始统计用户", "userId", userId)
|
|
|
+ todayPushCount := Mysql.CountBySql("select count(1) from pushsubscribe where userid=? and date>=? and date<=? and isvip=1", userId, s.today_start, s.today_end_ymdhms)
|
|
|
+ //toptype
|
|
|
+ toptype := Mysql.SelectBySql("select b.name,sum(1) as sum from (select if(toptype is null,5,toptype) as toptype from pushsubscribe where userid=? and date>=? and date<=? and isvip=1) as a inner join infotype b on (a.toptype=b.id) group by a.toptype", userId, s.today_start, s.today_end_ymdhms)
|
|
|
+ todayToptype := map[string]int{}
|
|
|
+ for _, v := range *toptype {
|
|
|
+ todayToptype[util.ObjToString(v["name"])] = util.IntAll(v["sum"])
|
|
|
+ }
|
|
|
+ //subtype统计
|
|
|
+ subtype := Mysql.SelectBySql("select b.name,sum(1) as sum from (select if(subtype is null,15,subtype) as subtype from pushsubscribe where userid=? and date>=? and date<=? and toptype=1 and isvip=1) as a inner join infotype b on (a.subtype=b.id) group by a.subtype", userId, s.today_start, s.today_end_ymdhms)
|
|
|
+ todaySubtype := map[string]int{}
|
|
|
+ for _, v := range *subtype {
|
|
|
+ todaySubtype[util.ObjToString(v["name"])] = util.IntAll(v["sum"])
|
|
|
+ }
|
|
|
+ datas := Mysql.SelectBySql("select infoid from pushsubscribe where userid=? and date>=? and date<=? and toptype=4 and isvip=1", userId, s.today_start, s.today_end_ymdhms)
|
|
|
+ if datas != nil {
|
|
|
+ projectSubtypes := s.SearchProject(datas)
|
|
|
+ for _, v := range projectSubtypes {
|
|
|
+ todaySubtype[v] = todaySubtype[v] + 1
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //关键词统计
|
|
|
+ matchkeys := Mysql.SelectBySql("select a.matchkeys,sum(1) as sum from (select matchkeys from pushsubscribe where userid=? and date>=? and date<=? and isvip=1 and matchkeys is not null) as a group by a.matchkeys", userId, s.today_start, s.today_end_ymdhms)
|
|
|
+ todayMatchkeys := map[string]int{}
|
|
|
+ for _, v := range *matchkeys {
|
|
|
+ todayMatchkeys[util.ObjToString(v["matchkeys"])] = util.IntAll(v["sum"])
|
|
|
+ }
|
|
|
+ //省统计
|
|
|
+ area := Mysql.SelectBySql("select b.name,sum(1) as sum from pushsubscribe as a inner join province b on (a.area=b.id) where a.userid=? and a.date>=? and a.date<=? and isvip=1 group by a.area", userId, s.today_start, s.today_end_ymdhms)
|
|
|
+ todayArea := map[string]int{}
|
|
|
+ for _, v := range *area {
|
|
|
+ todayArea[util.ObjToString(v["name"])] = util.IntAll(v["sum"])
|
|
|
+ }
|
|
|
+ //市统计
|
|
|
+ city := Mysql.SelectBySql("select d.name as area,c.name as city,c.sum from (select a.area,b.name,sum(1) as sum from pushsubscribe as a inner join province b on (a.city=b.id) where a.userid=? and a.date>=? and a.date<=? and isvip=1 group by a.city,a.area) as c inner join province d on (c.area=d.id)", userId, s.today_start, s.today_end_ymdhms)
|
|
|
+ todayCity := map[string]map[string]int{}
|
|
|
+ for _, v := range *city {
|
|
|
+ if todayCity[util.ObjToString(v["area"])] == nil {
|
|
|
+ todayCity[util.ObjToString(v["area"])] = map[string]int{}
|
|
|
+ }
|
|
|
+ todayCity[util.ObjToString(v["area"])][util.ObjToString(v["city"])] = util.IntAll(v["sum"])
|
|
|
+ }
|
|
|
+ //天
|
|
|
+ sess := mongodb.GetMgoConn()
|
|
|
+ if sess == nil {
|
|
|
+ logger.Error("获取mongodb连接错误")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ defer mongodb.DestoryMongoConn(sess)
|
|
|
+ _, err := sess.DB(Config.Mongodb.DbName).C(Pushspace_statistic).Upsert(map[string]interface{}{
|
|
|
+ "userid": userId,
|
|
|
+ "startdate": s.today_start,
|
|
|
+ "enddate": s.today_end,
|
|
|
+ "type": 1,
|
|
|
+ }, map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "dateym": s.GetDateym(s.today_end),
|
|
|
+ "createtime": time.Now().Unix(),
|
|
|
+ "pushcount": todayPushCount,
|
|
|
+ "toptype": todayToptype,
|
|
|
+ "subtype_zhaobiao": todaySubtype,
|
|
|
+ "matchkeys": todayMatchkeys,
|
|
|
+ "area": todayArea,
|
|
|
+ "city": todayCity,
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ logger.Error(userId, "更新出错", err)
|
|
|
+ }
|
|
|
+ //周
|
|
|
+ s.Upsert(sess, 2, userId, s.week_start, s.week_end, todayPushCount, todayToptype, todaySubtype, todayMatchkeys, todayArea, todayCity)
|
|
|
+ //月
|
|
|
+ s.Upsert(sess, 3, userId, s.month_start, s.month_end, todayPushCount, todayToptype, todaySubtype, todayMatchkeys, todayArea, todayCity)
|
|
|
+ }(temp)
|
|
|
+ }
|
|
|
+ if isBreak {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ wait.Wait()
|
|
|
+ logger.Info("统计结束。。。")
|
|
|
+}
|
|
|
+
|
|
|
+//过滤用户,非vip时间内,不出报告
|
|
|
+func (s *StatisticJob) Filter(now time.Time, l_vip_starttime int64) bool {
|
|
|
+ t1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
|
|
|
+ vip_starttime := time.Unix(l_vip_starttime, 0)
|
|
|
+ t2 := time.Date(vip_starttime.Year(), vip_starttime.Month(), vip_starttime.Day(), 0, 0, 0, 0, time.Local)
|
|
|
+ if t1.Before(t2) {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ return false
|
|
|
+}
|
|
|
+
|
|
|
+//分批次加载
|
|
|
+func (s *StatisticJob) OnceBatch(batchIndex int, startId *string) (bool, *[]map[string]interface{}) {
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "i_vip_status": map[string]interface{}{
|
|
|
+ "$in": []int{1, 2},
|
|
|
+ },
|
|
|
+ }
|
|
|
+ if len(Config.TestIds) > 0 {
|
|
|
+ query["_id"] = map[string]interface{}{
|
|
|
+ "$in": ToObjectIds(Config.TestIds),
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if *startId != "" {
|
|
|
+ query["_id"] = map[string]interface{}{
|
|
|
+ "$gt": bson.ObjectIdHex(*startId),
|
|
|
+ }
|
|
|
+ }
|
|
|
+ logger.Info("开始加载第", batchIndex, "批统计用户", query)
|
|
|
+ users := []map[string]interface{}{}
|
|
|
+ i := 0
|
|
|
+ sess := mongodb.GetMgoConn()
|
|
|
+ defer mongodb.DestoryMongoConn(sess)
|
|
|
+ it := sess.DB(Config.Mongodb.DbName).C(User).Find(query).Select(map[string]interface{}{
|
|
|
+ "_id": 1,
|
|
|
+ "l_vip_starttime": 1,
|
|
|
+ }).Sort("_id").Iter()
|
|
|
+ for temp := make(map[string]interface{}); it.Next(&temp); {
|
|
|
+ i++
|
|
|
+ *startId = util.BsonIdToSId(temp["_id"])
|
|
|
+ users = append(users, temp)
|
|
|
+ temp = make(map[string]interface{})
|
|
|
+ if i == Config.StatisticBatch {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ logger.Info("第", batchIndex, "批统计用户加载结束", *startId)
|
|
|
+ return i < Config.StatisticBatch, &users
|
|
|
+}
|
|
|
+
|
|
|
+//获取月
|
|
|
+func (s *StatisticJob) GetDateym(unix int64) int {
|
|
|
+ return util.IntAll(util.FormatDateByInt64(&unix, "200601"))
|
|
|
+}
|
|
|
+
|
|
|
+//当前日的开始和结束时间
|
|
|
+func (s *StatisticJob) GetToday(now time.Time) (int64, int64, int64) {
|
|
|
+ start := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
|
|
|
+ end := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
|
|
|
+ end_ymdhms := time.Date(now.Year(), now.Month(), now.Day(), 23, 59, 59, 0, time.Local)
|
|
|
+ return start.Unix(), end.Unix(), end_ymdhms.Unix()
|
|
|
+}
|
|
|
+
|
|
|
+//当前周的开始和结束时间
|
|
|
+func (s *StatisticJob) GetWeek(now time.Time) (int64, int64) {
|
|
|
+ week := WeekNum[now.Weekday().String()]
|
|
|
+ start := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
|
|
|
+ if s := week - 1; s > 0 {
|
|
|
+ start = start.AddDate(0, 0, -s)
|
|
|
+ }
|
|
|
+ end := start.AddDate(0, 0, 6)
|
|
|
+ return start.Unix(), end.Unix()
|
|
|
+}
|
|
|
+
|
|
|
+//当前月的开始和结束时间
|
|
|
+func (s *StatisticJob) GetMonth(now time.Time) (int64, int64) {
|
|
|
+ start := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.Local)
|
|
|
+ end := start.AddDate(0, 1, 0).AddDate(0, 0, -1)
|
|
|
+ return start.Unix(), end.Unix()
|
|
|
+}
|
|
|
+
|
|
|
+//新增或者更新库中月、周的数据
|
|
|
+func (s *StatisticJob) Upsert(session *mgo.Session, tp int, userId string, start, end, todayPushCount int64, todayToptype, todaySubtype, todayMatchkeys, todayArea map[string]int, todayCity map[string]map[string]int) {
|
|
|
+ coll := session.DB(Config.Mongodb.DbName).C(Pushspace_statistic)
|
|
|
+ var monthObj map[string]interface{}
|
|
|
+ coll.Find(map[string]interface{}{
|
|
|
+ "type": tp,
|
|
|
+ "userid": userId,
|
|
|
+ "startdate": start,
|
|
|
+ "enddate": end,
|
|
|
+ }).Select(map[string]interface{}{
|
|
|
+ "_id": 1,
|
|
|
+ "pushcount": 1,
|
|
|
+ "matchkeys": 1,
|
|
|
+ "toptype": 1,
|
|
|
+ "subtype_zhaobiao": 1,
|
|
|
+ "area": 1,
|
|
|
+ "city": 1,
|
|
|
+ }).One(&monthObj)
|
|
|
+ if monthObj == nil {
|
|
|
+ err := coll.Insert(map[string]interface{}{
|
|
|
+ "type": tp,
|
|
|
+ "userid": userId,
|
|
|
+ "startdate": start,
|
|
|
+ "enddate": end,
|
|
|
+ "pushcount": todayPushCount,
|
|
|
+ "toptype": todayToptype,
|
|
|
+ "subtype_zhaobiao": todaySubtype,
|
|
|
+ "matchkeys": todayMatchkeys,
|
|
|
+ "area": todayArea,
|
|
|
+ "city": todayCity,
|
|
|
+ "createtime": time.Now().Unix(),
|
|
|
+ "ispush": 0,
|
|
|
+ "dateym": s.GetDateym(end),
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ logger.Error(userId, "新增出错", err)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ toptype, _ := monthObj["toptype"].(map[string]interface{})
|
|
|
+ for k, v := range todayToptype {
|
|
|
+ toptype[k] = util.IntAll(toptype[k]) + v
|
|
|
+ }
|
|
|
+ subtype, _ := monthObj["subtype_zhaobiao"].(map[string]interface{})
|
|
|
+ for k, v := range todaySubtype {
|
|
|
+ subtype[k] = util.IntAll(subtype[k]) + v
|
|
|
+ }
|
|
|
+ matchkeys, _ := monthObj["matchkeys"].(map[string]interface{})
|
|
|
+ for k, v := range todayMatchkeys {
|
|
|
+ matchkeys[k] = util.IntAll(matchkeys[k]) + v
|
|
|
+ }
|
|
|
+ area, _ := monthObj["area"].(map[string]interface{})
|
|
|
+ for k, v := range todayArea {
|
|
|
+ area[k] = util.IntAll(area[k]) + v
|
|
|
+ }
|
|
|
+ city, _ := monthObj["city"].(map[string]interface{})
|
|
|
+ for k, v := range todayCity {
|
|
|
+ vm, _ := city[k].(map[string]interface{})
|
|
|
+ if vm == nil {
|
|
|
+ vm = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ for kk, vv := range v {
|
|
|
+ vm[kk] = util.IntAll(vm[kk]) + vv
|
|
|
+ }
|
|
|
+ city[k] = vm
|
|
|
+ }
|
|
|
+ err := coll.UpdateId(monthObj["_id"], map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "pushcount": util.IntAll(monthObj["pushcount"]) + int(todayPushCount),
|
|
|
+ "toptype": toptype,
|
|
|
+ "subtype_zhaobiao": subtype,
|
|
|
+ "matchkeys": matchkeys,
|
|
|
+ "area": area,
|
|
|
+ "city": city,
|
|
|
+ "updatetime": time.Now().Unix(),
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ logger.Error(userId, "更新出错", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//如果是中标的,去项目表里面找改信息的招标类型
|
|
|
+func (s *StatisticJob) SearchProject(datas *[]map[string]interface{}) []string {
|
|
|
+ result := []string{}
|
|
|
+ batchIds := []string{}
|
|
|
+ for _, data := range *datas {
|
|
|
+ batchIds = append(batchIds, util.ObjToString(data["infoid"]))
|
|
|
+ if len(batchIds) == batchIdLength {
|
|
|
+ result = append(result, s.projectSubtypes(batchIds)...)
|
|
|
+ batchIds = []string{}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(batchIds) > 0 {
|
|
|
+ result = append(result, s.projectSubtypes(batchIds)...)
|
|
|
+ }
|
|
|
+ return result
|
|
|
+}
|
|
|
+
|
|
|
+func (s *StatisticJob) projectSubtypes(ids []string) []string {
|
|
|
+ result := []string{}
|
|
|
+ query := fmt.Sprintf(ProjectQuery, `"`+strings.Join(ids, `","`)+`"`, batchIdLength)
|
|
|
+ list := elastic.Get(Projectset, Projectset, query)
|
|
|
+ if list == nil {
|
|
|
+ return result
|
|
|
+ }
|
|
|
+ for _, v := range *list {
|
|
|
+ array, _ := v["list"].([]interface{})
|
|
|
+ for _, vv := range array {
|
|
|
+ vvMap, _ := vv.(map[string]interface{})
|
|
|
+ toptype, _ := vvMap["toptype"].(string)
|
|
|
+ subtype, _ := vvMap["subtype"].(string)
|
|
|
+ if subtype == "" {
|
|
|
+ subtype = "其它"
|
|
|
+ }
|
|
|
+ if toptype == "招标" && subtype != "" {
|
|
|
+ result = append(result, subtype)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return result
|
|
|
+}
|