Jelajahi Sumber

Merge branch 'master' into feature/v1.5.13

yuelujie 11 bulan lalu
induk
melakukan
c2ea461493

+ 23 - 22
doFreeClueSign/job/mamager.go

@@ -5,13 +5,14 @@ import (
 	"doFreeClueSign/db"
 	"doFreeClueSign/public"
 	"fmt"
+	"time"
+
 	"github.com/gogf/gf/v2/container/gmap"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gcron"
 	"github.com/gogf/gf/v2/os/gctx"
 	"github.com/gogf/gf/v2/os/gfile"
 	"github.com/gogf/gf/v2/util/gconv"
-	"time"
 )
 
 type JobManager struct {
@@ -34,7 +35,7 @@ func InitJobManager() *JobManager {
 		err                         error
 		ctx                         = gctx.New()
 		bindPhoneAndSubAgainCronStr = g.Cfg().MustGet(ctx, "cron.bindPhoneAndSubAgain").String()
-		activityUserCronStr         = g.Cfg().MustGet(ctx, "cron.activityUser").String()
+		//activityUserCronStr         = g.Cfg().MustGet(ctx, "cron.activityUser").String()
 	)
 
 	job := &JobManager{
@@ -64,26 +65,26 @@ func InitJobManager() *JobManager {
 		gcron.Start("bindPhoneAndSubAgain")
 	}
 
-	if activityUserCronStr != "" {
-		_, err = gcron.Add(ctx, activityUserCronStr, func(ctx context.Context) {
-			g.Log().Infof(ctx, "activityUser start %v", activityUserJobRunning)
-			if activityUserJobRunning {
-				return
-			}
-			activityUserJobRunning = true
-			defer func() {
-				activityUserJobRunning = false
-			}()
-
-			job.LoadPayUser()
-			job.LoadActivityUser()
-
-		}, "activityUser")
-		if err != nil {
-			panic(err)
-		}
-		gcron.Start("activityUser")
-	}
+	// if activityUserCronStr != "" {
+	// 	_, err = gcron.Add(ctx, activityUserCronStr, func(ctx context.Context) {
+	// 		g.Log().Infof(ctx, "activityUser start %v", activityUserJobRunning)
+	// 		if activityUserJobRunning {
+	// 			return
+	// 		}
+	// 		activityUserJobRunning = true
+	// 		defer func() {
+	// 			activityUserJobRunning = false
+	// 		}()
+
+	// 		job.LoadPayUser()
+	// 		job.LoadActivityUser()
+
+	// 	}, "activityUser")
+	// 	if err != nil {
+	// 		panic(err)
+	// 	}
+	// 	gcron.Start("activityUser")
+	// }
 
 	return job
 }

TEMPAT SAMPAH
telemarketingEtl/.DS_Store


+ 14 - 12
telemarketingEtl/config.yaml

@@ -90,27 +90,27 @@ recycleBin_A: 5  #回收站:A.5个自然日内被销售人员手动退回公
 recycleBin_B: 3  #回收站:B.3个自然日内有过“已接听”的通话记录且仍处于“商机线索”状态下的客户;
 timetaskBl: true
 #定时任务 查看事件
-eventInfoTask: '0 0 * * * *'
+eventInfoTask: '@hourly'
 #定时任务 搜索事件
-searchInfoTask: '0 0 * * * *'
+searchInfoTask: '@hourly'
 #定时任务 每日访问
-visitInfoTask: '0 0 * * * *'
+visitInfoTask: '@hourly'
 #定时任务
-openSeaTask: '0 0 0 * * *'
+openSeaTask: '0 0 * * *'
 #退出公海
-deleteOpenSeaTask: '0 0 0 * * *'
+deleteOpenSeaTask: '0 0 * * *'
 #自动退回公海
-returnOpenSeaTask: '0 0 0 * * *'
+returnOpenSeaTask: '0 0 * * *'
 #定时任务 近30天最多访问
-countMaxVisitTask: '0 0 0 * * *'
+countMaxVisitTask: '0 0 * * *'
 #定时任务 最近7天外呼次数
-countCallsTask: '0 0 * * * *'
+countCallsTask: '@hourly'
 #定时任务 最近连续未接听次数
-missedCallsTask: '0 0 * * * *'
+missedCallsTask: '@hourly'
 #定时任务 近3天招标采购搜索次数/近3天点击标讯三级页次数
-jyListAndDetailTask: '0 0 * * * *'
+jyListAndDetailTask: '@hourly'
 #定时任务 会员是否到期
-vipExpireTask: '0 0 0 * * *'
+vipExpireTask: '0 0 * * *'
 #查看
 eventInfoTime: 5
 #搜索
@@ -138,4 +138,6 @@ handReturn: 5
 #回收站B.3个自然日内有过“已接听”的通话记录且仍处于“商机线索”状态下的客户。
 recycleB: 3
 #3.30个自然日内自动退回公海的“拒绝沟通客户”;;
-refuseHandReturn: 30
+refuseHandReturn: 30
+#访问日志分析结果入库并发数
+visitInfoSavePool: 3

+ 36 - 35
telemarketingEtl/entity/Voiced.go

@@ -3,6 +3,7 @@ package entity
 import (
 	"app.yhyue.com/moapp/jybase/common"
 	"app.yhyue.com/moapp/jybase/date"
+	"fmt"
 	"go.mongodb.org/mongo-driver/bson"
 	"log"
 	"strings"
@@ -27,7 +28,7 @@ var (
 	firstLoad, firstLoad1 = true, true
 	mUser                 = make(map[string]*Called) // 用户
 	userPhone             = make(map[string]string)  // 手机号-用户uid
-	userPhoneMx           sync.Mutex
+	userPhoneMx           sync.RWMutex
 	callPhone             []string                   // 所有被叫手机号
 	missedPhone           = make(map[string]*Voiced) // 所有被叫未接通手机号
 )
@@ -50,8 +51,12 @@ func CountCalled() {
 	log.Println("CountCalled 通话记录加载结束...")
 
 	for _, s := range callPhone {
+		userPhoneMx.RLock()
 		uid := userPhone[s]
-		mUser[uid].frequency += 1
+		userPhoneMx.RUnlock()
+		if uid != "" {
+			mUser[uid].frequency += 1
+		}
 	}
 	log.Println("CountCalled 通话次数匹配结束...")
 	for k, v := range mUser {
@@ -66,6 +71,7 @@ func CountCalled() {
 			}
 		}
 	}
+	mUser = make(map[string]*Called)
 
 }
 
@@ -102,7 +108,7 @@ func getUser() {
 			}
 		}
 		count++
