Explorar o código

Merge branch 'dev1.1' of http://192.168.3.207:8080/dataservice/datatools into dev1.1

xuzhiheng %!s(int64=2) %!d(string=hai) anos
pai
achega
03c82853bd

+ 3 - 0
telemarketingEtl/config.yaml

@@ -73,6 +73,7 @@ classThreeHighSeasDay: 0
 classThreeHighSeaslastDay: 30
 recycleBin_A: 5  #回收站:A.5个自然日内被销售人员手动退回公海的客户;
 recycleBin_B: 3  #回收站:B.3个自然日内有过“已接听”的通话记录且仍处于“商机线索”状态下的客户;
+timetaskBl: true
 #定时任务 查看事件
 eventInfoTask: '0 0 * * * *'
 #定时任务 搜索事件
@@ -85,3 +86,5 @@ TaskOpenSea: '0 0 0 * * *'
 deleteOpenSeaTask: '0 0 0 * * *'
 #自动退回公海
 returnOpenSeaTask: '0 0 0 * * *'
+#
+openSeaTask: '0 0 0 * * *'

+ 56 - 18
telemarketingEtl/entity/dwd_f_crm_open_sea.go

@@ -1,16 +1,19 @@
 package entity
 
 import (
-	"app.yhyue.com/moapp/jybase/date"
-	"app.yhyue.com/moapp/jybase/mongodb"
-	"github.com/gogf/gf/v2/frame/g"
-	"github.com/gogf/gf/v2/util/gconv"
+	"fmt"
 	"log"
 	"strings"
 	"sync"
 	"telemarketingEtl/config"
 	"telemarketingEtl/util"
 	"time"
+
+	"app.yhyue.com/moapp/jybase/date"
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/os/gctx"
+	"github.com/gogf/gf/v2/util/gconv"
 )
 
 /*
@@ -51,11 +54,13 @@ var (
 )
 
 func init() {
+	ctx := gctx.New()
 	poolSize := g.Cfg().MustGet(ctx, "poolSize").Int()
 	oPool = make(chan bool, poolSize)
 }
 
 func GetOpenSea() {
+	ctx := gctx.New()
 	//
 	session := config.MgoLog.GetMgoConn()
 	defer config.MgoLog.DestoryMongoConn(session)
@@ -120,8 +125,10 @@ func GetOpenSea() {
 			},
 		},
 	}
+	log.Println("queryOneClassPc:", queryOneClassPc)
 	iter := session.DB("qfw").C("jy_logs").Find(queryOneClassPc).Iter()
 	count := 0
+	log.Println("一级公海开始.")
 	//一级公海
 	for thisData := map[string]interface{}{}; iter.Next(&thisData); {
 		oPool <- true
@@ -152,13 +159,13 @@ func GetOpenSea() {
 			oneClassA[uuid] = true
 		}(thisData)
 		count++
-		if count%100000 == 0 {
+		if count%10 == 0 {
 			log.Printf("已完成%d条数据\n", count)
 		}
 		thisData = map[string]interface{}{}
 	}
 	oWait.Wait()
-
+	log.Println("一级公海 pc  结束")
 	queryOneClassApp := map[string]interface{}{
 		"date": map[string]interface{}{
 			"$gte": start,
@@ -214,7 +221,7 @@ func GetOpenSea() {
 			oneClassA[uuid] = true
 		}(thisData)
 		count++
-		if count%100000 == 0 {
+		if count%10 == 0 {
 			log.Printf("已完成%d条数据\n", count)
 		}
 		thisData = map[string]interface{}{}
@@ -222,6 +229,7 @@ func GetOpenSea() {
 	oWait.Wait()
 	//更新公海
 	AddOpenSea(oneClassA, 1, "A")
+	log.Println("一级公海 更新结束。")
 	//
 	oneClassB := map[string]bool{}
 	classB, ok := config.Mgo.Find("user", map[string]interface{}{
@@ -244,8 +252,9 @@ func GetOpenSea() {
 		}
 	}
 	AddOpenSea(oneClassB, 1, "B")
-	//TODO 一级公海c
-	oneClassC := map[string]bool{}
+	oneClassC := GetOneSeaC()
+	AddOpenSea(oneClassC, 1, "C")
+	//
 	twoA, twoB, twoC, twoD := TwoOpenSea(oneClassA, oneClassB, oneClassC)
 	AddOpenSea(twoA, 2, "A")
 	AddOpenSea(twoB, 2, "B")
@@ -265,9 +274,13 @@ func GetOpenSea() {
 	D.1天≤最近30天内活跃天数<5天的客户;
 */
 func TwoOpenSea(oneClassA, oneClassB, oneClassC map[string]bool) (aMap, bMap, cMap, dMap map[string]bool) {
-	q := "SELECT userid,COUNT(1) as count FROM dwd_f_userbase_visit_info WHERE  createtime > %s AND contentnum >%v GROUP BY userid "
+	i_day := g.Cfg().MustGet(ctx, "classTwoHighSeaslastDay").Int()
+	day := time.Now().AddDate(0, 0, -i_day).Format(date.Date_Full_Layout)
+	q := fmt.Sprintf("SELECT userid,COUNT(1) as count FROM dwd_f_userbase_visit_info WHERE  createtime > '%s' AND contentnum >0 GROUP BY userid ", day)
 	//
+	log.Println(q)
 	stmtOut, err := config.JianyuSubjectdb.DB.Prepare(q)
+	log.Println("stmtOut.err:", err)
 	defer func() {
 		log.Println("stmtOut.Close start")
 		stmtOut.Close()
@@ -276,7 +289,7 @@ func TwoOpenSea(oneClassA, oneClassB, oneClassC map[string]bool) (aMap, bMap, cM
 
 	rows, err := stmtOut.Query()
 	if err != nil {
-		log.Println(err)
+		log.Println("er:", err)
 	}
 	defer func() {
 		log.Println("rows.Close start")
@@ -322,6 +335,7 @@ func TwoOpenSea(oneClassA, oneClassB, oneClassC map[string]bool) (aMap, bMap, cM
 // 三级公海:
 // 最近30天内活跃天数=0天的客户。
 func ThreeOpenSea() {
+	ctx := gctx.New()
 	t := time.Now()
 	t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.Local)
 	day := g.Cfg().MustGet(ctx, "classThreeHighSeaslastDay").Int()
@@ -338,16 +352,17 @@ func ThreeOpenSea() {
 					"$gte": start,
 				},
 			}
-			uuid := gconv.String(v["uid"])
+			// uuid := gconv.String(v["uid"])
+			id := gconv.Int64(v["id"])
 			if r, ok := config.MgoLog.Find("subscribepay_logs", query, nil, `{"_id":1}`, false, 0, 1); ok && r != nil && len(*r) == 0 {
-				config.JianyuSubjectdb.ExecBySql(`INSERT INTO dwd_f_crm_open_seax (comeintime,comeinsource,LEVEL,clue_level,clue_id) 
+				config.JianyuSubjectdb.ExecBySql(`INSERT INTO dwd_f_crm_open_sea (comeintime,comeinsource,LEVEL,clue_level,clue_id) 
 												VALUES(?,?,?,?,?)
-												ON DUPLICATE KEY UPDATE comeintime=?,comeinsource=?,LEVEL=?,clue_level=?`, createtime, 1, 3, "D", uuid, createtime, 1, 3, "D")
+												ON DUPLICATE KEY UPDATE comeintime=?,comeinsource=?,LEVEL=?,clue_level=?`, createtime, 1, 3, "D", id, createtime, 1, 3, "D")
 
 			}
 		}
 		return true
-	}, `select userid,uid from dwd_f_crm_clue_info where is_assign !=1`)
+	}, `select userid,uid,id from dwd_f_crm_clue_info where is_assign !=1`)
 }
 
 // 根据mongodb userid 获取 线索id
