Browse Source

feat:不活跃变活跃

wangchuanjin 11 months ago
parent
commit
8371296cf6

+ 22 - 21
doFreeClueSign/job/mamager.go

@@ -5,13 +5,14 @@ import (
 	"doFreeClueSign/db"
 	"doFreeClueSign/public"
 	"fmt"
+	"time"
+
 	"github.com/gogf/gf/v2/container/gmap"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gcron"
 	"github.com/gogf/gf/v2/os/gctx"
 	"github.com/gogf/gf/v2/os/gfile"
 	"github.com/gogf/gf/v2/util/gconv"
-	"time"
 )
 
 type JobManager struct {
@@ -64,26 +65,26 @@ func InitJobManager() *JobManager {
 		gcron.Start("bindPhoneAndSubAgain")
 	}
 
-	if activityUserCronStr != "" {
-		_, err = gcron.Add(ctx, activityUserCronStr, func(ctx context.Context) {
-			g.Log().Infof(ctx, "activityUser start %v", activityUserJobRunning)
-			if activityUserJobRunning {
-				return
-			}
-			activityUserJobRunning = true
-			defer func() {
-				activityUserJobRunning = false
-			}()
-
-			job.LoadPayUser()
-			job.LoadActivityUser()
-
-		}, "activityUser")
-		if err != nil {
-			panic(err)
-		}
-		gcron.Start("activityUser")
-	}
+	// if activityUserCronStr != "" {
+	// 	_, err = gcron.Add(ctx, activityUserCronStr, func(ctx context.Context) {
+	// 		g.Log().Infof(ctx, "activityUser start %v", activityUserJobRunning)
+	// 		if activityUserJobRunning {
+	// 			return
+	// 		}
+	// 		activityUserJobRunning = true
+	// 		defer func() {
+	// 			activityUserJobRunning = false
+	// 		}()
+
+	// 		job.LoadPayUser()
+	// 		job.LoadActivityUser()
+
+	// 	}, "activityUser")
+	// 	if err != nil {
+	// 		panic(err)
+	// 	}
+	// 	gcron.Start("activityUser")
+	// }
 
 	return job
 }

+ 3 - 1
telemarketingEtl/config.yaml

@@ -138,4 +138,6 @@ handReturn: 5
 #回收站B.3个自然日内有过“已接听”的通话记录且仍处于“商机线索”状态下的客户。
 recycleB: 3
 #3.30个自然日内自动退回公海的“拒绝沟通客户”;;
-refuseHandReturn: 30
+refuseHandReturn: 30
+#访问日志分析结果入库并发数
+visitInfoSavePool: 3

+ 125 - 121
telemarketingEtl/entity/dwd_f_userbase_visit_info.go