-		if count%2000 == 0 {
+		if count%10000 == 0 {
 			log.Println("CountCalled current-------", count)
 		}
 		uid := common.ObjToString(ret["baseinfo_id"])
@@ -134,8 +140,8 @@ func getRecord() {
 		countSql = "SELECT COUNT(id) FROM voice_record WHERE DATE(createTime) BETWEEN DATE(NOW() - INTERVAL 7 DAY) AND DATE(NOW())"
 	} else {
 		aTime := time.Now().Add(-1 * time.Hour)
-		sql = "SELECT createTime, CallNo, CalledNo FROM voice_record WHERE createTime >= " + aTime.Format(time.DateTime) + " ORDER BY createTime ASC"
-		countSql = "SELECT COUNT(id) FROM voice_record WHERE createTime >= " + aTime.Format(time.DateTime)
+		sql = fmt.Sprintf("SELECT createTime, CallNo, CalledNo FROM voice_record WHERE createTime >= '%s' ORDER BY createTime ASC", aTime.Format(time.DateTime))
+		countSql = fmt.Sprintf("SELECT COUNT(id) FROM voice_record WHERE createTime >= '%s'", aTime.Format(time.DateTime))
 	}
 	total = config.VoicedRecordDb.CountBySql(countSql)
 	log.Println("CountCalled getRecord---", total)
@@ -192,6 +198,7 @@ func MissedCalls() {
 			})
 		}
 	}
