package entity import ( "log" "regexp" "telemarketingEtl/config" "telemarketingEtl/util" "time" "app.yhyue.com/moapp/jybase/date" "app.yhyue.com/moapp/jybase/mongodb" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" ) // 查看事件 func VisitInfoAddOld(start, end int64) { // 获取时间戳 1 和时间戳 2 所在的日期 t1 := time.Unix(start, 0) t2 := time.Unix(end, 0) index := g.Cfg().MustGet(ctx, "index").Int() RegWx, _ := regexp.Compile("MicroMessenger") // 获取时间戳 1 和时间戳 2 之间的天数 days := int(t2.Sub(t1).Hours() / 24) for i := 0; i <= days; i++ { // 获取当前日期 t := t1.AddDate(0, 0, i) // 获取当天的开始时间 startOfDay := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()).Unix() // 获取当天的结束时间 endOfDay := time.Date(t.Year(), t.Month(), t.Day(), 23, 59, 59, 0, t.Location()).Unix() s_id := util.GetObjectId(startOfDay) e_id := util.GetObjectId(endOfDay) query := map[string]interface{}{ "_id": map[string]interface{}{ "$gte": s_id, "$lte": e_id, }, } log.Println("task run", time.Unix(startOfDay, 0).Format(date.Date_Full_Layout), time.Unix(endOfDay, 0).Format(date.Date_Full_Layout), gtime.NewFromTimeStamp(start), query) tables := g.Cfg().MustGet(ctx, "logTables").Strings() log.Println("analy tables:", tables) userVisit := map[string]*visitStruct{} for _, table := range tables { func(table string) { session := config.MgoLog.GetMgoConn() iter := session.DB("qfw").C(table).Find(query).Iter() count := 0 for thisData := map[string]interface{}{}; iter.Next(&thisData); { func(thisData map[string]interface{}) { userid := gconv.String(thisData["userid"]) if userid == "" { return } if !mongodb.IsObjectIdHex(userid) { userid, _ = GetUserIdByPositionId(userid) } if userid == "" { return } url_ := gconv.String(thisData["url"]) reg := regexp.MustCompile(".*article/content/(.*)\\.html") contentnum := 0 if reg.MatchString(url_) { contentnum = 1 } createtime := gconv.Int64(thisData["date"]) craetetimeStr := time.Unix(createtime, 0).Format(date.Date_Full_Layout) platform := APP if table == "jy_logs" { platform = PC if client := gconv.String(thisData["client"]); client != "" { if RegWx.MatchString(client) { platform = WX } } } userVisit[userid] = updateUser(userVisit[userid], 1, contentnum, platform, userid, craetetimeStr, craetetimeStr) // }(thisData) count++ if count%5000 == 0 { log.Printf("%s已完成%d条数据\n", table, count) } thisData = map[string]interface{}{} } log.Println("end!") // sWait.Wait() }(table) } fieids := []string{"userid", "DATE", "number", "platform", "createtime", "contentnum"} args := []interface{}{} ii := 0 if len(userVisit) > 0 { log.Println("用户量:", len(userVisit)) for _, v := range userVisit { userid := (*v).userid contentnum := (*v).contentnum number := (*v).number createtime := (*v).createtime datetime := (*v).datetime platform := (*v).platform args = append(args, userid, datetime, number, platform, createtime, contentnum) ii++ if ii%index == 0 { if len(args) > 0 { config.JianyuSubjectdb.InsertBatch("dwd_f_userbase_visit_info", fieids, args) args = []interface{}{} } } } if len(args) > 0 { config.JianyuSubjectdb.InsertBatch("dwd_f_userbase_visit_info", fieids, args) } } } } type visitStruct struct { userid string createtime string //创建时间 datetime string //访问时间 platform int number int contentnum int } func updateUser(visit *visitStruct, num, contmun, platform int, userid, createtime, datetime string) *visitStruct { if visit == nil { return &visitStruct{ userid: userid, createtime: createtime, datetime: datetime, number: num, contentnum: contmun, platform: platform, } } else { if num > 0 { visit.number = visit.number + num } if contmun > 0 { visit.contentnum = visit.contentnum + contmun } visit.platform = platform visit.datetime = datetime } return visit }