zhangxinlei1996 2 жил өмнө
parent
commit
a40ca51bad

+ 61 - 5
telemarketingEtl/entity/dwd_f_crm_open_sea.go

@@ -230,10 +230,12 @@ func GetOpenSea() {
 	log.Println("一级公海 更新结束。")
 	//
 	twoA, twoB, twoC, twoD := TwoOpenSea(oneClassA, oneClassB, oneClassC)
+
 	AddOpenSea(twoA, 2, "A")
 	AddOpenSea(twoB, 2, "B")
 	AddOpenSea(twoC, 2, "C")
 	AddOpenSea(twoD, 2, "D")
+
 	//三级公海
 	ThreeOpenSea(oneClassA, oneClassB, oneClassC, twoA, twoB, twoC, twoD)
 	// AddOpenSea(three, 3, "D")
@@ -250,6 +252,24 @@ func GetOpenSea() {
 	D.1天≤最近30天内活跃天数<5天的客户;
 */
 func TwoOpenSea(oneClassA, oneClassB, oneClassC map[string]bool) (aMap, bMap, cMap, dMap map[string]bool) {
+
+	clubIdMap := func() map[string]string {
+		c := 0
+		mp := map[string]string{}
+		config.JianyuSubjectdb.SelectByBath(500, func(l *[]map[string]interface{}) bool {
+			for _, v := range *l {
+				c++
+				if c%5000 == 0 {
+					log.Println("加载clue_info数据:", c)
+				}
+				mp[gconv.String(v["userid"])] = gconv.String(v["id"])
+			}
+			return true
+		}, `SELECT id,userid FROM dwd_f_crm_clue_info WHERE userid !=""`)
+		return mp
+	}()
+	log.Println("clue_info数据加载结束:", len(clubIdMap))
+	//
 	i_day := g.Cfg().MustGet(ctx, "classTwoHighSeaslastDay").Int()
 	day := time.Now().AddDate(0, 0, -i_day).Format(date.Date_Full_Layout)
 	q := fmt.Sprintf("SELECT userid,COUNT(1) as count FROM dwd_f_userbase_visit_info WHERE  createtime > '%s' AND contentnum >0 GROUP BY userid ", day)
@@ -276,6 +296,7 @@ func TwoOpenSea(oneClassA, oneClassB, oneClassC map[string]bool) (aMap, bMap, cM
 	bMap = map[string]bool{}
 	cMap = map[string]bool{}
 	dMap = map[string]bool{}
+	count := 0
 	for rows.Next() {
 		var userid string
 		var ct int
@@ -283,7 +304,15 @@ func TwoOpenSea(oneClassA, oneClassB, oneClassC map[string]bool) (aMap, bMap, cM
 		if err != nil {
 			log.Println("row scan err:", err)
 		}
-		clubId := GetClueIdByUserId(userid)
+		count++
+		if count%5000 == 0 {
+			log.Println("二级公海查询:", count)
+		}
+		// clubId := GetClueIdByUserId(userid)
+		clubId := clubIdMap[userid]
+		if clubId == "" {
+			continue
+		}
 		if oneClassA[clubId] || oneClassB[clubId] || oneClassC[clubId] {
 			continue
 		}
@@ -312,21 +341,48 @@ func TwoOpenSea(oneClassA, oneClassB, oneClassC map[string]bool) (aMap, bMap, cM
 // 三级公海:
 // 最近30天内活跃天数=0天的客户。
 func ThreeOpenSea(oneA, oneB, oneC, twoA, twoB, twoC, twoD map[string]bool) map[string]bool {
+	log.Println("三级公海start")
 	m := map[string]bool{}
 	ctx := gctx.New()
 	t := time.Now()
 	t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.Local)
 	day := g.Cfg().MustGet(ctx, "classThreeHighSeaslastDay").Int()
 	t = t.AddDate(0, 0, -day)
+	count := 0
+	visitMap := func() map[string]bool {
+		c := 0
+		mp := map[string]bool{}
+		config.JianyuSubjectdb.SelectByBath(500, func(l *[]map[string]interface{}) bool {
+			for _, v := range *l {
+				c++
+				if c%5000 == 0 {
+					log.Println("加载dwd_f_userbase_visit_info数据:", c)
+				}
+				mp[gconv.String(v["userid"])] = true
+			}
+			return true
+		}, `SELECT COUNT(1),userid FROM dwd_f_userbase_visit_info WHERE  createtime > ? AND contentnum >0 GROUP BY userid`, t.Format(date.Date_Full_Layout))
+		return mp
+	}()
+	log.Println("dwd_f_userbase_visit_info数据加载结束:", len(visitMap))
 
 	config.JianyuSubjectdb.SelectByBath(500, func(l *[]map[string]interface{}) bool {
 		ids := []interface{}{}
 
 		for _, v := range *l {
+			count++
+			if count%5000 == 0 {
+				log.Println("三级公海加载:", count)
+			}
 			clubId := gconv.String(v["clue_id"])
 			userid := gconv.String(v["userid"])
-			q := fmt.Sprintf("SELECT count(1)  FROM dwd_f_userbase_visit_info WHERE  createtime > '%s' AND contentnum =0 and userid = '%s'", t.Format(date.Date_Full_Layout), userid)
-			config.JianyuSubjectdb.SelectBySql(q)
+			// q := fmt.Sprintf("SELECT count(1)  FROM dwd_f_userbase_visit_info WHERE  createtime > '%s' AND contentnum >0 and userid = '%s'", t.Format(date.Date_Full_Layout), userid)
+			// if config.JianyuSubjectdb.CountBySql(q) > 0 {
+			// 	continue
+			// }
+			if visitMap[userid] {
+				continue
+			}
 			if oneA[clubId] || oneB[clubId] || oneC[clubId] || twoA[clubId] || twoB[clubId] || twoC[clubId] || twoD[clubId] {
 				continue
 			}
@@ -341,6 +397,7 @@ func ThreeOpenSea(oneA, oneB, oneC, twoA, twoB, twoC, twoD map[string]bool) map[
 		config.JianyuSubjectdb.UpdateOrDeleteBySql(`UPDATE dwd_f_crm_open_sea SET level = 3,clue_level='D' WHERE clue_id in (`+wh+`)`, ids...)
 		return true
 	}, `SELECT a.clue_id,b.userid FROM dwd_f_crm_open_sea a LEFT JOIN dwd_f_crm_clue_info b ON a.clue_id=b.id`)
+	log.Println("三级公海end")
 	return m
 }
 
@@ -436,7 +493,7 @@ func ReturnOpenSea() {
 	} {
 		sql := `SELECT a.clue_id,a.position_id,a.seatNumber FROM dwd_f_crm_private_sea a 
 				LEFT JOIN dwd_f_crm_clue_info b ON a.clue_id=b.id 
-				WHERE b.trailstatus =?`
+				WHERE a.position_id is not null and b.trailstatus =?`
 		argsSelect := []interface{}{trailstatus}
 
 		intime := ""
@@ -471,7 +528,6 @@ func ReturnOpenSea() {
 					args2 = append(args2, intime)
 				}
 				//保留未跟进线索
-				log.Println("====<>", config.JianyuSubjectdb.CountBySql(sql2, args2...))
 				if config.JianyuSubjectdb.CountBySql(sql2, args2...) > 0 {
 					log.Println("不满足线索过滤", id)
 					continue

+ 129 - 121
telemarketingEtl/entity/dwd_f_userbase_event_info.go

@@ -18,7 +18,6 @@ import (
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gtime"
 	"github.com/gogf/gf/v2/util/gconv"
-	"go.mongodb.org/mongo-driver/mongo/options"
 )
 
 type EventInfo struct {
@@ -126,138 +125,135 @@ func EventInfoAdd(start, end int64) {
 	tables := g.Cfg().MustGet(ctx, "logTables").Strings()
 	index := g.Cfg().MustGet(ctx, "index").Int()
 	log.Println("analy tables:", tables)
-
-	th_mgo := util.NewThreads(len(tables))
+	query := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gte": util.MongoId(start),
+			"$lt":  util.MongoId(end),
+		},
+	}
 	for _, table := range tables {
-		th_mgo.Open()
-		go func(table string) {
-			defer th_mgo.Close()
-			of := options.Find()
-			of.SetProjection(g.Map{"url": 1, "userid": 1, "hour": 1, "month": 1, "day": 1, "date": 1, "client": 1, "refer": 1})
-			coll := config.MgoLog.C.Database(config.MgoLog.DbName).Collection(table)
-			cur, err := coll.Find(ctx, g.Map{"_id": g.Map{"$gte": util.MongoId(start), "$lt": util.MongoId(end)}}, of)
-			if err == nil && cur.Err() == nil {
-				n := 0
-				for ; cur.Next(ctx); n++ {
-					v := g.Map{}
-					er := cur.Decode(&v)
-					if er != nil {
-						continue
-					}
-					userid := gconv.String(v["userid"])
-					if userid == "" {
-						continue
-					}
-					url_ := gconv.String(v["url"])
-					if !filter(url_) {
-						continue
-					}
-					platform := APP
-					if table == "jy_logs" {
-						platform = PC
-						if client := gconv.String(v["client"]); client != "" {
-							if regWx.MatchString(client) {
-								platform = WX
-							}
+		func(table string) {
+			session := config.MgoLog.GetMgoConn()
+			iter := session.DB("qfw").C(table).Find(query).Iter()
+			count := 0
+			for thisData := map[string]interface{}{}; iter.Next(&thisData); {
+				userid := gconv.String(thisData["userid"])
+				if userid == "" {
+					continue
+				}
+				url_ := gconv.String(thisData["url"])
+				if !filter(url_) {
+					continue
+				}
+				platform := APP
+				if table == "jy_logs" {
+					platform = PC
+					if client := gconv.String(thisData["client"]); client != "" {
+						if regWx.MatchString(client) {
+							platform = WX
 						}
 					}
-					//职位id转换userid
-					if !mongodb.IsObjectIdHex(userid) {
-						userid = GetUserIdByPositionId(userid)
-					}
-					refer := gconv.String(v["refer"]) //用于取分客户监控,客户监控的url地址和采购单位画像是一个
-					eventtype := ""
-					name := ""
-					//根据不同url地址获取 标讯的公告名称、采购单位名称、企业名称
-					if PcArticleReg.MatchString(url_) || AppArticleReg.MatchString(url_) { //三级页根据加密id获取公告id
-						eventtype = INFO
-						//截取id
-						ur := ArticleId.FindStringSubmatch(url_)
-						if len(ur) > 1 {
-							uarr := encrypt.BDecodeArticleId2ByCheck(ur[1], encrypt.SE, encrypt.SE2)
-							if len(uarr) > 0 {
-								biddingId := uarr[0]
-								//获取标讯标题
-								bdata, ok := config.MgoBid.FindById("bidding", biddingId, `{"title":1}`)
-								if ok && bdata != nil && len(*bdata) > 0 {
-									name = gconv.String((*bdata)["title"])
+				}
+				//职位id转换userid
+				if !mongodb.IsObjectIdHex(userid) {
+					userid = GetUserIdByPositionId(userid)
+				}
+				if userid == "" {
+					continue
+				}
+				refer := gconv.String(thisData["refer"]) //用于取分客户监控,客户监控的url地址和采购单位画像是一个
+				eventtype := ""
+				name := ""
+				//根据不同url地址获取 标讯的公告名称、采购单位名称、企业名称
+				if PcArticleReg.MatchString(url_) || AppArticleReg.MatchString(url_) { //三级页根据加密id获取公告id
+					eventtype = INFO
+					//截取id
+					ur := ArticleId.FindStringSubmatch(url_)
+					if len(ur) > 1 {
+						uarr := BDecodeArticleId2ByCheck(ur[1], encrypt.SE, encrypt.SE2)
+						if len(uarr) > 0 {
+							biddingId := uarr[0]
+							//获取标讯标题
+							bdata, ok := config.MgoBid.FindById("bidding", biddingId, `{"title":1}`)
+							if ok && bdata != nil && len(*bdata) > 0 {
+								name = gconv.String((*bdata)["title"])
+								if name == "" {
+									continue
 								}
 							}
 						}
-					} else if pcBuyPortraitReg.MatchString(url_) && !strings.Contains(refer, "/swordfish/page_big_pc/my_client") {
-						eventtype = BUYPORTRAIT
-						na := PcBuyPortraitNameReg.FindStringSubmatch(url_)
-						if len(na) > 1 {
-							name = util.Unescape(na[1])
-						}
-					} else if AppBuyPortraitReg.MatchString(url_) {
-						eventtype = BUYPORTRAIT
-						na := AppBuyPortraitNameReg.FindStringSubmatch(url_)
-						if len(na) > 1 {
-							name = util.Unescape(na[1])
-						}
-					} else if PcEntIntelligentceMonitorReg.MatchString(url_) && !strings.Contains(refer, "/swordfish/page_big_pc/free/ent_follow") {
-						eventtype = ENTPORTRAIT
-						na := PcEntPortraitNameReg.FindStringSubmatch(url_)
-						if len(na) > 1 {
-							entId := util.DecodeId(na[1])
-							name = GetEntNameByEntId(entId)
-						}
-					} else if AppEntPortraitReg.MatchString(url_) {
-						eventtype = ENTPORTRAIT
-						//app端作为参数拼接过来的。和pc不一样
-						u, err := url.Parse(url_)
-						if err == nil {
-							eId := u.Query().Get("eId")
-							entId := util.DecodeId(eId)
-							name = GetEntNameByEntId(entId)
-						}
-					} else if ProductIndexReg.MatchString(url_) {
-						eventtype = PRODUCTINDEX
-					} else if PcClientReg.MatchString(url_) && strings.Contains(refer, "/swordfish/page_big_pc/my_client") {
-						eventtype = CLIENT
-						name = PcClient(url_)
-					} else if AppClientReg.MatchString(url_) {
-						eventtype = CLIENT
-						name = AppClient(url_)
-					} else if PcProjectMonitorReg.MatchString(url_) {
-						eventtype = PROJECTPROGRESS
-						name = ProjectProgress(url_)
-					} else if PcEntIntelligentceMonitorReg.MatchString(url_) && strings.Contains(refer, "/swordfish/page_big_pc/free/ent_follow") {
-						eventtype = ENTFOLLOW
-						name = EntFollow(url_)
-					} else if PcForecastReg.MatchString(url_) || AppForecastReg.MatchString(url_) {
-						eventtype = FORECAST
-						name, _ = Forecast(url_)
-					} else if PcEntSearchReq.MatchString(url_) {
-						eventtype = ENTSEARCH
 					}
-
-					createtime := time.Unix(gconv.Int64(v["date"]), 0).Format(date.Date_Full_Layout)
-					values = append(values, userid, eventtype, name, url_, platform, createtime)
-					if n%index == 0 {
-						if len(values) > 0 {
-							id1, _ := config.JianyuSubjectdb.InsertBatch(DWD_F_USERBASE_EVENT_INFO, fields, values) //id1:数量  id2:开始id索引
-							if id1 <= 0 {
-								log.Println(values, "失败")
-							}
-						}
-						values = []interface{}{}
+				} else if pcBuyPortraitReg.MatchString(url_) && !strings.Contains(refer, "/swordfish/page_big_pc/my_client") {
+					eventtype = BUYPORTRAIT
+					na := PcBuyPortraitNameReg.FindStringSubmatch(url_)
+					if len(na) > 1 {
+						name = util.Unescape(na[1])
 					}
-					if n%200 == 0 {
-						log.Println("current", table, n)
+				} else if AppBuyPortraitReg.MatchString(url_) {
+					eventtype = BUYPORTRAIT
+					na := AppBuyPortraitNameReg.FindStringSubmatch(url_)
+					if len(na) > 1 {
+						name = util.Unescape(na[1])
 					}
+				} else if PcEntIntelligentceMonitorReg.MatchString(url_) && !strings.Contains(refer, "/swordfish/page_big_pc/free/ent_follow") {
+					eventtype = ENTPORTRAIT
+					na := PcEntPortraitNameReg.FindStringSubmatch(url_)
+					if len(na) > 1 {
+						entId := util.DecodeId(na[1])
+						name = GetEntNameByEntId(entId)
+					}
+				} else if AppEntPortraitReg.MatchString(url_) {
+					eventtype = ENTPORTRAIT
+					//app端作为参数拼接过来的。和pc不一样
+					u, err := url.Parse(url_)
+					if err == nil {
+						eId := u.Query().Get("eId")
+						entId := util.DecodeId(eId)
+						name = GetEntNameByEntId(entId)
+					}
+				} else if ProductIndexReg.MatchString(url_) {
+					eventtype = PRODUCTINDEX
+				} else if PcClientReg.MatchString(url_) && strings.Contains(refer, "/swordfish/page_big_pc/my_client") {
+					eventtype = CLIENT
+					name = PcClient(url_)
+				} else if AppClientReg.MatchString(url_) {
+					eventtype = CLIENT
+					name = AppClient(url_)
+				} else if PcProjectMonitorReg.MatchString(url_) {
+					eventtype = PROJECTPROGRESS
+					name = ProjectProgress(url_)
+				} else if PcEntIntelligentceMonitorReg.MatchString(url_) && strings.Contains(refer, "/swordfish/page_big_pc/free/ent_follow") {
+					eventtype = ENTFOLLOW
+					name = EntFollow(url_)
+				} else if PcForecastReg.MatchString(url_) || AppForecastReg.MatchString(url_) {
+					eventtype = FORECAST
+					name, _ = Forecast(url_)
+				} else if PcEntSearchReq.MatchString(url_) {
+					eventtype = ENTSEARCH
 				}
-				//
-				if len(values) > 0 {
-					id1, _ := config.JianyuSubjectdb.InsertBatch(DWD_F_USERBASE_EVENT_INFO, fields, values) //id1:数量  id2:开始id索引
-					if id1 <= 0 {
-						log.Println(values, "失败~")
+				createtime := time.Unix(gconv.Int64(thisData["date"]), 0).Format(date.Date_Full_Layout)
+				values = append(values, userid, eventtype, name, url_, platform, createtime)
+				count++
+				if count%index == 0 {
+					if len(values) > 0 {
+						id1, _ := config.JianyuSubjectdb.InsertBatch(DWD_F_USERBASE_EVENT_INFO, fields, values) //id1:数量  id2:开始id索引
+						if id1 <= 0 {
+							log.Println(values, "失败")
+						}
+						values = []interface{}{}
 					}
-					values = []interface{}{}
 				}
-			} else {
-				log.Println(err)
+				if count%50000 == 0 {
+					log.Println("current", table, count)
+				}
+			}
+			//
+			if len(values) > 0 {
+				id1, _ := config.JianyuSubjectdb.InsertBatch(DWD_F_USERBASE_EVENT_INFO, fields, values) //id1:数量  id2:开始id索引
+				if id1 <= 0 {
+					log.Println(values, "失败~")
+				}
+				values = []interface{}{}
 			}
 		}(table)
 	}
@@ -417,3 +413,15 @@ func GetInfoByBid(bid string) (string, string) {
 
 	return title, href
 }
+
+//短地址解密,二次解密带校验和
+func BDecodeArticleId2ByCheck(id string, s1, s2 *encrypt.SimpleEncrypt) []string {
+	if !strings.Contains(id, "+") { //新加密算法解密
+		id, _ = url.QueryUnescape(id)
+	}
+	if len(id) > 2 {
+		kstr := s2.DecodeStringByCheck(id[3:])
+		return strings.Split(s1.DecodeStringByCheck(kstr), ",")
+	}
+	return []string{}
+}

+ 30 - 22
telemarketingEtl/entity/dwd_f_userbase_search_info.go

@@ -9,6 +9,7 @@ import (
 	"telemarketingEtl/util"
 	"time"
 
+	"app.yhyue.com/moapp/jybase/common"
 	"app.yhyue.com/moapp/jybase/date"
 	"app.yhyue.com/moapp/jybase/mongodb"
 	"github.com/gogf/gf/v2/frame/g"
@@ -84,6 +85,7 @@ func BuyerClassCode() map[string]string {
 func SearchInfoAdd(start, end int64) {
 	s_id := util.GetObjectId(start)
 	e_id := util.GetObjectId(end)
+	index := g.Cfg().MustGet(ctx, "index").Int()
 	query := map[string]interface{}{
 		"_id": map[string]interface{}{
 			"$gte": s_id,
@@ -100,14 +102,11 @@ func SearchInfoAdd(start, end int64) {
 	session := config.MgoLog.GetMgoConn()
 	iter := session.DB("qfw").C("jy_search_log").Find(query).Iter()
 	count := 0
+	values := []interface{}{}
+	fieids := []string{"userid", "search_word", "exclude_word", "search_area", "search_model", "matchtype", "search_industry", "max_price", "min_price", "search_publishtime_start", "search_publishtime_end", "search_type", "filetext", "search_buyerclass", "platform", "search_time"}
 	for thisData := map[string]interface{}{}; iter.Next(&thisData); {
-		sPool <- true
-		sWait.Add(1)
-		go func(thisData map[string]interface{}) {
-			defer func() {
-				<-sPool
-				sWait.Done()
-			}()
+		func(thisData map[string]interface{}) {
+
 			userid := gconv.String(thisData["s_userid"])
 			if userid == "" {
 				return
@@ -207,29 +206,38 @@ func SearchInfoAdd(start, end int64) {
 			}
 			//平台
 			platform := gconv.String(thisData["platform"])
-			log.Println(createtime)
+			values = append(values, userid, search_word, exclude_word, area, search_mode, matchtype, search_industry, max_price, min_price, common.If(search_publishtime_start == "", nil, search_publishtime_start), common.If(search_publishtime_end == "", nil, search_publishtime_end), subType, filetext, buyerclass, platform, search_time)
 			//存库
-			sql := `INSERT INTO dwd_f_userbase_search_info 
-												(userid,search_word, exclude_word, search_area,search_model,matchtype,
-												search_industry,max_price, min_price,
-												search_publishtime_start,search_publishtime_end,search_type,
-												filetext,search_buyerclass, platform,search_time)
-												VALUES 
-												(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`
-			id := config.JianyuSubjectdb.InsertBySql(sql, userid, search_word, exclude_word, area, search_mode, matchtype,
-				search_industry, max_price, min_price, search_publishtime_start, search_publishtime_end, subType, filetext, buyerclass, platform, search_time)
-			if id <= 0 {
-				log.Println("插入失败:", userid, search_word, exclude_word, area, search_mode, matchtype,
-					search_industry, max_price, min_price, search_publishtime_start, search_publishtime_end, subType, filetext, buyerclass, platform, search_time)
+			// sql := `INSERT INTO dwd_f_userbase_search_info
+			// 									(userid,search_word, exclude_word, search_area,search_model,matchtype,
+			// 									search_industry,max_price, min_price,
+			// 									search_publishtime_start,search_publishtime_end,search_type,
+			// 									filetext,search_buyerclass, platform,search_time)
+			// 									VALUES
+			// 									(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`
+			// id := config.JianyuSubjectdb.InsertBySql(sql, userid, search_word, exclude_word, area, search_mode, matchtype,
+			// 	search_industry, max_price, min_price, search_publishtime_start, search_publishtime_end, subType, filetext, buyerclass, platform, search_time)
+			// if id <= 0 {
+			// 	log.Println("插入失败:", userid, search_word, exclude_word, area, search_mode, matchtype,
+			// 		search_industry, max_price, min_price, search_publishtime_start, search_publishtime_end, subType, filetext, buyerclass, platform, search_time)
+			// }
+			if count%index == 0 {
+				if len(values) > 0 {
+					config.JianyuSubjectdb.InsertBatch("dwd_f_userbase_search_info", fieids, values)
+					values = []interface{}{}
+				}
 			}
 		}(thisData)
 		count++
-		if count%100 == 0 {
+		if count%5000 == 0 {
 			log.Printf("已完成%d条数据\n", count)
 		}
 		thisData = map[string]interface{}{}
 	}
-	sWait.Wait()
+	if len(values) > 0 {
+		config.JianyuSubjectdb.InsertBatch("dwd_f_userbase_search_info", fieids, values)
+		values = []interface{}{}
+	}
 
 	log.Println("end")
 }

+ 11 - 10
telemarketingEtl/entity/dwd_f_userbase_visit_info.go

@@ -49,18 +49,19 @@ func VisitInfoAdd(start, end int64) {
 	tables := g.Cfg().MustGet(ctx, "logTables").Strings()
 	log.Println("analy tables:", tables)
 	for _, table := range tables {
-		go func(table string) {
+		/*go*/ func(table string) {
 			session := config.MgoLog.GetMgoConn()
 			iter := session.DB("qfw").C(table).Find(query).Iter()
 			count := 0
 			for thisData := map[string]interface{}{}; iter.Next(&thisData); {
-				sPool <- true
-				sWait.Add(1)
-				go func(thisData map[string]interface{}) {
-					defer func() {
-						<-sPool
-						sWait.Done()
-					}()
+				// sPool <- true
+				// sWait.Add(1)
+				/*go*/
+				func(thisData map[string]interface{}) {
+					// defer func() {
+					// 	<-sPool
+					// 	sWait.Done()
+					// }()
 					userid := gconv.String(thisData["userid"])
 					if userid == "" {
 						return
@@ -105,13 +106,13 @@ func VisitInfoAdd(start, end int64) {
 					//
 				}(thisData)
 				count++
-				if count%100 == 0 {
+				if count%5000 == 0 {
 					log.Printf("%s已完成%d条数据\n", table, count)
 				}
 				thisData = map[string]interface{}{}
 			}
 			log.Println("end!")
-			sWait.Wait()
+			// sWait.Wait()
 		}(table)
 	}
 }

+ 162 - 0
telemarketingEtl/entity/old.go

@@ -0,0 +1,162 @@
+package entity
+
+import (
+	"log"
+	"regexp"
+	"telemarketingEtl/config"
+	"telemarketingEtl/util"
+	"time"
+
+	"app.yhyue.com/moapp/jybase/date"
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/os/gtime"
+	"github.com/gogf/gf/v2/util/gconv"
+)
+
+// 查看事件
+func VisitInfoAddOld(start, end int64) {
+
+	// 获取时间戳 1 和时间戳 2 所在的日期
+	t1 := time.Unix(start, 0)
+	t2 := time.Unix(end, 0)
+
+	index := g.Cfg().MustGet(ctx, "index").Int()
+
+	RegWx, _ := regexp.Compile("MicroMessenger")
+
+	// 获取时间戳 1 和时间戳 2 之间的天数
+	days := int(t2.Sub(t1).Hours() / 24)
+
+	for i := 0; i <= days; i++ {
+		// 获取当前日期
+		t := t1.AddDate(0, 0, i)
+
+		// 获取当天的开始时间
+		startOfDay := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()).Unix()
+
+		// 获取当天的结束时间
+		endOfDay := time.Date(t.Year(), t.Month(), t.Day(), 23, 59, 59, 0, t.Location()).Unix()
+
+		s_id := util.GetObjectId(startOfDay)
+		e_id := util.GetObjectId(endOfDay)
+		query := map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gte": s_id,
+				"$lte": e_id,
+			},
+		}
+		log.Println("task run", time.Unix(startOfDay, 0).Format(date.Date_Full_Layout), time.Unix(endOfDay, 0).Format(date.Date_Full_Layout), gtime.NewFromTimeStamp(start), query)
+		tables := g.Cfg().MustGet(ctx, "logTables").Strings()
+		log.Println("analy tables:", tables)
+		userVisit := map[string]*visitStruct{}
+		for _, table := range tables {
+			func(table string) {
+				session := config.MgoLog.GetMgoConn()
+				iter := session.DB("qfw").C(table).Find(query).Iter()
+				count := 0
+				for thisData := map[string]interface{}{}; iter.Next(&thisData); {
+					func(thisData map[string]interface{}) {
+						userid := gconv.String(thisData["userid"])
+						if userid == "" {
+							return
+						}
+						if !mongodb.IsObjectIdHex(userid) {
+							userid = GetUserIdByPositionId(userid)
+						}
+						if userid == "" {
+							return
+						}
+						url_ := gconv.String(thisData["url"])
+						reg := regexp.MustCompile(".*article/content/(.*)\\.html")
+						contentnum := 0
+						if reg.MatchString(url_) {
+							contentnum = 1
+						}
+
+						createtime := gconv.Int64(thisData["date"])
+
+						craetetimeStr := time.Unix(createtime, 0).Format(date.Date_Full_Layout)
+						platform := APP
+						if table == "jy_logs" {
+							platform = PC
+							if client := gconv.String(thisData["client"]); client != "" {
+								if RegWx.MatchString(client) {
+									platform = WX
+								}
+							}
+						}
+						userVisit[userid] = updateUser(userVisit[userid], 1, contentnum, platform, userid, craetetimeStr, craetetimeStr)
+
+						//
+					}(thisData)
+					count++
+					if count%5000 == 0 {
+						log.Printf("%s已完成%d条数据\n", table, count)
+					}
+					thisData = map[string]interface{}{}
+				}
+				log.Println("end!")
+				// sWait.Wait()
+			}(table)
+		}
+		fieids := []string{"userid", "DATE", "number", "platform", "createtime", "contentnum"}
+		args := []interface{}{}
+		ii := 0
+		if len(userVisit) > 0 {
+			log.Println("用户量:", len(userVisit))
+			for _, v := range userVisit {
+				userid := (*v).userid
+				contentnum := (*v).contentnum
+				number := (*v).number
+				createtime := (*v).createtime
+				datetime := (*v).datetime
+				platform := (*v).platform
+				args = append(args, userid, datetime, number, platform, createtime, contentnum)
+				ii++
+				if ii%index == 0 {
+					if len(args) > 0 {
+						config.JianyuSubjectdb.InsertBatch("dwd_f_userbase_visit_info", fieids, args)
+						args = []interface{}{}
+					}
+				}
+			}
+			if len(args) > 0 {
+				config.JianyuSubjectdb.InsertBatch("dwd_f_userbase_visit_info", fieids, args)
+			}
+		}
+
+	}
+}
+
+type visitStruct struct {
+	userid     string
+	createtime string //创建时间
+	datetime   string //访问时间
+	platform   int
+	number     int
+	contentnum int
+}
+
+func updateUser(visit *visitStruct, num, contmun, platform int, userid, createtime, datetime string) *visitStruct {
+	if visit == nil {
+		return &visitStruct{
+			userid:     userid,
+			createtime: createtime,
+			datetime:   datetime,
+			number:     num,
+			contentnum: contmun,
+			platform:   platform,
+		}
+	} else {
+		if num > 0 {
+			visit.number = visit.number + num
+		}
+		if contmun > 0 {
+			visit.contentnum = visit.contentnum + contmun
+		}
+		visit.platform = platform
+		visit.datetime = datetime
+	}
+	return visit
+}

+ 1 - 2
telemarketingEtl/main.go

@@ -3,13 +3,12 @@ package main
 import (
 	_ "telemarketingEtl/config"
 	"telemarketingEtl/timetask"
-	"time"
 )
 
 func main() {
 
 	//定时任务开始
 	timetask.Run()
+	select {}
 
-	time.Sleep(999999 * time.Hour)
 }

+ 15 - 0
telemarketingEtl/timetask/task.go

@@ -26,6 +26,21 @@ deleteOpenSeaTime: 5
 #自动退出
 returnOpenSeaTime: 5
 */
+
+func today() int64 {
+	now := time.Now()
+	today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
+	return today.Unix()
+}
+
+func GetStartEnd() (start, end int64) {
+	now := time.Now()
+	today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
+	start = today.AddDate(0, 0, -1).Unix()
+	end = today.Unix()
+	return
+}
+
 func TaskEventInfo() {
 	log.Println("开始TaskEventInfo")
 	now := time.Now()