@@ -369,7 +384,7 @@ func AddOpenSea(m map[string]bool, level int, clue_level string) {
 	createtime := time.Now().Format(date.Date_Full_Layout)
 	if len(m) > 0 {
 		for _, v := range m {
-			config.JianyuSubjectdb.ExecBySql(`INSERT INTO dwd_f_crm_open_seax (comeintime,comeinsource,LEVEL,clue_level,clue_id) 
+			config.JianyuSubjectdb.ExecBySql(`INSERT INTO dwd_f_crm_open_sea (comeintime,comeinsource,LEVEL,clue_level,clue_id) 
 												VALUES(?,?,?,?,?)
 												ON DUPLICATE KEY UPDATE comeintime=?,comeinsource=?,LEVEL=?,clue_level=?`, createtime, 1, level, clue_level, v, createtime, 1, level, clue_level)
 		}
@@ -395,7 +410,7 @@ func DeleteOpenSea() {
 			return true
 		}
 		return true
-	}, `select id from dwd_f_crm_clue_info where trailstatus == ?`, "00")
+	}, `select id from dwd_f_crm_clue_info where trailstatus = ?`, "00")
 }
 
 //自动退回公海
@@ -409,6 +424,7 @@ func DeleteOpenSea() {
 7.“无意向客户”自动退回公海;
 */
 func ReturnOpenSea() {
+	ctx := gctx.New()
 	t := time.Now()
 	t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.Local)
 	//高意向客户
@@ -433,13 +449,15 @@ func ReturnOpenSea() {
 		"08": t.AddDate(0, 0, -noIdeaCustomer),
 	} {
 		config.JianyuSubjectdb.SelectByBath(500, func(l *[]map[string]interface{}) bool {
+			//1新增 2私海手动退回 3私海高意向客户自动退回 4私海意向客户退回 5私海潜在客户退回 6私海沉睡客户退回 7私海商机线索退回 8私海无意向客户退回
+			comeinsource := GetComeSource()[trailstatus]
 			ids := []interface{}{}
 			args := []interface{}{}
 			for _, v := range *l {
 				id := v["id"]
 				ids = append(ids, id)
 				//
-				args = append(args, id, time.Now().Format(date.Date_Full_Layout), 2)
+				args = append(args, id, time.Now().Format(date.Date_Full_Layout), comeinsource)
 			}
 			whs := []string{}
 			for i := 0; i < len(ids); i++ {
@@ -460,4 +478,24 @@ func ReturnOpenSea() {
 				LEFT JOIN  dwd_f_crm_trail_content c ON b.id =c.clue_id 
 				WHERE b.trailstatus =? AND c.next_time >?  GROUP BY  b.id`, trailstatus, nexttime)
 	}
+	log.Println("return sea end")
+}
+
+func GetComeSource() map[string]int {
+	return map[string]int{
+		"04": 2,
+		"05": 4,
+		"06": 3,
+	}
+}
+
+func GetOneSeaC() map[string]bool {
+	m := map[string]bool{}
+	config.JianyuSubjectdb.SelectByBath(500, func(l *[]map[string]interface{}) bool {
+		for _, v := range *l {
+			m[gconv.String(v["clue_id"])] = true
+		}
+		return true
+	}, `select clue_id from dwd_f_crm_open_sea where comeinsource in(2,3,4)`)
+	return m
 }

+ 15 - 25
telemarketingEtl/entity/dwd_f_userbase_event_info.go

@@ -1,15 +1,7 @@
 package entity
 
-import "C"
 import (
-	"app.yhyue.com/moapp/jybase/date"
-	"app.yhyue.com/moapp/jybase/encrypt"
-	"app.yhyue.com/moapp/jybase/mongodb"
 	"context"
-	"github.com/gogf/gf/v2/frame/g"
-	"github.com/gogf/gf/v2/os/gtime"
-	"github.com/gogf/gf/v2/util/gconv"
-	"go.mongodb.org/mongo-driver/mongo/options"
 	"log"
 	"net/url"
 	"regexp"
@@ -18,6 +10,14 @@ import (
 	"telemarketingEtl/config"
 	"telemarketingEtl/util"
 	"time"
+
+	"app.yhyue.com/moapp/jybase/date"
+	"app.yhyue.com/moapp/jybase/encrypt"
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/os/gtime"
+	"github.com/gogf/gf/v2/util/gconv"
+	"go.mongodb.org/mongo-driver/mongo/options"
 )
 
 type EventInfo struct {
@@ -44,6 +44,7 @@ var (
 	AppBuyPortraitNameReg = regexp.MustCompile(".*jyapp/big/page/unit_portrayal\\?entName=(.*)")
 	PcEntPortraitNameReg  = regexp.MustCompile(".*/swordfish/page_big_pc/ent_portrait/(.*)")
 	AppEntPortraitNameReg = regexp.MustCompile(".*/jyapp/big/page/ent_portrait\\?eId=(.*)")
+	CollectionReg         = regexp.MustCompile(".*frontPage/collection/sess/index")
 )
 
 func init() {
@@ -108,6 +109,10 @@ func filter(url string) bool {
 	return false
 }
 
+func In() {
+
+}
+
 // 查看事件
 func EventInfoAdd(start, end int64) {
 	//
@@ -208,6 +213,8 @@ func EventInfoAdd(start, end int64) {
 							entId := util.DecodeId(eId)
 							name = GetEntNameByEntId(entId)
 						}
+					} else if CollectionReg.MatchString(url_) {
+						eventtype = COLLECTION
 					}
 					createtime := time.Unix(gconv.Int64(v["date"]), 0).Format(date.Date_Full_Layout)
 					values = append(values, userid, eventtype, name, url_, platform, createtime)
@@ -247,23 +254,6 @@ func EventInfoAdd(start, end int64) {
 	}
 }
 
-// 根据职位id获取mongodb userid
-func GetUserIdByPositionId(positionId string) (userId string) {
-	if positionId == "" || positionId == "0" {
-		return
-	}
-	i_positionId := gconv.Int(positionId)
-	if i_positionId == 0 {
-		return
-	}
-	data := config.JianyuSubjectdb.SelectBySql(`select userid from dwd_f_userbase_id_mapping where position_id =? limit 1`, i_positionId)
-	if data == nil || len(*data) <= 0 {
-		return
-	}
-	userId = gconv.String((*data)[0]["userid"])
-	return
-}
-
 // 根据企业id获取企业名称
 func GetEntNameByEntId(entId string) (name string) {
 	if entId == "" {

+ 5 - 5
telemarketingEtl/entity/dwd_f_userbase_search_info.go

@@ -1,11 +1,6 @@
 package entity
 
-import "C"
 import (
-	"app.yhyue.com/moapp/jybase/date"
-	"app.yhyue.com/moapp/jybase/mongodb"
-	"github.com/gogf/gf/v2/frame/g"
-	"github.com/gogf/gf/v2/util/gconv"
 	"log"
 	"strconv"
 	"strings"
@@ -13,6 +8,11 @@ import (
 	"telemarketingEtl/config"
 	"telemarketingEtl/util"
 	"time"
+
+	"app.yhyue.com/moapp/jybase/date"
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/util/gconv"
 )
 
 var (

+ 11 - 10
telemarketingEtl/entity/dwd_f_userbase_visit_info.go

@@ -1,18 +1,18 @@
 package entity
 
-import "C"
 import (
-	"app.yhyue.com/moapp/jybase/date"
-	"app.yhyue.com/moapp/jybase/mongodb"
-	"github.com/gogf/gf/v2/frame/g"
-	"github.com/gogf/gf/v2/os/gtime"
-	"github.com/gogf/gf/v2/util/gconv"
 	"log"
 	"regexp"
 	"sync"
 	"telemarketingEtl/config"
 	"telemarketingEtl/util"
 	"time"
+
+	"app.yhyue.com/moapp/jybase/date"
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/os/gtime"
+	"github.com/gogf/gf/v2/util/gconv"
 )
 
 var USERPOOL = 20
@@ -44,7 +44,7 @@ func VisitInfoAdd(start, end int64) {
 	}
 	RegWx, _ := regexp.Compile("MicroMessenger")
 
-	log.Println("task run", start, end, gtime.NewFromTimeStamp(start))
+	log.Println("task run", start, end, gtime.NewFromTimeStamp(start), query)
 	//regWx, _ := regexp.Compile("MicroMessenger")
 	tables := g.Cfg().MustGet(ctx, "logTables").Strings()
 	log.Println("analy tables:", tables)
@@ -71,8 +71,8 @@ func VisitInfoAdd(start, end int64) {
 					if userid == "" {
 						return
 					}
-					url_ := gconv.String(thisData["url_"])
-					reg := regexp.MustCompile("/article/(content|bdprivate|mailprivate)") //todo 正则
+					url_ := gconv.String(thisData["url"])
+					reg := regexp.MustCompile(".*article/content/(.*)\\.html")
 					contentnum := 0
 					if reg.MatchString(url_) {
 						contentnum = 1
@@ -95,7 +95,7 @@ func VisitInfoAdd(start, end int64) {
 						if contentnum == 0 {
 							config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number = number+1,date =? where userid = ? and createtime>= ? and createtime <?`, craetetimeStr, userid, starttime, endtime)
 						} else {
-							config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number = number+1,contentnum = contentnum + 1 where userid = ? and createtime>= ? and createtime <?`, craetetimeStr, userid, starttime, endtime)
+							config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number = number+1,contentnum = contentnum + 1,date =? where userid = ? and createtime>= ? and createtime <?`, craetetimeStr, userid, starttime, endtime)
 						}
 					} else {
 						config.JianyuSubjectdb.InsertBySql(`INSERT INTO dwd_f_userbase_visit_info
@@ -110,6 +110,7 @@ func VisitInfoAdd(start, end int64) {
 				}
 				thisData = map[string]interface{}{}
 			}
+			log.Println("end!")
 			sWait.Wait()
 		}(table)
 	}

+ 25 - 0
telemarketingEtl/entity/entity.go

@@ -1,5 +1,11 @@
 package entity
 
+import (
+	"telemarketingEtl/config"
+
+	"github.com/gogf/gf/v2/util/gconv"
+)
+
 // 平台
 const (
 	APP = iota
@@ -12,5 +18,24 @@ const (
 	BUYPORTRAIT = "采购单位画像"
 	INFO        = "标讯三级页"
 	ENTPORTRAIT = "企业画像"
+	LOGIN       = "登录"
+	COLLECTION  = "标讯收藏"
 	//TODO
 )
+
+// 根据职位id获取mongodb userid
+func GetUserIdByPositionId(positionId string) (userId string) {
+	if positionId == "" || positionId == "0" {
+		return
+	}
+	i_positionId := gconv.Int(positionId)
+	if i_positionId == 0 {
+		return
+	}
+	data := config.JianyuSubjectdb.SelectBySql(`select userid from dwd_f_userbase_id_mapping where position_id =? limit 1`, i_positionId)
+	if data == nil || len(*data) <= 0 {
+		return
+	}
+	userId = gconv.String((*data)[0]["userid"])
+	return
+}

+ 21 - 25
telemarketingEtl/timetask/task.go

@@ -2,58 +2,51 @@ package timetask
 
 import (
 	"context"
+	"log"
+	"telemarketingEtl/entity"
+	"time"
+
 	_ "github.com/gogf/gf/contrib/drivers/mysql/v2"
 	"github.com/gogf/gf/v2/os/gcfg"
 	"github.com/gogf/gf/v2/os/gcron"
 	"github.com/gogf/gf/v2/os/gctx"
-	"telemarketingEtl/entity"
-	"time"
 )
 
-//func TimeTask() {
-//	//TaskEventInfo()
-//	//TaskSearchInfo()
-//	TaskVisitInfo()
-//	return
-//	var (
-//		err error
-//		ctx = gctx.New()
-//	)
-//	_, err = gcron.Add(ctx, "1 0 0 * * *", func(ctx context.Context) {
-//		g.Log().Print(ctx, "Every day")
-//		//Task_clearUserLog()
-//	}, "clearUserTask")
-//	if err != nil {
-//		panic(err)
-//	}
-//
-//}
-
 func TaskEventInfo() {
+	log.Println("开始TaskEventInfo")
 	now := time.Now()
 	end := now.Unix()
-	start := now.Add(-time.Hour * 24).Unix()
+	// start := now.Add(-time.Hour * 1).Unix()
+	start := now.Add(-time.Minute * 5).Unix()
 	//查看事件
 	entity.EventInfoAdd(start, end)
+	log.Println("结束TaskEventInfo")
 }
 
 func TaskSearchInfo() {
+	log.Println("开始TaskSearchInfo")
 	now := time.Now()
 	end := now.Unix()
-	start := now.Add(-time.Hour * 24).Unix()
+	// start := now.Add(-time.Hour * 1).Unix()
+	start := now.Add(-time.Minute * 5).Unix()
 	//搜索事件
 	entity.SearchInfoAdd(start, end)
 }
 
 func TaskVisitInfo() {
+	log.Println("开始TaskVisitInfo")
 	now := time.Now()
 	end := now.Unix()
-	start := now.Add(-time.Hour * 24).Unix()
+	// start := now.Add(-time.Hour * 1).Unix()
+	start := now.Add(-time.Minute * 5).Unix()
 	entity.VisitInfoAdd(start, end)
+	log.Println("结束TaskVisitInfo")
 }
 
 func TaskOpenSea() {
+	log.Println("开始TaskOpenSea")
 	entity.GetOpenSea()
+	log.Println("结束TaskOpenSea")
 }
 
 // 定时任务
@@ -64,7 +57,6 @@ func Run() {
 			err error
 			ctx = gctx.New()
 		)
-
 		eventInfoTask := gcfg.Instance().MustGet(gctx.New(), "eventInfoTask", "").String()
 		searchInfoTask := gcfg.Instance().MustGet(gctx.New(), "searchInfoTask", "").String()
 		visitInfoTask := gcfg.Instance().MustGet(gctx.New(), "visitInfoTask", "").String()
@@ -102,14 +94,18 @@ func Run() {
 		}
 		//
 		_, err = gcron.Add(ctx, deleteOpenSeaTask, func(ctx context.Context) {
+			log.Println("开始DeleteOpenSea")
 			entity.DeleteOpenSea()
+			log.Println("结束DeleteOpenSea")
 		}, "DeleteOpenSea")
 		if err != nil {
 			panic(err)
 		}
 		//
 		_, err = gcron.Add(ctx, returnOpenSeaTask, func(ctx context.Context) {
+			log.Println("开始ReturnOpenSea")
 			entity.ReturnOpenSea()
+			log.Println("结束ReturnOpenSea")
 		}, "ReturnOpenSea")
 		if err != nil {
 			panic(err)