Bläddra i källkod

wip:公海队列

zhangxinlei1996 2 år sedan
förälder
incheckning
11fb231d39

+ 19 - 1
telemarketingEtl/config.yaml

@@ -55,4 +55,22 @@ regex:
   - "^/swordfish/frontPage/collection/sess/index:pcCollection"#pc-标讯收藏
   - "^/jyapp/frontPage/collection/sess/index:appCollection"#app-标讯收藏
   - "^/weixin/frontPage/collection/sess/index:wxCollection"#wx-标讯收藏
-  - "^/product/index?serviceType=0:productIndex"#pc-会员介绍页面
+  - "^/product/index?serviceType=0:productIndex"#pc-会员介绍页面
+classOneHighSeas_A: 3  #一级公海A 3天内点击过剑鱼付费产品介绍页...
+classOneHighSeas_B: 30 #一级公海B 购买了超级订阅产品剩余使用时长不足30天的客户;
+classTwoHighSeaslastDay: 30 #二级公海 最近30天内活跃天数
+classTwoHighSeas_A: 15# #二级公海 A最近30天内活跃天数≥15天的客户
+classTwoHighSeas_B: #二级公海 B 10天≤最近30天内活跃天数<15天的客户;
+  - 10
+  - 15
+classTwoHighSeas_C: #二级公海C 5天≤最近30天内活跃天数<10天的客户;
+  - 5
+  - 10
+classTwoHighSeas_D: #二级公海D 1天≤最近30天内活跃天数<5天的客户;
+  - 1
+  - 5
+classThreeHighSeasDay: 0
+classThreeHighSeaslastDay: 30
+recycleBin_A: 5  #回收站:A.5个自然日内被销售人员手动退回公海的客户;
+recycleBin_B: 3  #回收站:B.3个自然日内有过“已接听”的通话记录且仍处于“商机线索”状态下的客户;
+

+ 376 - 0
telemarketingEtl/entity/dwd_f_crm_open_sea.go