+	missedPhone = make(map[string]*Voiced)
 }
 
 func getMissedCalls() {
@@ -201,15 +208,11 @@ func getMissedCalls() {
 		count    int
 		total    int64
 	)
-	if firstLoad1 {
-		sql = "SELECT createTime, CallNo, CalledNo, State FROM voice_record ORDER BY createTime ASC"
-		countSql = "SELECT COUNT(id) FROM voice_record"
-		firstLoad1 = false
-	} else {
-		aTime := time.Now().Add(-1 * time.Hour)
-		sql = "SELECT createTime, CallNo, CalledNo, State FROM voice_record WHERE createTime >= " + aTime.Format(time.DateTime) + " ORDER BY createTime ASC"
-		countSql = "SELECT COUNT(id) FROM voice_record WHERE createTime >= " + aTime.Format(time.DateTime)
-	}
+	//sql = "SELECT createTime, CallNo, CalledNo, State FROM voice_record ORDER BY createTime ASC"
+	//countSql = "SELECT COUNT(id) FROM voice_record"
+	aTime := time.Now().Add(-1 * time.Hour)
+	sql = fmt.Sprintf("SELECT createTime, CallNo, CalledNo, State FROM voice_record WHERE createTime >= '%s' ORDER BY createTime ASC", aTime.Format(time.DateTime))
+	countSql = fmt.Sprintf("SELECT COUNT(id) FROM voice_record WHERE createTime >= '%s'", aTime.Format(time.DateTime))
 	total = config.VoicedRecordDb.CountBySql(countSql)
 	log.Println("MissedCalls getMissedCalls---", total)
 	if total <= 0 {
@@ -246,32 +249,30 @@ func getMissedCalls() {
 		phone := common.ObjToString(ret["CalledNo"])
 		vdate := strings.Split(common.ObjToString(ret["createTime"]), " ")[0]
 		ctype := common.ObjToString(ret["State"])
+		userPhoneMx.RLock()
 		uid := userPhone[phone]
-		if ctype != "dealing" {
-			if missedPhone[uid] != nil {
-				v := missedPhone[uid]
-				v.vDate = vdate
-				v.missed += 1
-				//if VerifyDate(v.vDate, vdate) <= 1 {
-				//	v.vDate = vdate
-				//	v.missed += 1
-				//} else {
-				//	v.missed = 1
-				//	v.vDate = vdate
-				//}
+		userPhoneMx.RUnlock()
+		if uid != "" {
+			if ctype != "dealing" {
+				if missedPhone[uid] != nil {
+					v := missedPhone[uid]
+					v.vDate = vdate
+					v.missed += 1
+				} else {
+					missedPhone[uid] = &Voiced{
+						vDate:  vdate,
+						missed: 1,
+					}
+				}
 			} else {
-				missedPhone[uid] = &Voiced{
-					vDate:  vdate,
-					missed: 1,
+				if missedPhone[uid] != nil {
+					v := missedPhone[uid]
+					v.vDate = vdate
+					v.missed = 0
 				}
 			}
-		} else {
-			if uid != "" && missedPhone[uid] != nil {
-				v := missedPhone[uid]
-				v.vDate = vdate
-				v.missed = 0
-			}
 		}
+
 	}
 	_ = rows.Close()
 }

+ 136 - 130
telemarketingEtl/entity/dwd_f_userbase_visit_info.go

@@ -1,9 +1,7 @@
 package entity
 
 import (
-	"app.yhyue.com/moapp/jybase/common"
 	"fmt"
-	"go.mongodb.org/mongo-driver/bson"
 	"log"
 	"regexp"
 	"strings"
@@ -12,32 +10,36 @@ import (
 	"telemarketingEtl/util"
 	"time"
 
+	"app.yhyue.com/moapp/jybase/common"
+	"go.mongodb.org/mongo-driver/bson"
+
 	"app.yhyue.com/moapp/jybase/date"
-	"app.yhyue.com/moapp/jybase/mongodb"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gtime"
 	"github.com/gogf/gf/v2/util/gconv"
 )
 
-var USERPOOL = 20
-var locks = make([]*sync.Mutex, USERPOOL)
-
-func lock(userid string) *sync.Mutex {
-	n := 0
-	for _, v := range userid {
-		n += int(v)
-	}
-	return locks[n%USERPOOL]
-}
+var (
+	wxReg        = regexp.MustCompile("MicroMessenger")
+	contentReg   = regexp.MustCompile(".*article/content/(.*)\\.html")
+	portrait1Reg = regexp.MustCompile(".*/swordfish/page_big_pc/unit_portrayal/(.*)")
+	portrait2Reg = regexp.MustCompile(".*swordfish/page_big_pc/(.*)/ent_ser_portrait")
+	searchReg    = regexp.MustCompile(".*jybx/core/(.*)/searchList")
+)
 
-func init() {
-	for i := 0; i < USERPOOL; i++ {
-		locks[i] = &sync.Mutex{}
-	}
+type VisitInfo struct {
+	number        int
+	contentNum    int
+	portraitNum   int
+	searchNum     int
+	platform      int
+	lastLoginTime string
+	uId           string
 }
 
 // 查看事件
 func VisitInfoAdd(start, end int64) {
+	idMapping := LoadIdMapping()
 	s_id := util.GetObjectId(start)
 	e_id := util.GetObjectId(end)
 	query := map[string]interface{}{
@@ -46,117 +48,118 @@ func VisitInfoAdd(start, end int64) {
 			"$lt":  e_id,
 		},
 	}
-	RegWx, _ := regexp.Compile("MicroMessenger")
-
 	log.Println("task run", start, end, gtime.NewFromTimeStamp(start), query)
-	//regWx, _ := regexp.Compile("MicroMessenger")
 	tables := g.Cfg().MustGet(ctx, "logTables").Strings()
-	log.Println("analy tables:", tables)
+	log.Println("需要遍历日志表", tables)
+	session := config.MgoLog.GetMgoConn()
+	defer config.MgoLog.DestoryMongoConn(session)
+	all := map[string]map[string]*VisitInfo{}
+	lastLogin := map[string]string{}
 	for _, table := range tables {
-		/*go*/ func(table string) {
-			session := config.MgoLog.GetMgoConn()
-			iter := session.DB("qfw").C(table).Find(query).Iter()
-			count := 0
-			for thisData := map[string]interface{}{}; iter.Next(&thisData); {
-				// sPool <- true
-				// sWait.Add(1)
-				/*go*/
-				func(thisData map[string]interface{}) {
-					// defer func() {
-					// 	<-sPool
-					// 	sWait.Done()
-					// }()
-					userid, uid := gconv.String(thisData["userid"]), ""
-					if userid == "" {
-						return
-					}
-					if !mongodb.IsObjectIdHex(userid) {
-						userid, uid = GetUserIdByPositionId(userid)
-					} else {
-						idmdata := config.JianyuSubjectdb.SelectBySql(`select uid from dwd_f_userbase_id_mapping where userid = "` + userid + `"`)
-						if idmdata != nil && len(*idmdata) > 0 {
-							uid = gconv.String((*idmdata)[0]["uid"])
-						}
-					}
-					if userid == "" {
-						return
-					}
-					url_ := gconv.String(thisData["url"])
-					reg := regexp.MustCompile(".*article/content/(.*)\\.html")
-					portrait1reg := regexp.MustCompile(".*/swordfish/page_big_pc/unit_portrayal/(.*)")
-					portrait2reg := regexp.MustCompile(".*swordfish/page_big_pc/(.*)/ent_ser_portrait")
-					searchreg := regexp.MustCompile(".*jybx/core/(.*)/searchList")
-					contentnum := 0
-					portraitnum := 0
-					searchnum := 0
-					if reg.MatchString(url_) {
-						contentnum = 1
-					}
-					if portrait1reg.MatchString(url_) || portrait2reg.MatchString(url_) {
-						portraitnum = 1
-					}
-					if searchreg.MatchString(url_) {
-						searchnum = 1
-					}
-					createtime := gconv.Int64(thisData["date"])
-					starttime, endtime := getToday(createtime)
-					craetetimeStr := time.Unix(createtime, 0).Format(date.Date_Full_Layout)
-					platform := APP
-					if table == "jy_logs" {
-						platform = PC
-						if client := gconv.String(thisData["client"]); client != "" {
-							if RegWx.MatchString(client) {
-								platform = WX
-							}
-						}
-					}
-					lock(userid).Lock()
-					defer lock(userid).Unlock()
-					if config.JianyuSubjectdb.CountBySql(`select count(1) from dwd_f_userbase_visit_info where userid = ? and createtime>= ? and createtime <?`, userid, starttime, endtime) > 0 {
-						if contentnum != 0 {
-							config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number = number+1,contentnum = contentnum + 1,date =?,platform=? where userid = ? and createtime>= ? and createtime <?`, craetetimeStr, platform, userid, starttime, endtime)
-						} else if portraitnum != 0 {
-							config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number = number+1,portraitnum = contentnum + 1,date =?,platform=? where userid = ? and createtime>= ? and createtime <?`, craetetimeStr, platform, userid, starttime, endtime)
-						} else if searchnum != 0 {
-							config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number = number+1,searchnum = contentnum + 1,date =?,platform=? where userid = ? and createtime>= ? and createtime <?`, craetetimeStr, platform, userid, starttime, endtime)
-						} else {
-							config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number = number+1,date =?,platform=? where userid = ? and createtime>= ? and createtime <?`, craetetimeStr, platform, userid, starttime, endtime)
-						}
-					} else {
-						config.JianyuSubjectdb.InsertBySql(`INSERT INTO dwd_f_userbase_visit_info
-														(userid,DATE, number, platform,createtime,contentnum,portraitnum,searchnum)
-														VALUES (?,?,?,?,?,?,?,?)`, userid, craetetimeStr, 1, platform, craetetimeStr, contentnum, portraitnum, searchnum)
-					}
-					if uid != "" {
-						if config.JianyuSubjectdb.CountBySql(`select count(1) from dwd_f_crm_attribute_label where uid = ?`, uid) > 0 {
-							config.JianyuSubjectdb.Update("dwd_f_crm_attribute_label", map[string]interface{}{"uid": uid}, map[string]interface{}{"last_login_time": craetetimeStr})
-						} else {
-							config.JianyuSubjectdb.Insert("dwd_f_crm_attribute_label", map[string]interface{}{
-								"uid":             uid,
-								"last_login_time": craetetimeStr,
-								"updatetime":      time.Now().Format(time.DateTime),
-							})
-						}
+		iter := session.DB("qfw").C(table).Find(query).Select(map[string]interface{}{
+			"userid": 1,
+			"date":   1,
+			"url":    1,
+		}).Sort("_id").Iter()
+		count := 0
+		log.Println("开始遍历日志表。。。", table)
+		for thisData := map[string]interface{}{}; iter.Next(&thisData); {
+			userId := gconv.String(thisData["userid"])
+			if userId == "" || idMapping[userId] == nil {
+				continue
+			}
+			userId = idMapping[userId].UserId
+			createtime := gtime.New(thisData["date"])
+			ymd := createtime.Format("Y-m-d")
+			if all[ymd] == nil {
+				all[ymd] = map[string]*VisitInfo{}
+			}
+			vi := all[ymd][userId]
+			if vi == nil {
+				vi = &VisitInfo{
+					uId: idMapping[userId].Uid,
+				}
+			}
+			vi.number++
+			vi.lastLoginTime = createtime.String()
+			lastLogin[userId] = vi.lastLoginTime
+			url_ := gconv.String(thisData["url"])
+			if contentReg.MatchString(url_) {
+				vi.contentNum++
+			} else if portrait1Reg.MatchString(url_) || portrait2Reg.MatchString(url_) {
+				vi.portraitNum++
+			} else if searchReg.MatchString(url_) {
+				vi.searchNum++
+			}
+			vi.platform = APP
+			if table == "jy_logs" {
+				vi.platform = PC
+				if client := gconv.String(thisData["client"]); client != "" {
+					if wxReg.MatchString(client) {
+						vi.platform = WX
 					}
-				}(thisData)
-				count++
-				if count%5000 == 0 {
-					log.Printf("%s已完成%d条数据\n", table, count)
 				}
-				thisData = map[string]interface{}{}
 			}
-			log.Printf("%s一共完成%d条数据\n", table, count)
-			log.Println("end!")
-			// sWait.Wait()
-		}(table)
+			all[ymd][userId] = vi
+			thisData = map[string]interface{}{}
+			count++
+			if count%50000 == 0 {
+				log.Printf("遍历日志表", table, count)
+			}
+		}
+		log.Println("遍历日志表结束。。。", table, count)
 	}
-}
-
-func getToday(createtime int64) (start, end string) {
-	now := time.Unix(createtime, 0)
-	startOfDay := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
-	endOfDay := time.Date(now.Year(), now.Month(), now.Day(), 23, 59, 59, 0, now.Location())
-	return startOfDay.Format(date.Date_Full_Layout), endOfDay.Format(date.Date_Full_Layout)
+	pool := make(chan bool, g.Cfg().MustGet(ctx, "visitInfoSavePool").Int())
+	wait := &sync.WaitGroup{}
+	index := 0
+	log.Println("开始保存统计结果。。。")
+	for k, v := range all {
+		t := gtime.New(k)
+		startTime, endTime := t.String(), t.AddDate(0, 0, 1).String()
+		for kk, vv := range v {
+			index++
+			if index%500 == 0 {
+				log.Println("保存统计结果", index)
+			}
+			//
+			pool <- true
+			wait.Add(1)
+			go func(userId string, vi *VisitInfo) {
+				defer func() {
+					<-pool
+					wait.Done()
+				}()
+				if list := config.JianyuSubjectdb.SelectBySql(`select id from dwd_f_userbase_visit_info where userid=? and createtime>=? and createtime<? limit 1`, userId, startTime, endTime); list != nil && len(*list) == 1 {
+					config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number=number+`+fmt.Sprint(vi.number)+`,contentnum=contentnum+`+fmt.Sprint(vi.contentNum)+`,portraitnum=portraitnum+`+fmt.Sprint(vi.portraitNum)+`,searchnum=searchnum+`+fmt.Sprint(vi.searchNum)+`,date=?,platform=? where id=?`, vi.lastLoginTime, vi.platform, (*list)[0]["id"])
+				} else {
+					config.JianyuSubjectdb.InsertBySql(`INSERT INTO dwd_f_userbase_visit_info (userid,date, number, platform,createtime,contentnum,portraitnum,searchnum) VALUES (?,?,?,?,?,?,?,?)`, userId, vi.lastLoginTime, 1, vi.platform, vi.lastLoginTime, vi.contentNum, vi.portraitNum, vi.searchNum)
+				}
+				if config.JianyuSubjectdb.CountBySql(`select count(1) from dwd_f_crm_attribute_label where uid=?`, vi.uId) > 0 {
+					config.JianyuSubjectdb.ExecBySql("update dwd_f_crm_attribute_label set last_login_time=? where uid=?", vi.uId, lastLogin[userId])
+				} else {
+					config.JianyuSubjectdb.InsertBatch("dwd_f_crm_attribute_label", []string{"uid", "last_login_time", "updatetime"}, []interface{}{vi.uId, lastLogin[userId], gtime.Now().String()})
+				}
+			}(kk, vv)
+		}
+	}
+	log.Println("保存统计结果结束。。。", index)
+	wait.Wait()
+	//
+	log.Println("开始更新不活跃重新活跃用户。。。")
+	sTime := gtime.Now().StartOfDay()
+	day30, today, nowFormat, tomorrow := sTime.AddDate(0, 0, -30).String(), sTime.String(), gtime.Now().String(), sTime.AddDate(0, 0, 1).String()
+	config.JianyuSubjectdb.SelectByBath(1, func(l *[]map[string]interface{}) bool {
+		userid := gconv.String((*l)[0]["userid"])
+		datas := config.JianyuMaindb.SelectBySql(`select act_again_date from bi_service.freeClubSign where mogUserId=? limit 1`, userid)
+		if datas == nil || len(*datas) == 0 || gtime.New((*datas)[0]["act_again_date"]).Before(sTime) {
+			log.Println("更新不活跃重新活跃用户", userid)
+			config.JianyuMaindb.ExecBySql(`INSERT INTO bi_service.freeClubSign (mogUserId,act_again_date,create_time) VALUES (?,?,?) ON DUPLICATE KEY UPDATE act_again_date=?`, userid, nowFormat, nowFormat, nowFormat)
+		}
+		return true
+	}, `SELECT userid FROM dwd_f_userbase_visit_info a WHERE a.date>? AND a.date<? AND a.contentnum>=5 and not exists(
+	select 1 from dwd_f_userbase_visit_info b where b.date>? and b.date<? and b.contentnum>=5 and a.userid=b.userid
+	) and not exists (SELECT 1 FROM dwd_f_data_equity_info b WHERE b.endtime>? and a.userid=b.userid)`, today, tomorrow, day30, today, nowFormat)
+	log.Println("更新不活跃重新活跃用户结束。。。")
 }
 
 type UserVisit struct {
@@ -256,6 +259,7 @@ func CountMaxVisit() {
 			})
 		}
 	}
+	mData = make(map[string]*UserVisit)
 }
 
 // @Author jianghan
@@ -311,8 +315,8 @@ func Count3DaysSearch() {
 		countSql = "SELECT COUNT(v.id) FROM dwd_f_userbase_search_info v WHERE DATE(v.search_time) BETWEEN DATE(NOW() - INTERVAL 3 DAY) AND DATE(NOW())"
 	} else {
 		aTime := time.Now().Add(-1 * time.Hour)
-		sql = "SELECT v.userid, b.uid, v.search_time FROM dwd_f_userbase_search_info v LEFT JOIN dwd_f_userbase_baseinfo b ON v.userid = b.userid WHERE v.search_time >= " + aTime.Format(time.DateTime) + " ORDER BY v.date ASC"
-		countSql = "SELECT COUNT(v.id) FROM dwd_f_userbase_search_info v WHERE v.search_time >= " + aTime.Format(time.DateTime)
+		sql = fmt.Sprintf("SELECT v.userid, b.uid, v.search_time FROM dwd_f_userbase_search_info v LEFT JOIN dwd_f_userbase_baseinfo b ON v.userid = b.userid WHERE v.search_time >= '%s' ORDER BY v.search_time ASC", aTime.Format(time.DateTime))
+		countSql = fmt.Sprintf("SELECT COUNT(v.id) FROM dwd_f_userbase_search_info v WHERE v.search_time >= '%s'", aTime.Format(time.DateTime))
 	}
 	total = config.JianyuSubjectdb.CountBySql(countSql)
 	log.Println("Count3DaysSearch total---", total)
@@ -364,6 +368,7 @@ func Count3DaysSearch() {
 			})
 		}
 	}
+	mSearchMap = make(map[string]int)
 }
 
 func Count3DaysDetail() {
@@ -391,8 +396,8 @@ func Count3DaysDetail() {
 		countSql = "SELECT COUNT(v.id) FROM dwd_f_userbase_visit_info v WHERE DATE(date) BETWEEN DATE(NOW() - INTERVAL 3 DAY) AND DATE(NOW())"
 	} else {
 		aTime := time.Now().Add(-1 * time.Hour)
-		sql = "SELECT v.userid, b.uid, v.date, v.contentnum FROM dwd_f_userbase_visit_info v LEFT JOIN dwd_f_userbase_baseinfo b ON v.userid = b.userid WHERE v.date >= " + aTime.Format(time.DateTime) + " ORDER BY v.date ASC"
-		countSql = "SELECT COUNT(v.id) FROM dwd_f_userbase_visit_info v WHERE v.date >= " + aTime.Format(time.DateTime)
+		sql = fmt.Sprintf("SELECT v.userid, b.uid, v.date, v.contentnum FROM dwd_f_userbase_visit_info v LEFT JOIN dwd_f_userbase_baseinfo b ON v.userid = b.userid WHERE v.date >= '%s' ORDER BY v.date ASC", aTime.Format(time.DateTime))
+		countSql = fmt.Sprintf("SELECT COUNT(v.id) FROM dwd_f_userbase_visit_info v WHERE v.date >= '%s'", aTime.Format(time.DateTime))
 	}
 	total = config.JianyuSubjectdb.CountBySql(countSql)
 	log.Println("Count3DaysDetail total---", total)
@@ -413,7 +418,7 @@ func Count3DaysDetail() {
 		}
 		err = rows.Scan(scanArgs...)
 		if err != nil {
-			log.Println("CountMaxVisit---", err)
+			log.Println("Count3DaysDetail---", err)
 			break
 		}
 		for i, col := range values {
@@ -424,7 +429,7 @@ func Count3DaysDetail() {
 			}
 		}
 		count++
-		if count%2000 == 0 {
+		if count%5000 == 0 {
 			log.Println("Count3DaysDetail current-------", count)
 		}
 		uid := common.ObjToString(ret["uid"])
@@ -445,6 +450,7 @@ func Count3DaysDetail() {
 			})
 		}
 	}
+	mDetailMap = make(map[string]int)
 }
 
 func VipExpire() {
@@ -460,8 +466,8 @@ func VipExpire() {
 	config.JianyuSubjectdb.ExecBySql("update dwd_f_crm_attribute_label set supersub = 0 where supersub = 1")
 	config.JianyuSubjectdb.ExecBySql("update dwd_f_crm_attribute_label set areasubpkg = 0 where areasubpkg = 1")
 
-	sql = fmt.Sprintf("select uid, product_type from dwd_f_data_equity_info WHERE product_type = '省份订阅包' or product_type = '超级订阅' AND endtime > %s", now.Format(date.Date_Short_Layout))
-	countSql = fmt.Sprintf("select count(id) from dwd_f_data_equity_info WHERE product_type = '省份订阅包' or product_type = '超级订阅' AND endtime > %s", now.Format(date.Date_Short_Layout))
+	sql = fmt.Sprintf("select uid, product_type from dwd_f_data_equity_info WHERE product_type = '省份订阅包' or product_type = '超级订阅' AND endtime > '%s'", now.Format(date.Date_Short_Layout))
+	countSql = fmt.Sprintf("select count(id) from dwd_f_data_equity_info WHERE product_type = '省份订阅包' or product_type = '超级订阅' AND endtime > '%s'", now.Format(date.Date_Short_Layout))
 
 	total = config.JianyuSubjectdb.CountBySql(countSql)
 	log.Println("VipExpire total---", total)
@@ -493,7 +499,7 @@ func VipExpire() {
 			}
 		}
 		count++
-		if count%2000 == 0 {
+		if count%5000 == 0 {
 			log.Println("VipExpire current-------", count)
 		}
 

+ 28 - 0
telemarketingEtl/entity/entity.go

@@ -1,6 +1,7 @@
 package entity
 
 import (
+	"log"
 	"telemarketingEtl/config"
 
 	"github.com/gogf/gf/v2/util/gconv"
@@ -28,6 +29,33 @@ const (
 	ENTSEARCH       = "企业搜索"
 )
 
+type IdMp struct {
+	Uid    string
+	UserId string
+}
+
+//加载所有的uid
+func LoadIdMapping() map[string]*IdMp {
+	log.Println("开始加载dwd_f_userbase_id_mapping。。。")
+	m := map[string]*IdMp{}
+	index := 0
+	config.JianyuSubjectdb.SelectByBath(1, func(l *[]map[string]interface{}) bool {
+		index++
+		if index%5000 == 0 {
+			log.Println("加载dwd_f_userbase_id_mapping", index)
+		}
+		idMp := &IdMp{
+			Uid:    gconv.String((*l)[0]["uid"]),
+			UserId: gconv.String((*l)[0]["userid"]),
+		}
+		m[gconv.String((*l)[0]["position_id"])] = idMp
+		m[gconv.String((*l)[0]["userid"])] = idMp
+		return true
+	}, `select userid,uid,position_id from dwd_f_userbase_id_mapping`)
+	log.Println("加载dwd_f_userbase_id_mapping结束。。。", index)
+	return m
+}
+
 // 根据职位id获取mongodb userid
 func GetUserIdByPositionId(positionId string) (userId, uid string) {
 	if positionId == "" || positionId == "0" {

+ 0 - 161
telemarketingEtl/entity/old.go

@@ -1,161 +0,0 @@
-package entity
-
-import (
-	"log"
-	"regexp"
-	"telemarketingEtl/config"
-	"telemarketingEtl/util"
-	"time"
-
-	"app.yhyue.com/moapp/jybase/date"
-	"app.yhyue.com/moapp/jybase/mongodb"
-	"github.com/gogf/gf/v2/frame/g"
-	"github.com/gogf/gf/v2/os/gtime"
-	"github.com/gogf/gf/v2/util/gconv"
-)
-
-// 查看事件
-func VisitInfoAddOld(start, end int64) {
-	// 获取时间戳 1 和时间戳 2 所在的日期
-	t1 := time.Unix(start, 0)
-	t2 := time.Unix(end, 0)
-
-	index := g.Cfg().MustGet(ctx, "index").Int()
-
-	RegWx, _ := regexp.Compile("MicroMessenger")
-
-	// 获取时间戳 1 和时间戳 2 之间的天数
-	days := int(t2.Sub(t1).Hours() / 24)
-
-	for i := 0; i <= days; i++ {
-		// 获取当前日期
-		t := t1.AddDate(0, 0, i)
-
-		// 获取当天的开始时间
-		startOfDay := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()).Unix()
-
-		// 获取当天的结束时间
-		endOfDay := time.Date(t.Year(), t.Month(), t.Day(), 23, 59, 59, 0, t.Location()).Unix()
-
-		s_id := util.GetObjectId(startOfDay)
-		e_id := util.GetObjectId(endOfDay)
-		query := map[string]interface{}{
-			"_id": map[string]interface{}{
-				"$gte": s_id,
-				"$lte": e_id,
-			},
-		}
-		log.Println("task run", time.Unix(startOfDay, 0).Format(date.Date_Full_Layout), time.Unix(endOfDay, 0).Format(date.Date_Full_Layout), gtime.NewFromTimeStamp(start), query)
-		tables := g.Cfg().MustGet(ctx, "logTables").Strings()
-		log.Println("analy tables:", tables)
-		userVisit := map[string]*visitStruct{}
-		for _, table := range tables {
-			func(table string) {
-				session := config.MgoLog.GetMgoConn()
-				iter := session.DB("qfw").C(table).Find(query).Iter()
-				count := 0
-				for thisData := map[string]interface{}{}; iter.Next(&thisData); {
-					func(thisData map[string]interface{}) {
-						userid := gconv.String(thisData["userid"])
-						if userid == "" {
-							return
-						}
-						if !mongodb.IsObjectIdHex(userid) {
-							userid, _ = GetUserIdByPositionId(userid)
-						}
-						if userid == "" {
-							return
-						}
-						url_ := gconv.String(thisData["url"])
-						reg := regexp.MustCompile(".*article/content/(.*)\\.html")
-						contentnum := 0
-						if reg.MatchString(url_) {
-							contentnum = 1
-						}
-
-						createtime := gconv.Int64(thisData["date"])
-
-						craetetimeStr := time.Unix(createtime, 0).Format(date.Date_Full_Layout)
-						platform := APP
-						if table == "jy_logs" {
-							platform = PC
-							if client := gconv.String(thisData["client"]); client != "" {
-								if RegWx.MatchString(client) {
-									platform = WX
-								}
-							}
-						}
-						userVisit[userid] = updateUser(userVisit[userid], 1, contentnum, platform, userid, craetetimeStr, craetetimeStr)
-
-						//
-					}(thisData)
-					count++
-					if count%5000 == 0 {
-						log.Printf("%s已完成%d条数据\n", table, count)
-					}
-					thisData = map[string]interface{}{}
-				}
-				log.Println("end!")
-				// sWait.Wait()
-			}(table)
-		}
-		fieids := []string{"userid", "DATE", "number", "platform", "createtime", "contentnum"}
-		args := []interface{}{}
-		ii := 0
-		if len(userVisit) > 0 {
-			log.Println("用户量:", len(userVisit))
-			for _, v := range userVisit {
-				userid := (*v).userid
-				contentnum := (*v).contentnum
-				number := (*v).number
-				createtime := (*v).createtime
-				datetime := (*v).datetime
-				platform := (*v).platform
-				args = append(args, userid, datetime, number, platform, createtime, contentnum)
-				ii++
-				if ii%index == 0 {
-					if len(args) > 0 {
-						config.JianyuSubjectdb.InsertBatch("dwd_f_userbase_visit_info", fieids, args)
-						args = []interface{}{}
-					}
-				}
-			}
-			if len(args) > 0 {
-				config.JianyuSubjectdb.InsertBatch("dwd_f_userbase_visit_info", fieids, args)
-			}
-		}
-
-	}
-}
-
-type visitStruct struct {
-	userid     string
-	createtime string //创建时间
-	datetime   string //访问时间
-	platform   int
-	number     int
-	contentnum int
-}
-
-func updateUser(visit *visitStruct, num, contmun, platform int, userid, createtime, datetime string) *visitStruct {
-	if visit == nil {
-		return &visitStruct{
-			userid:     userid,
-			createtime: createtime,
-			datetime:   datetime,
-			number:     num,
-			contentnum: contmun,
-			platform:   platform,
-		}
-	} else {
-		if num > 0 {
-			visit.number = visit.number + num
-		}
-		if contmun > 0 {
-			visit.contentnum = visit.contentnum + contmun
-		}
-		visit.platform = platform
-		visit.datetime = datetime
-	}
-	return visit
-}

+ 15 - 12
telemarketingEtl/main.go

@@ -6,20 +6,23 @@ import (
 )
 
 func main() {
-	t1 := timetask.TaskCountMaxVisit{}
-	t1.Run()
-	t2 := timetask.TaskCountCalled{}
-	t2.Run()
-	t3 := timetask.TaskMissedCalls{}
-	t3.Run()
-	t4 := timetask.TaskJyList{}
-	t4.Run()
-	t5 := timetask.TaskJyDetail{}
+
+	//t1 := &timetask.TaskCountMaxVisit{}
+	//t1.Run()
+	//t2 := &timetask.TaskCountCalled{}
+	//t2.Run()
+	//t3 := &timetask.TaskMissedCalls{}
+	//t3.Run()
+	//t4 := &timetask.TaskVipExpire{}
+	//t4.Run()
+	t5 := &timetask.TaskJyDetail{}
 	t5.Run()
-	t6 := timetask.TaskVipExpire{}
-	t6.Run()
+
+	//t := &timetask.TaskVisitInfo{}
+	//t.Run()
+
 	//定时任务开始
-	//timetask.Run()
+	timetask.Run()
 	select {}
 
 }

+ 2 - 0
telemarketingEtl/timetask/task.go

@@ -64,6 +64,7 @@ func (t *TaskSearchInfo) Run() {
 	start := now.Add(-time.Minute * time.Duration(min)).Unix()
 	//搜索事件
 	entity.SearchInfoAdd(start, end)
+	log.Println("结束TaskSearchInfo")
 }
 
 type TaskVisitInfo struct{}
@@ -75,6 +76,7 @@ func (t *TaskVisitInfo) Run() {
 	min := gcfg.Instance().MustGet(gctx.New(), "visitInfoTime", "").Int()
 	start := now.Add(-time.Minute * time.Duration(min)).Unix()
 	entity.VisitInfoAdd(start, end)
+	//entity.VisitInfoAdd(1721404800, 1724947200)
 	log.Println("结束TaskVisitInfo")
 }