@@ -1,9 +1,7 @@
 package entity
 
 import (
-	"app.yhyue.com/moapp/jybase/common"
 	"fmt"
-	"go.mongodb.org/mongo-driver/bson"
 	"log"
 	"regexp"
 	"strings"
@@ -12,32 +10,36 @@ import (
 	"telemarketingEtl/util"
 	"time"
 
+	"app.yhyue.com/moapp/jybase/common"
+	"go.mongodb.org/mongo-driver/bson"
+
 	"app.yhyue.com/moapp/jybase/date"
-	"app.yhyue.com/moapp/jybase/mongodb"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gtime"
 	"github.com/gogf/gf/v2/util/gconv"
 )
 
-var USERPOOL = 20
-var locks = make([]*sync.Mutex, USERPOOL)
-
-func lock(userid string) *sync.Mutex {
-	n := 0
-	for _, v := range userid {
-		n += int(v)
-	}
-	return locks[n%USERPOOL]
-}
+// Patterns compiled once at package level and used by VisitInfoAdd to
+// classify access-log records.
+var (
+	// wxReg is matched against the "client" field of jy_logs records to tell
+	// WX apart from PC.
+	wxReg        = regexp.MustCompile("MicroMessenger")
+	// contentReg is matched against the "url" field; a hit counts as a content view.
+	contentReg   = regexp.MustCompile(".*article/content/(.*)\\.html")
+	// portrait1Reg and portrait2Reg are matched against the "url" field; a hit
+	// on either counts as a portrait view.
+	portrait1Reg = regexp.MustCompile(".*/swordfish/page_big_pc/unit_portrayal/(.*)")
+	portrait2Reg = regexp.MustCompile(".*swordfish/page_big_pc/(.*)/ent_ser_portrait")
+	// searchReg is matched against the "url" field; a hit counts as a search.
+	searchReg    = regexp.MustCompile(".*jybx/core/(.*)/searchList")
+)
 
-func init() {
-	for i := 0; i < USERPOOL; i++ {
-		locks[i] = &sync.Mutex{}
-	}
+// VisitInfo accumulates what VisitInfoAdd counted for one user on one day.
+type VisitInfo struct {
+	// number is incremented once for every scanned record of the user.
+	number        int
+	// contentNum, portraitNum and searchNum count the records whose url
+	// matched contentReg, portrait1Reg/portrait2Reg and searchReg respectively.
+	contentNum    int
+	portraitNum   int
+	searchNum     int
+	// platform is APP, PC or WX; it is overwritten by every scanned record,
+	// so it reflects the record scanned last.
+	platform      int
+	// lastLoginTime is the formatted "date" field of the record scanned last.
+	lastLoginTime string
+	// uId is the Uid taken from the id mapping entry of the user.
+	uId           string
 }
 
 // 查看事件
 func VisitInfoAdd(start, end int64) {
+	idMapping := LoadIdMapping()
 	s_id := util.GetObjectId(start)
 	e_id := util.GetObjectId(end)
 	query := map[string]interface{}{
@@ -46,117 +48,119 @@ func VisitInfoAdd(start, end int64) {
 			"$lt":  e_id,
 		},
 	}
-	RegWx, _ := regexp.Compile("MicroMessenger")
-
 	log.Println("task run", start, end, gtime.NewFromTimeStamp(start), query)
-	//regWx, _ := regexp.Compile("MicroMessenger")
 	tables := g.Cfg().MustGet(ctx, "logTables").Strings()
-	log.Println("analy tables:", tables)
+	log.Println("需要遍历日志表", tables)
+	session := config.MgoLog.GetMgoConn()
+	defer config.MgoLog.DestoryMongoConn(session)
+	all := map[string]map[string]*VisitInfo{}
+	lastLogin := map[string]string{}
 	for _, table := range tables {
-		/*go*/ func(table string) {
-			session := config.MgoLog.GetMgoConn()
-			iter := session.DB("qfw").C(table).Find(query).Iter()
-			count := 0
-			for thisData := map[string]interface{}{}; iter.Next(&thisData); {
-				// sPool <- true
-				// sWait.Add(1)
-				/*go*/
-				func(thisData map[string]interface{}) {
-					// defer func() {
-					// 	<-sPool
-					// 	sWait.Done()
-					// }()
-					userid, uid := gconv.String(thisData["userid"]), ""
-					if userid == "" {
-						return
-					}
-					if !mongodb.IsObjectIdHex(userid) {
-						userid, uid = GetUserIdByPositionId(userid)
-					} else {
-						idmdata := config.JianyuSubjectdb.SelectBySql(`select uid from dwd_f_userbase_id_mapping where userid = "` + userid + `"`)
-						if idmdata != nil && len(*idmdata) > 0 {
-							uid = gconv.String((*idmdata)[0]["uid"])
-						}
-					}
-					if userid == "" {
-						return
-					}
-					url_ := gconv.String(thisData["url"])
-					reg := regexp.MustCompile(".*article/content/(.*)\\.html")
-					portrait1reg := regexp.MustCompile(".*/swordfish/page_big_pc/unit_portrayal/(.*)")
-					portrait2reg := regexp.MustCompile(".*swordfish/page_big_pc/(.*)/ent_ser_portrait")
-					searchreg := regexp.MustCompile(".*jybx/core/(.*)/searchList")
-					contentnum := 0
-					portraitnum := 0
-					searchnum := 0
-					if reg.MatchString(url_) {
-						contentnum = 1
-					}
-					if portrait1reg.MatchString(url_) || portrait2reg.MatchString(url_) {
-						portraitnum = 1
-					}
-					if searchreg.MatchString(url_) {
-						searchnum = 1
-					}
-					createtime := gconv.Int64(thisData["date"])
-					starttime, endtime := getToday(createtime)
-					craetetimeStr := time.Unix(createtime, 0).Format(date.Date_Full_Layout)
-					platform := APP
-					if table == "jy_logs" {
-						platform = PC
-						if client := gconv.String(thisData["client"]); client != "" {
-							if RegWx.MatchString(client) {
-								platform = WX
-							}
-						}
-					}
-					lock(userid).Lock()
-					defer lock(userid).Unlock()
-					if config.JianyuSubjectdb.CountBySql(`select count(1) from dwd_f_userbase_visit_info where userid = ? and createtime>= ? and createtime <?`, userid, starttime, endtime) > 0 {
-						if contentnum != 0 {
-							config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number = number+1,contentnum = contentnum + 1,date =?,platform=? where userid = ? and createtime>= ? and createtime <?`, craetetimeStr, platform, userid, starttime, endtime)
-						} else if portraitnum != 0 {
-							config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number = number+1,portraitnum = contentnum + 1,date =?,platform=? where userid = ? and createtime>= ? and createtime <?`, craetetimeStr, platform, userid, starttime, endtime)
-						} else if searchnum != 0 {
-							config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number = number+1,searchnum = contentnum + 1,date =?,platform=? where userid = ? and createtime>= ? and createtime <?`, craetetimeStr, platform, userid, starttime, endtime)
-						} else {
-							config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number = number+1,date =?,platform=? where userid = ? and createtime>= ? and createtime <?`, craetetimeStr, platform, userid, starttime, endtime)
-						}
-					} else {
-						config.JianyuSubjectdb.InsertBySql(`INSERT INTO dwd_f_userbase_visit_info
-														(userid,DATE, number, platform,createtime,contentnum,portraitnum,searchnum)
-														VALUES (?,?,?,?,?,?,?,?)`, userid, craetetimeStr, 1, platform, craetetimeStr, contentnum, portraitnum, searchnum)
-					}
-					if uid != "" {
-						if config.JianyuSubjectdb.CountBySql(`select count(1) from dwd_f_crm_attribute_label where uid = ?`, uid) > 0 {
-							config.JianyuSubjectdb.Update("dwd_f_crm_attribute_label", map[string]interface{}{"uid": uid}, map[string]interface{}{"last_login_time": craetetimeStr})
-						} else {
-							config.JianyuSubjectdb.Insert("dwd_f_crm_attribute_label", map[string]interface{}{
-								"uid":             uid,
-								"last_login_time": craetetimeStr,
-								"updatetime":      time.Now().Format(time.DateTime),
-							})
-						}
+		iter := session.DB("qfw").C(table).Find(query).Sort("_id").Iter()
+		count := 0
+		log.Println("遍历日志表结束。。。", table)
+		for thisData := map[string]interface{}{}; iter.Next(&thisData); {
+			userId := gconv.String(thisData["userid"])
+			if userId == "" || idMapping[userId] == nil {
+				continue
+			}
+			userId = idMapping[userId].UserId
+			createtime := gtime.New(thisData["date"])
+			ymd := createtime.Format(date.Date_Short_Layout)
+			if all[ymd] == nil {
+				all[ymd] = map[string]*VisitInfo{}
+			}
+			vi := all[ymd][userId]
+			if vi == nil {
+				vi = &VisitInfo{
+					uId: idMapping[userId].Uid,
+				}
+			}
+			vi.number++
+			vi.lastLoginTime = createtime.Format(date.Date_Full_Layout)
+			lastLogin[userId] = vi.lastLoginTime
+			url_ := gconv.String(thisData["url"])
+			if contentReg.MatchString(url_) {
+				vi.contentNum++
+			} else if portrait1Reg.MatchString(url_) || portrait2Reg.MatchString(url_) {
+				vi.portraitNum++
+			} else if searchReg.MatchString(url_) {
+				vi.searchNum++
+			}
+			vi.platform = APP
+			if table == "jy_logs" {
+				vi.platform = PC
+				if client := gconv.String(thisData["client"]); client != "" {
+					if wxReg.MatchString(client) {
+						vi.platform = WX
 					}
-				}(thisData)
-				count++
-				if count%5000 == 0 {
-					log.Printf("%s已完成%d条数据\n", table, count)
 				}
-				thisData = map[string]interface{}{}
 			}
-			log.Printf("%s一共完成%d条数据\n", table, count)
-			log.Println("end!")
-			// sWait.Wait()
-		}(table)
+			all[ymd][userId] = vi
+			thisData = map[string]interface{}{}
+			count++
+			if count%50000 == 0 {
+				log.Printf("遍历日志表", table, count)
+			}
+		}
+		log.Println("遍历日志表结束。。。", table, count)
 	}
-}
-
-func getToday(createtime int64) (start, end string) {
-	now := time.Unix(createtime, 0)
-	startOfDay := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
-	endOfDay := time.Date(now.Year(), now.Month(), now.Day(), 23, 59, 59, 0, now.Location())
-	return startOfDay.Format(date.Date_Full_Layout), endOfDay.Format(date.Date_Full_Layout)
+	pool := make(chan bool, g.Cfg().MustGet(ctx, "visitInfoSavePool").Int())
+	wait := &sync.WaitGroup{}
+	newAll := map[string]string{}
+	index := 0
+	log.Println("开始保存统计结果。。。")
+	for k, v := range all {
+		t := gtime.New(k)
+		startTime, endTime := t.Format(date.Date_Full_Layout), t.AddDate(0, 0, 1).Format(date.Date_Full_Layout)
+		for kk, vv := range v {
+			if idMapping[kk] != nil && (newAll[idMapping[kk].Uid] == "" || gtime.New(vv.lastLoginTime).After(gtime.New(newAll[idMapping[kk].Uid]))) {
+				newAll[idMapping[kk].Uid] = vv.lastLoginTime
+			}
+			index++
+			if index%500 == 0 {
+				log.Println("保存统计结果", index)
+			}
+			//
+			pool <- true
+			wait.Add(1)
+			go func(userId string, vi *VisitInfo) {
+				defer func() {
+					<-pool
+					wait.Done()
+				}()
+				if list := config.JianyuSubjectdb.SelectBySql(`select id from dwd_f_userbase_visit_info where userid=? and createtime>=? and createtime<? limit 1`, userId, startTime, endTime); list != nil && len(*list) == 1 {
+					config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number=number+`+fmt.Sprint(vi.number)+`,contentnum=contentnum+`+fmt.Sprint(vi.contentNum)+`,portraitnum=portraitnum+`+fmt.Sprint(vi.portraitNum)+`,searchnum=searchnum+`+fmt.Sprint(vi.searchNum)+`,date=?,platform=? where id=?`, vi.lastLoginTime, vi.platform, (*list)[0]["id"])
+				} else {
+					config.JianyuSubjectdb.InsertBySql(`INSERT INTO dwd_f_userbase_visit_info (userid,date, number, platform,createtime,contentnum,portraitnum,searchnum) VALUES (?,?,?,?,?,?,?,?)`, userId, vi.lastLoginTime, 1, vi.platform, vi.lastLoginTime, vi.contentNum, vi.portraitNum, vi.searchNum)
+				}
+				if config.JianyuSubjectdb.CountBySql(`select count(1) from dwd_f_crm_attribute_label where uid=?`, vi.uId) > 0 {
+					config.JianyuSubjectdb.Update("dwd_f_crm_attribute_label", map[string]interface{}{"uid": vi.uId}, map[string]interface{}{"last_login_time": vi.lastLoginTime})
+				} else {
+					config.JianyuSubjectdb.Insert("dwd_f_crm_attribute_label", map[string]interface{}{
+						"uid":             vi.uId,
+						"last_login_time": lastLogin[userId],
+						"updatetime":      time.Now().Format(time.DateTime),
+					})
+				}
+			}(kk, vv)
+		}
+	}
+	log.Println("保存统计结果结束。。。", index)
+	wait.Wait()
+	//
+	log.Println("开始更新不活跃重新活跃用户。。。")
+	sTime := gtime.New(start)
+	day30, today, nowFormat, tomorrow := sTime.AddDate(0, 0, -30).Format(time.DateTime), sTime.Format(time.DateTime), sTime.Format(time.DateTime), sTime.AddDate(0, 0, 1).Format(time.DateTime)
+	config.JianyuSubjectdb.SelectByBath(1, func(l *[]map[string]interface{}) bool {
+		userid := gconv.String((*l)[0]["userid"])
+		log.Println("更新不活跃重新活跃用户", userid)
+		config.JianyuMaindb.ExecBySql(`INSERT INTO bi_service.freeClubSign (mogUserId,act_again_date,create_time) VALUES (?,?,?) ON DUPLICATE KEY UPDATE act_again_date=?`, userid, nowFormat, nowFormat, nowFormat)
+		return true
+	}, `SELECT userid FROM dwd_f_userbase_visit_info a WHERE a.date>? AND a.date<? AND a.contentnum>=5 and not exists(
+	select 1 from dwd_f_userbase_visit_info b where b.date>? and b.date<? and b.contentnum>=5 and a.userid=b.userid
+	) and not exists (SELECT 1 FROM dwd_f_data_equity_info b WHERE b.endtime>? and a.userid=b.userid)`, today, tomorrow, day30, today, nowFormat)
+	log.Println("更新不活跃重新活跃用户结束。。。")
 }
 
 type UserVisit struct {

+ 28 - 0
telemarketingEtl/entity/entity.go

@@ -1,6 +1,7 @@
 package entity
 
 import (
+	"log"
 	"telemarketingEtl/config"
 
 	"github.com/gogf/gf/v2/util/gconv"
@@ -28,6 +29,33 @@ const (
 	ENTSEARCH       = "企业搜索"
 )
 
+// IdMp is one entry of the lookup table built by LoadIdMapping.
+type IdMp struct {
+	// Uid is the uid column of dwd_f_userbase_id_mapping.
+	Uid    string
+	// UserId is the userid column of dwd_f_userbase_id_mapping.
+	UserId string
+}
+
+// LoadIdMapping reads every row of dwd_f_userbase_id_mapping and returns a
+// lookup table in which each row is reachable by its userid and, when it has
+// a usable one, by its position_id.
+//
+// Rows without a userid are skipped, and position ids "" and "0" are not
+// indexed: GetUserIdByPositionId treats both as invalid, and indexing them
+// would attribute every log record carrying such an id to whichever row
+// happened to be loaded last.
+func LoadIdMapping() map[string]*IdMp {
+	index := 0
+	m := map[string]*IdMp{}
+	log.Println("开始加载dwd_f_userbase_id_mapping。。。", index)
+	config.JianyuSubjectdb.SelectByBath(1, func(l *[]map[string]interface{}) bool {
+		// Nothing to read from an empty batch; keep iterating.
+		if l == nil || len(*l) == 0 {
+			return true
+		}
+		index++
+		if index%5000 == 0 {
+			log.Println("加载dwd_f_userbase_id_mapping", index)
+		}
+		row := (*l)[0]
+		idMp := &IdMp{
+			Uid:    gconv.String(row["uid"]),
+			UserId: gconv.String(row["userid"]),
+		}
+		if idMp.UserId == "" {
+			return true
+		}
+		if positionId := gconv.String(row["position_id"]); positionId != "" && positionId != "0" {
+			m[positionId] = idMp
+		}
+		m[idMp.UserId] = idMp
+		return true
+	}, `select userid,uid,position_id from dwd_f_userbase_id_mapping`)
+	log.Println("加载dwd_f_userbase_id_mapping结束。。。", index)
+	return m
+}
+
 // 根据职位id获取mongodb userid
 func GetUserIdByPositionId(positionId string) (userId, uid string) {
 	if positionId == "" || positionId == "0" {

+ 0 - 161
telemarketingEtl/entity/old.go

@@ -1,161 +0,0 @@
-package entity
-
-import (
-	"log"
-	"regexp"
-	"telemarketingEtl/config"
-	"telemarketingEtl/util"
-	"time"
-
-	"app.yhyue.com/moapp/jybase/date"
-	"app.yhyue.com/moapp/jybase/mongodb"
-	"github.com/gogf/gf/v2/frame/g"
-	"github.com/gogf/gf/v2/os/gtime"
-	"github.com/gogf/gf/v2/util/gconv"
-)
-
-// 查看事件
-func VisitInfoAddOld(start, end int64) {
-	// 获取时间戳 1 和时间戳 2 所在的日期
-	t1 := time.Unix(start, 0)
-	t2 := time.Unix(end, 0)
-
-	index := g.Cfg().MustGet(ctx, "index").Int()
-
-	RegWx, _ := regexp.Compile("MicroMessenger")
-
-	// 获取时间戳 1 和时间戳 2 之间的天数
-	days := int(t2.Sub(t1).Hours() / 24)
-
-	for i := 0; i <= days; i++ {
-		// 获取当前日期
-		t := t1.AddDate(0, 0, i)
-
-		// 获取当天的开始时间
-		startOfDay := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()).Unix()
-
-		// 获取当天的结束时间
-		endOfDay := time.Date(t.Year(), t.Month(), t.Day(), 23, 59, 59, 0, t.Location()).Unix()
-
-		s_id := util.GetObjectId(startOfDay)
-		e_id := util.GetObjectId(endOfDay)
-		query := map[string]interface{}{
-			"_id": map[string]interface{}{
-				"$gte": s_id,
-				"$lte": e_id,
-			},
-		}
-		log.Println("task run", time.Unix(startOfDay, 0).Format(date.Date_Full_Layout), time.Unix(endOfDay, 0).Format(date.Date_Full_Layout), gtime.NewFromTimeStamp(start), query)
-		tables := g.Cfg().MustGet(ctx, "logTables").Strings()
-		log.Println("analy tables:", tables)
-		userVisit := map[string]*visitStruct{}
-		for _, table := range tables {
-			func(table string) {
-				session := config.MgoLog.GetMgoConn()
-				iter := session.DB("qfw").C(table).Find(query).Iter()
-				count := 0
-				for thisData := map[string]interface{}{}; iter.Next(&thisData); {
-					func(thisData map[string]interface{}) {
-						userid := gconv.String(thisData["userid"])
-						if userid == "" {
-							return
-						}
-						if !mongodb.IsObjectIdHex(userid) {
-							userid, _ = GetUserIdByPositionId(userid)
-						}
-						if userid == "" {
-							return
-						}
-						url_ := gconv.String(thisData["url"])
-						reg := regexp.MustCompile(".*article/content/(.*)\\.html")
-						contentnum := 0
-						if reg.MatchString(url_) {
-							contentnum = 1
-						}
-
-						createtime := gconv.Int64(thisData["date"])
-
-						craetetimeStr := time.Unix(createtime, 0).Format(date.Date_Full_Layout)
-						platform := APP
-						if table == "jy_logs" {
-							platform = PC
-							if client := gconv.String(thisData["client"]); client != "" {
-								if RegWx.MatchString(client) {
-									platform = WX
-								}
-							}
-						}
-						userVisit[userid] = updateUser(userVisit[userid], 1, contentnum, platform, userid, craetetimeStr, craetetimeStr)
-
-						//
-					}(thisData)
-					count++
-					if count%5000 == 0 {
-						log.Printf("%s已完成%d条数据\n", table, count)
-					}
-					thisData = map[string]interface{}{}
-				}
-				log.Println("end!")
-				// sWait.Wait()
-			}(table)
-		}
-		fieids := []string{"userid", "DATE", "number", "platform", "createtime", "contentnum"}
-		args := []interface{}{}
-		ii := 0
-		if len(userVisit) > 0 {
-			log.Println("用户量:", len(userVisit))
-			for _, v := range userVisit {
-				userid := (*v).userid
-				contentnum := (*v).contentnum
-				number := (*v).number
-				createtime := (*v).createtime
-				datetime := (*v).datetime
-				platform := (*v).platform
-				args = append(args, userid, datetime, number, platform, createtime, contentnum)
-				ii++
-				if ii%index == 0 {
-					if len(args) > 0 {
-						config.JianyuSubjectdb.InsertBatch("dwd_f_userbase_visit_info", fieids, args)
-						args = []interface{}{}
-					}
-				}
-			}
-			if len(args) > 0 {
-				config.JianyuSubjectdb.InsertBatch("dwd_f_userbase_visit_info", fieids, args)
-			}
-		}
-
-	}
-}
-
-type visitStruct struct {
-	userid     string
-	createtime string //创建时间
-	datetime   string //访问时间
-	platform   int
-	number     int
-	contentnum int
-}
-
-func updateUser(visit *visitStruct, num, contmun, platform int, userid, createtime, datetime string) *visitStruct {
-	if visit == nil {
-		return &visitStruct{
-			userid:     userid,
-			createtime: createtime,
-			datetime:   datetime,
-			number:     num,
-			contentnum: contmun,
-			platform:   platform,
-		}
-	} else {
-		if num > 0 {
-			visit.number = visit.number + num
-		}
-		if contmun > 0 {
-			visit.contentnum = visit.contentnum + contmun
-		}
-		visit.platform = platform
-		visit.datetime = datetime
-	}
-	return visit
-}