@@ -0,0 +1,376 @@
+package entity
+
+import (
+	"app.yhyue.com/moapp/jybase/date"
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/util/gconv"
+	"log"
+	"sync"
+	"telemarketingEtl/config"
+	"telemarketingEtl/util"
+	"time"
+)
+
+/*
+超级订阅落地页 /front/vipsubscribe/introducePage  /jyapp/vipsubscribe/introducePage
+大会员落地页 /big/wx/page/landingPage /jyapp/big/page/landingPage
+产品体系服务 /product/index
+数据导出筛选 /front/dataExport/toSieve  /front/wx_dataExport/toSieve /jyapp/front/dataExport/toSieve
+数据导出购买 /front/dataExport/toCreateOrderPage/425b420f564502000756420a4056564451535451430e490f?from=&type=2&source=
+数据流量包 /front/dataPack/createOrder
+超级订阅 /swordfish/page_big_pc/free/svip/buy
+大会员 /big/pc/page/buy_commit
+*/
+
+/*
+3.公海客户来源:
+(1)一级公海:
+A. 3天内点击过剑鱼付费产品介绍页(产品-供应商服务体系页、大会员落地页、超级订阅落地页、数据自助导出-筛选条件页)或进入了购买页(超级订阅、大会员购买页、数据自助导出购买页、数据流量包购买页)的客户;
+B. 购买了超级订阅产品剩余使用时长不足30天的客户;
+C.超时未跟进导致被自动退回至公海的处于“潜在客户”、“意向客户”、“高意向客户”
+(2)二级公海:
+A.最近30天内活跃天数≥15天的客户;
+B.10天≤最近30天内活跃天数<15天的客户;
+C.5天≤最近30天内活跃天数<10天的客户;
+D.1天≤最近30天内活跃天数<5天的客户;
+备注:判断当日是否活跃的标准为当日是否点击查看了标讯内容。
+(3)三级公海:
+最近30天内活跃天数=0天的客户。
+(4)回收站:
+A.5个自然日内被销售人员手动退回公海的客户;
+B.3个自然日内有过“已接听”的通话记录且仍处于“商机线索”状态下的客户。
+(5)以上,涉及时间及量的均可配置。
+
+*/
+
+var (
+	oPool chan bool
+	oWait = &sync.WaitGroup{}
+)
+
+func init() {
+	poolSize := g.Cfg().MustGet(ctx, "poolSize").Int()
+	oPool = make(chan bool, poolSize)
+}
+
+func GetOpenSea() {
+	//
+	session := config.MgoLog.GetMgoConn()
+	defer config.MgoLog.DestoryMongoConn(session)
+	t := time.Now()
+	t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.Local)
+	end := t.Unix()
+	tA := t.AddDate(0, 0, -g.Cfg().MustGet(ctx, "classOneHighSeas_A").Int())
+
+	start := tA.Unix()
+	oneClassA := map[string]bool{}
+	//一级公海查询条件
+	queryOneClassPc := map[string]interface{}{
+		"date": map[string]interface{}{
+			"$gte": start,
+			"$lt":  end,
+		},
+		"$or": []map[string]interface{}{
+			map[string]interface{}{
+				"url": map[string]interface{}{
+					"$regex": `/front/vipsubscribe/introducePage`,
+				},
+			},
+			map[string]interface{}{
+				"url": map[string]interface{}{
+					"$regex": `/big/wx/page/landingPage`,
+				},
+			},
+			map[string]interface{}{
+				"url": map[string]interface{}{
+					"$regex": `/product/index`,
+				},
+			},
+			map[string]interface{}{
+				"url": map[string]interface{}{
+					"$regex": `/front/dataExport/toSieve`,
+				},
+			},
+			map[string]interface{}{
+				"url": map[string]interface{}{
+					"$regex": `/front/dataExport/toCreateOrderPage`,
+				},
+			},
+			map[string]interface{}{
+				"url": map[string]interface{}{
+					"$regex": `/front/dataPack/createOrder`,
+				},
+			},
+			map[string]interface{}{
+				"url": map[string]interface{}{
+					"$regex": `/swordfish/page_big_pc/free/svip/buy`,
+				},
+			},
+			map[string]interface{}{
+				"url": map[string]interface{}{
+					"$regex": `/big/pc/page/buy_commit`,
+				},
+			},
+			map[string]interface{}{
+				"url": map[string]interface{}{
+					"$regex": `/front/wx_dataExport/toSieve`,
+				},
+			},
+		},
+	}
+	iter := session.DB("qfw").C("jy_logs").Find(queryOneClassPc).Iter()
+	count := 0
+	//一级公海
+	for thisData := map[string]interface{}{}; iter.Next(&thisData); {
+		oPool <- true
+		oWait.Add(1)
+		go func(thisData map[string]interface{}) {
+			defer func() {
+				<-oPool
+				oWait.Done()
+			}()
+			userid := gconv.String(thisData["userid"])
+			if userid == "" {
+				return
+			}
+			if !mongodb.IsObjectIdHex(userid) {
+				userid = GetUserIdByPositionId(userid)
+			}
+			if userid == "" {
+				return
+			}
+			//根据userid获取线索id
+			if oneClassA[userid] {
+				return
+			}
+			uuid := GetClueIdByUserId(userid)
+			if uuid == "" {
+				return
+			}
+			oneClassA[uuid] = true
+		}(thisData)
+		count++
+		if count%100000 == 0 {
+			log.Printf("已完成%d条数据\n", count)
+		}
+		thisData = map[string]interface{}{}
+	}
+	oWait.Wait()
+
+	queryOneClassApp := map[string]interface{}{
+		"date": map[string]interface{}{
+			"$gte": start,
+			"$lt":  end,
+		},
+		"$or": []map[string]interface{}{
+			map[string]interface{}{
+				"url": map[string]interface{}{
+					"$regex": `/jyapp/vipsubscribe/introducePage`,
+				},
+			},
+			map[string]interface{}{
+				"url": map[string]interface{}{
+					"$regex": `jyapp/big/page/landingPage`,
+				},
+			},
+			map[string]interface{}{
+				"url": map[string]interface{}{
+					"$regex": `/jyapp/front/dataExport/toSieve`,
+				},
+			},
+		},
+	}
+	iter2 := session.DB("qfw").C("jyapp_logs").Find(queryOneClassApp).Iter()
+	count = 0
+	//一级公海
+	for thisData := map[string]interface{}{}; iter2.Next(&thisData); {
+		oPool <- true
+		oWait.Add(1)
+		go func(thisData map[string]interface{}) {
+			defer func() {
+				<-oPool
+				oWait.Done()
+			}()
+			userid := gconv.String(thisData["userid"])
+			if userid == "" {
+				return
+			}
+			if !mongodb.IsObjectIdHex(userid) {
+				userid = GetUserIdByPositionId(userid)
+			}
+			if userid == "" {
+				return
+			}
+			//根据userid获取线索id
+			if oneClassA[userid] {
+				return
+			}
+			uuid := GetClueIdByUserId(userid)
+			if uuid == "" {
+				return
+			}
+			oneClassA[uuid] = true
+		}(thisData)
+		count++
+		if count%100000 == 0 {
+			log.Printf("已完成%d条数据\n", count)
+		}
+		thisData = map[string]interface{}{}
+	}
+	oWait.Wait()
+	//更新公海
+	AddOpenSea(oneClassA, 1, "A")
+	//
+	oneClassB := map[string]bool{}
+	classB, ok := config.Mgo.Find("user", map[string]interface{}{
+		"l_vip_endtime": map[string]interface{}{
+			"$gte": t.AddDate(0, 0, -g.Cfg().MustGet(ctx, "classOneHighSeas_B").Int()),
+		},
+		"i_vip_status": map[string]interface{}{
+			"$gt": 0,
+		},
+		"i_appid": 2,
+	}, nil, `{"_id":1}`, false, -1, -1)
+	if classB != nil && ok && len(*classB) > 0 {
+		for _, v := range *classB {
+			userid := mongodb.BsonIdToSId(v["_id"])
+			if oneClassA[userid] {
+				continue
+			}
+			uuid := GetClueIdByUserId(userid)
+			oneClassB[uuid] = true
+		}
+	}
+	AddOpenSea(oneClassB, 1, "B")
+	//TODO 一级公海c
+	oneClassC := map[string]bool{}
+	twoA, twoB, twoC, twoD := TwoOpenSea(oneClassA, oneClassB, oneClassC)
+	AddOpenSea(twoA, 2, "A")
+	AddOpenSea(twoB, 2, "B")
+	AddOpenSea(twoC, 2, "C")
+	AddOpenSea(twoD, 2, "D")
+	//三级公海
+	ThreeOpenSea()
+	//
+	log.Println("end")
+}
+
+//二级公海
+/*
+	A.最近30天内活跃天数≥15天的客户;
+	B.10天≤最近30天内活跃天数<15天的客户;
+	C.5天≤最近30天内活跃天数<10天的客户;
+	D.1天≤最近30天内活跃天数<5天的客户;
+*/
+func TwoOpenSea(oneClassA, oneClassB, oneClassC map[string]bool) (aMap, bMap, cMap, dMap map[string]bool) {
+	q := "SELECT userid,COUNT(1) as count FROM dwd_f_userbase_visit_info WHERE  createtime > %s AND contentnum >%v GROUP BY userid "
+	//
+	stmtOut, err := config.JianyuSubjectdb.DB.Prepare(q)
+	defer func() {
+		log.Println("stmtOut.Close start")
+		stmtOut.Close()
+		log.Println("stmtOut.Close over")
+	}()
+
+	rows, err := stmtOut.Query()
+	if err != nil {
+		log.Println(err)
+	}
+	defer func() {
+		log.Println("rows.Close start")
+		rows.Close()
+		log.Println("rows.Close over")
+	}()
+	aMap = map[string]bool{}
+	bMap = map[string]bool{}
+	cMap = map[string]bool{}
+	dMap = map[string]bool{}
+	for rows.Next() {
+		var userid string
+		var ct int
+		err := rows.Scan(&userid, &ct)
+		if err != nil {
+			log.Println("row scan err:", err)
+		}
+		uuid := GetClueIdByUserId(userid)
+		if oneClassA[uuid] || oneClassB[uuid] || oneClassC[uuid] {
+			continue
+		}
+		if ct >= 15 {
+			aMap[uuid] = true
+		}
+		if ct < 15 && ct >= 10 {
+			bMap[uuid] = true
+		}
+		if ct < 10 && ct >= 5 {
+			cMap[uuid] = true
+		}
+		if ct < 5 && ct >= 1 {
+			dMap[uuid] = true
+		}
+	}
+	// Check for errors during iteration
+	err = rows.Err()
+	if err != nil {
+		log.Println("rows err err:", err)
+	}
+	return
+}
+
+// 三级公海:
+// 最近30天内活跃天数=0天的客户。
+func ThreeOpenSea() {
+	t := time.Now()
+	t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.Local)
+	day := g.Cfg().MustGet(ctx, "classThreeHighSeaslastDay").Int()
+	t.AddDate(0, 0, day)
+	start := util.GetObjectId(t.Unix())
+	createtime := time.Now().Format(date.Date_Full_Layout)
+
+	config.JianyuSubjectdb.SelectByBath(500, func(l *[]map[string]interface{}) bool {
+		for _, v := range *l {
+			userid := gconv.String(v["userid"])
+			query := map[string]interface{}{
+				"userid": userid,
+				"_id": map[string]interface{}{
+					"$gte": start,
+				},
+			}
+			uuid := gconv.String(v["uid"])
+			if r, ok := config.MgoLog.Find("subscribepay_logs", query, nil, `{"_id":1}`, false, 0, 1); ok && r != nil && len(*r) == 0 {
+				config.JianyuSubjectdb.ExecBySql(`INSERT INTO dwd_f_crm_open_seax (comeintime,comeinsource,LEVEL,clue_level,clue_id) 
+												VALUES(?,?,?,?,?)
+												ON DUPLICATE KEY UPDATE comeintime=?,comeinsource=?,LEVEL=?,clue_level=?`, createtime, 1, 3, "D", uuid, createtime, 1, 3, "D")
+
+			}
+		}
+	}, `select userid,uid from dwd_f_crm_clue_info where is_assign !=1`)
+
+}
+
+// 根据mongodb userid 获取 线索id
+func GetClueIdByUserId(userid string) (uuid string) {
+	if userid == "" {
+		return
+	}
+	data := config.JianyuSubjectdb.SelectBySql(`select id from dwd_f_crm_clue_info where userid=? limit 1`, userid)
+	if data == nil || len(*data) == 0 {
+		return
+	}
+	return gconv.String((*data)[0]["id"])
+}
+
+// m 需要更新的Map key:uuid
+// level 公海级别
+// clue_level线索级别
+func AddOpenSea(m map[string]bool, level int, clue_level string) {
+	createtime := time.Now().Format(date.Date_Full_Layout)
+	if len(m) > 0 {
+		for _, v := range m {
+			config.JianyuSubjectdb.ExecBySql(`INSERT INTO dwd_f_crm_open_seax (comeintime,comeinsource,LEVEL,clue_level,clue_id) 
+												VALUES(?,?,?,?,?)
+												ON DUPLICATE KEY UPDATE comeintime=?,comeinsource=?,LEVEL=?,clue_level=?`, createtime, 1, level, clue_level, v, createtime, 1, level, clue_level)
+		}
+	}
+}

+ 7 - 0
telemarketingEtl/main.go

@@ -1,12 +1,19 @@
 package main
 
 import (
+	"log"
+	"telemarketingEtl/config"
 	_ "telemarketingEtl/config"
 	"telemarketingEtl/timetask"
 	"time"
 )
 
 func main() {
+	r, ok := config.MgoLog.Find("jy_logs", map[string]interface{}{
+		"userid": "333",
+	}, nil, `{"_id":1}`, false, 0, 1)
+	log.Println(r, ok, len(*r), r != nil)
+	return
 
 	timetask.TimeTask()
 

+ 4 - 0
telemarketingEtl/timetask/task.go

@@ -49,3 +49,7 @@ func TaskVisitInfo() {
 	//start := now.Add(-time.Hour * 24).Unix()
 	entity.VisitInfoAdd(1677657397, end)
 }
+
+func TaskOpenSea() {
+	entity.GetOpenSea()
+}