浏览代码

wip : etl

zhangxinlei1996 2 年之前
父节点
当前提交
b25106706b

+ 12 - 10
telemarketingEtl/config.yaml

@@ -20,11 +20,13 @@ mongo:
   ent:
     dbName: mixdata
     size: 50
-    address: 192.168.3.206:27001
+    address: 192.168.3.206:27002
+    collection: qyxy
+    collectionChange: qyxy_change
     userName: jyDevGroup
-    password: DevGroup
+    password: jy@DevGroup
   bidding:
-    address: 192.168.3.207:27001,192.168.3.206:27002
+    address: 192.168.3.206:27002
     size: 50
     dbName: qfw_data
     collection: bidding
@@ -34,18 +36,18 @@ mongo:
 logTables:
   - jy_logs
   - jyapp_logs
-th_u: 2
-index: 500
-poolSize: 2
+th_u: 4
+index: 1
+poolSize: 50
 regex:
   - "^/article/(content|bdprivate|mailprivate):pcArticle" #pc、wx三级页
   - "^/jyapp/article/(content|bdprivate|mailprivate):appArticle" #app三级页
   #- "^(/supsearch/index.html|/front/pcAjaxReq|/jyapi/jybx/core/.Type/searchList):search" #pc搜索
-  - "^/swordfish/page_big_pc/unit_portrayal/:pcBuyPortrait" #pc采购单位话想
-  - "^/jyapp/big/page/unit_portrayal/:appBuyPortrait" #app采购单位画像
-  - "^/jyapp/big/page/ent_portrait/:appEntPortrait" #app企业画像
+  - "^/swordfish/page_big_pc/unit_portrayal:pcBuyPortrait" #pc采购单位话想
+  - "^/jyapp/big/page/unit_portrayal:appBuyPortrait" #app采购单位画像
+  - "^/jyapp/big/page/ent_portrait:appEntPortrait" #app企业画像
   - "^(/swordfish/page_big_pc/free/project_progress)|(/page_big_pc/pro_follow_detail):pcProjectMonitor" #pc项目进度监控
-  - "^(/swordfish/page_big_pc/free/ent_follow)|(/swordfish/page_big_pc/ent_portrait):pcEntIntelligentceMonitor" #pc企业情报监控
+  - "^(/swordfish/page_big_pc/free/ent_follow)|(/swordfish/page_big_pc/ent_portrait):pcEntIntelligentceMonitor" #pc企业情报监控/jyapp/big/page/ent_portrait
   - "^/swordfish/page_big_pc/my_client:pcClient"#pc-客户监控
   - "^/jyapp/big/page/client_list:appClient"#app-客户监控
   - "^/swordfish/page_big_pc/forecast_list:pcForecast"#pc-潜在项目预测

+ 1 - 1
telemarketingEtl/config/config.go

@@ -21,7 +21,7 @@ func init() {
 	g.Cfg().GetAdapter().(*gcfg.AdapterFile).SetFileName("config.yaml")
 	ctx := gctx.New()
 	//初始化
-	if g.Cfg().MustGet(ctx, "tidb.jianyuSubjectdb.dbname").String() != "" {
+	if g.Cfg().MustGet(ctx, "tidb.jianyuSubjectdb.dbName").String() != "" {
 		log.Println("初始化tidb")
 		JianyuSubjectdb = &mysql.Mysql{
 			Address:      g.Cfg().MustGet(ctx, "tidb.jianyuSubjectdb.address").String(),

+ 126 - 9
telemarketingEtl/entity/dwd_f_userbase_event_info.go

@@ -3,12 +3,15 @@ package entity
 import "C"
 import (
 	"app.yhyue.com/moapp/jybase/date"
+	"app.yhyue.com/moapp/jybase/encrypt"
+	"app.yhyue.com/moapp/jybase/mongodb"
 	"context"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gtime"
 	"github.com/gogf/gf/v2/util/gconv"
 	"go.mongodb.org/mongo-driver/mongo/options"
 	"log"
+	"net/url"
 	"regexp"
 	"strings"
 	"sync"
@@ -36,6 +39,11 @@ var (
 	PcEntIntelligentceMonitorReg, PcClientReg, AppClientReg,
 	PcForecastReg, AppForecastReg, PcCollectionReg, AppCollectionReg,
 	WxCollectionReg, ProductIndexReg *regexp.Regexp
+	ArticleId             = regexp.MustCompile(".*article/content/(.*)\\.html")
+	PcBuyPortraitNameReg  = regexp.MustCompile(".*swordfish/page_big_pc/unit_portrayal/(.*)")
+	AppBuyPortraitNameReg = regexp.MustCompile(".*jyapp/big/page/unit_portrayal\\?entName=(.*)")
+	PcEntPortraitNameReg  = regexp.MustCompile(".*/swordfish/page_big_pc/ent_portrait/(.*)")
+	AppEntPortraitNameReg = regexp.MustCompile(".*/jyapp/big/page/ent_portrait\\?eId=(.*)")
 )
 
 func init() {
@@ -55,8 +63,8 @@ func loading() {
 				PcArticleReg = req
 			case "appArticle": //app三级页
 				AppArticleReg = req
-			case "pcBuyPortrait": //pc采购单位话想
-				PcArticleReg = req
+			case "pcBuyPortrait": //pc采购单位画像
+				pcBuyPortraitReg = req
 			case "appBuyPortrait": //app采购单位画像
 				AppBuyPortraitReg = req
 			case "appEntPortrait": //app企业画像
@@ -86,6 +94,20 @@ func loading() {
 	}
 }
 
+func filter(url string) bool {
+	regexArr := g.Cfg().MustGet(ctx, "regex", "").Strings()
+	for _, v := range regexArr {
+		av := strings.Split(v, ":")
+		if len(av) == 2 {
+			req := regexp.MustCompile(av[0])
+			if req.MatchString(url) {
+				return true
+			}
+		}
+	}
+	return false
+}
+
 // 查看事件
 func EventInfoAdd(start, end int64) {
 	//
@@ -98,9 +120,10 @@ func EventInfoAdd(start, end int64) {
 
 	regWx, _ := regexp.Compile("MicroMessenger")
 
-	tables := g.Cfg().MustGet(ctx, "log_tables").Strings()
+	tables := g.Cfg().MustGet(ctx, "logTables").Strings()
 	index := g.Cfg().MustGet(ctx, "index").Int()
 	log.Println("analy tables:", tables)
+
 	th_mgo := util.NewThreads(len(tables))
 	for _, table := range tables {
 		th_mgo.Open()
@@ -118,6 +141,14 @@ func EventInfoAdd(start, end int64) {
 					if er != nil {
 						continue
 					}
+					userid := gconv.String(v["userid"])
+					if userid == "" {
+						continue
+					}
+					url_ := gconv.String(v["url"])
+					if !filter(url_) {
+						continue
+					}
 					platform := APP
 					if table == "jy_logs" {
 						platform = PC
@@ -127,13 +158,59 @@ func EventInfoAdd(start, end int64) {
 							}
 						}
 					}
-					userid := gconv.String(v["userid"])
-					url_ := gconv.String(v["url"])
+					//职位id转换userid
+					if !mongodb.IsObjectIdHex(userid) {
+						userid = GetUserIdByPositionId(userid)
+					}
 					eventtype := ""
 					name := ""
+					//根据不同url地址获取 标讯的公告名称、采购单位名称、企业名称
+					if PcArticleReg.MatchString(url_) || AppArticleReg.MatchString(url_) { //三级页根据加密id获取公告id
+						eventtype = INFO
+						//截取id
+						ur := ArticleId.FindStringSubmatch(url_)
+						if len(ur) > 1 {
+							uarr := encrypt.BDecodeArticleId2ByCheck(ur[1], encrypt.SE, encrypt.SE2)
+							if len(uarr) > 0 {
+								biddingId := uarr[0]
+								//获取标讯标题
+								bdata, ok := config.MgoBid.FindById("bidding", biddingId, `{"title":1}`)
+								if ok && bdata != nil && len(*bdata) > 0 {
+									name = gconv.String((*bdata)["title"])
+								}
+							}
+						}
+					} else if pcBuyPortraitReg.MatchString(url_) {
+						eventtype = BUYPORTRAIT
+						na := PcBuyPortraitNameReg.FindStringSubmatch(url_)
+						if len(na) > 1 {
+							name = util.Unescape(na[1])
+						}
+					} else if AppBuyPortraitReg.MatchString(url_) {
+						eventtype = BUYPORTRAIT
+						na := AppBuyPortraitNameReg.FindStringSubmatch(url_)
+						if len(na) > 1 {
+							name = util.Unescape(na[1])
+						}
+					} else if PcEntIntelligentceMonitorReg.MatchString(url_) {
+						eventtype = ENTPORTRAIT
+						na := PcEntPortraitNameReg.FindStringSubmatch(url_)
+						if len(na) > 1 {
+							entId := util.DecodeId(na[1])
+							name = GetEntNameByEntId(entId)
+						}
+					} else if AppEntPortraitReg.MatchString(url_) {
+						eventtype = ENTPORTRAIT
+						//app端作为参数拼接过来的。和pc不一样
+						u, err := url.Parse(url_)
+						if err == nil {
+							eId := u.Query().Get("eId")
+							entId := util.DecodeId(eId)
+							name = GetEntNameByEntId(entId)
+						}
+					}
 					createtime := time.Unix(gconv.Int64(v["date"]), 0).Format(date.Date_Full_Layout)
 					values = append(values, userid, eventtype, name, url_, platform, createtime)
-
 					if n%index == 0 {
 						Pool <- true
 						Wait.Add(1)
@@ -143,9 +220,11 @@ func EventInfoAdd(start, end int64) {
 								Wait.Done()
 							}()
 							//插入推送记录表
-							id1, _ := config.JianyuSubjectdb.InsertBatch(DWD_F_USERBASE_EVENT_INFO, fields, values) //id1:数量  id2:开始id索引
-							if id1 <= 0 {
-								log.Println(values, "失败")
+							if len(values) > 0 {
+								id1, _ := config.JianyuSubjectdb.InsertBatch(DWD_F_USERBASE_EVENT_INFO, fields, values) //id1:数量  id2:开始id索引
+								if id1 <= 0 {
+									log.Println(values, "失败")
+								}
 							}
 						}(fields, values)
 						values = []interface{}{}
@@ -154,9 +233,47 @@ func EventInfoAdd(start, end int64) {
 						log.Println("current", table, n)
 					}
 				}
+				//
+				if len(values) > 0 {
+					id1, _ := config.JianyuSubjectdb.InsertBatch(DWD_F_USERBASE_EVENT_INFO, fields, values) //id1:数量  id2:开始id索引
+					if id1 <= 0 {
+						log.Println(values, "失败~")
+					}
+				}
 			} else {
 				log.Println(err)
 			}
 		}(table)
 	}
 }
+
+// 根据职位id获取mongodb userid
+func GetUserIdByPositionId(positionId string) (userId string) {
+	if positionId == "" || positionId == "0" {
+		return
+	}
+	i_positionId := gconv.Int(positionId)
+	if i_positionId == 0 {
+		return
+	}
+	data := config.JianyuSubjectdb.SelectBySql(`select userid from dwd_f_userbase_id_mapping where position_id =? limit 1`, i_positionId)
+	if data == nil || len(*data) <= 0 {
+		return
+	}
+	userId = gconv.String((*data)[0]["userid"])
+	return
+}
+
+// 根据企业id获取企业名称
+func GetEntNameByEntId(entId string) (name string) {
+	if entId == "" {
+		return
+	}
+	entinfo, _ := config.MgoEnt.FindOneByField("qyxy_std", map[string]interface{}{"_id": entId}, map[string]interface{}{
+		"company_name": 1, //公司名称
+	})
+	if entinfo == nil || len(*entinfo) == 0 {
+		return
+	}
+	return gconv.String((*entinfo)["company_name"])
+}

+ 294 - 0
telemarketingEtl/entity/dwd_f_userbase_search_info.go

@@ -0,0 +1,294 @@
+package entity
+
+import "C"
+import (
+	"app.yhyue.com/moapp/jybase/date"
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/util/gconv"
+	"log"
+	"strconv"
+	"strings"
+	"sync"
+	"telemarketingEtl/config"
+	"telemarketingEtl/util"
+	"time"
+)
+
+var (
+	sPool chan bool
+	sWait = &sync.WaitGroup{}
+)
+
+func init() {
+	poolSize := g.Cfg().MustGet(ctx, "poolSize").Int()
+	sPool = make(chan bool, poolSize)
+}
+
+// 加载地区代码表
+func AreaCode() map[string]string {
+	areaCodeMap := map[string]string{}
+	data := config.JianyuSubjectdb.SelectBySql(`SELECT code,name FROM d_area_code WHERE level =1`)
+	if data != nil && len(*data) > 0 {
+		for _, v := range *data {
+			areaCodeMap[gconv.String(v["name"])] = gconv.String(v["code"])
+		}
+	}
+	return areaCodeMap
+}
+
+// 加载信息类型代码表
+func SubTypeCode() map[string]string {
+	subTypeCodeMap := map[string]string{}
+	data := config.JianyuSubjectdb.SelectBySql(`SELECT code,pcode,level,name FROM d_topsubtype_code`)
+	if data != nil && len(*data) > 0 {
+		for _, v := range *data {
+			name := gconv.String(v["name"])
+			level := gconv.Int(v["level"])
+			code := gconv.String(v["code"])
+			//拟建重名过滤
+			if name == "拟建" && level == 2 {
+				continue
+			}
+			//统一名称
+			if level == 1 {
+				if name == "预告" {
+					name = "招标预告"
+				} else if name == "招标" {
+					name = "招标公告"
+				} else if name == "结果" {
+					name = "招标结果"
+				} else if name == "其它" {
+					name = "招标信用信息"
+				}
+			}
+			subTypeCodeMap[name] = code
+		}
+	}
+	return subTypeCodeMap
+}
+
+// 加载采购单位代码表
+func BuyerClassCode() map[string]string {
+	buyerClassCodeMap := map[string]string{}
+	data := config.JianyuSubjectdb.SelectBySql(`SELECT code,name FROM d_buyerclass_code`)
+	if data != nil && len(*data) > 0 {
+		for _, v := range *data {
+			buyerClassCodeMap[gconv.String(v["name"])] = gconv.String(v["code"])
+		}
+	}
+	return buyerClassCodeMap
+}
+
+// 历史数据刷库
+func SearchInfoAdd(start, end int64) {
+	s_id := util.GetObjectId(start)
+	e_id := util.GetObjectId(end)
+	query := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gte": s_id,
+			"$lt":  e_id,
+		},
+	}
+	//加载地区代码表
+	areaCodeMap := AreaCode()
+	//加载信息类型代码表
+	subTypeCodeMap := SubTypeCode()
+	//加载采购单位类型代码表
+	buyerClassCodeMap := BuyerClassCode()
+	//
+	session := config.MgoLog.GetMgoConn()
+	iter := session.DB("qfw").C("jy_search_log").Find(query).Iter()
+	count := 0
+	for thisData := map[string]interface{}{}; iter.Next(&thisData); {
+		sPool <- true
+		sWait.Add(1)
+		go func(thisData map[string]interface{}) {
+			defer func() {
+				<-sPool
+				sWait.Done()
+			}()
+			userid := gconv.String(thisData["s_userid"])
+			if userid == "" {
+				return
+			}
+			if !mongodb.IsObjectIdHex(userid) {
+				userid = GetUserIdByPositionId(userid)
+			}
+			if userid == "" {
+				return
+			}
+			//搜索词
+			search_word := gconv.String(thisData["search_word"])
+			//排除词
+			exclude_word := gconv.String(thisData["exclusionWords"])
+			//地区
+			search_area := gconv.String(thisData["search_area"])
+			area := ""
+			if search_area != "" {
+				arr := strings.Split(search_area, ",")
+				for k, v := range arr {
+					area += areaCodeMap[v]
+					if k != len(arr)-1 {
+						area += ","
+					}
+				}
+			}
+
+			searchMode := gconv.String(thisData["searchMode"]) //搜索日志库里存的 搜索模式:0:精准搜索;1:模糊搜索
+			search_mode := 2
+			if searchMode == "精准搜索" {
+				search_mode = 1 //tidb的搜索信息表存 1-精准查询,2-模糊查询
+			}
+			//行业
+			search_industry := gconv.String(thisData["search_industry"])
+			//价格
+			max_price := int64(0)
+			min_price := int64(0)
+			searchPriceStr, ok := thisData["search_price"].(string)
+			if !ok {
+				min_price, max_price = SearchPriceOld(gconv.Strings(thisData["search_price"]))
+			} else {
+				min_price, max_price = SearchPriceNew(searchPriceStr)
+			}
+			//发布时间开始时间
+			search_publishtime_start := ""
+			//发布时间结束时间
+			search_publishtime_end := ""
+			search_publishtime := gconv.String(thisData["search_publishtime"])
+			createtime := gconv.Int64(thisData["createtime"])
+			search_time := time.Unix(createtime, 0).Format(date.Date_Full_Layout)
+			search_publishtime_start, search_publishtime_end = PublishTime(createtime, search_publishtime)
+			//信息类型
+			search_type := gconv.String(thisData["search_subType"])
+			subType := ""
+			if search_type == "" {
+				arr := strings.Split(search_type, ",")
+				for k, v := range arr {
+					subType += subTypeCodeMap[v]
+					if k != len(arr)-1 {
+						subType += ","
+					}
+				}
+			}
+			//搜索范围 todo 是否附件 标题全文 都得看他 //1-标题,2-全文
+			/*
+				免费 标题(title)  正文(content) 老用户【中标企业(winner)】
+				付费用户 全部(all)、标题(title)  正文(content)  会员: 采购单位(buyer) 中标企业(winner) 招标代理机构(agency) 附件(file)
+				项目名称projectname和标的物purchasing(ppa)
+			*/
+			search_selectType := gconv.String(thisData["search_selectType"])
+			//有无附件
+			filetext := 0
+			matchtype := 1 //1-标题,2-全文,3-全部
+			if strings.Contains(search_selectType, "filetext") {
+				filetext = 1 //附件
+			}
+			//全文
+			if !strings.Contains(search_selectType, "title") && (strings.Contains(search_selectType, "content") || strings.Contains(search_selectType, "detail")) {
+				matchtype = 2
+			}
+			//标题+全文
+			if strings.Contains(search_selectType, "title") && (strings.Contains(search_selectType, "content") || strings.Contains(search_selectType, "detail")) {
+				matchtype = 3
+			}
+
+			//采购单位类型
+			search_buyerclass := gconv.String(thisData["search_buyerClass"])
+			buyerclass := ""
+			if search_buyerclass == "" {
+				arr := strings.Split(search_buyerclass, ",")
+				for k, v := range arr {
+					buyerclass += buyerClassCodeMap[v]
+					if k != len(arr)-1 {
+						buyerclass += ","
+					}
+				}
+			}
+			//平台
+			platform := gconv.String(thisData["platform"])
+			log.Println(createtime)
+			//存库
+			sql := `INSERT INTO dwd_f_userbase_search_info 
+												(userid,search_word, exclude_word, search_area,search_model,matchtype,
+												search_industry,max_price, min_price,
+												search_publishtime_start,search_publishtime_end,search_type,
+												filetext,search_buyerclass, platform,search_time)
+												VALUES 
+												(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`
+			id := config.JianyuSubjectdb.InsertBySql(sql, userid, search_word, exclude_word, area, search_mode, matchtype,
+				search_industry, max_price, min_price, search_publishtime_start, search_publishtime_end, subType, filetext, buyerclass, platform, search_time)
+			if id <= 0 {
+				log.Println("插入失败:", userid, search_word, exclude_word, area, search_mode, matchtype,
+					search_industry, max_price, min_price, search_publishtime_start, search_publishtime_end, subType, filetext, buyerclass, platform, search_time)
+			}
+		}(thisData)
+		count++
+		if count%100 == 0 {
+			log.Printf("已完成%d条数据\n", count)
+		}
+		thisData = map[string]interface{}{}
+	}
+	sWait.Wait()
+
+	log.Println("end")
+}
+
+// 价格区间计算处理 新
+func SearchPriceNew(search_price string) (min, max int64) {
+	if search_price == "" || search_price == "-" {
+		return
+	}
+	result := strings.Split(search_price, "-")
+	if len(result) > 1 {
+		return gconv.Int64(result[0]), gconv.Int64(result[1])
+	}
+	return
+}
+
+// 价格区间计算处理 老
+func SearchPriceOld(search_price []string) (min, max int64) {
+	if len(search_price) == 0 {
+		return
+	}
+	result := gconv.Strings(search_price)
+	if len(result) > 1 {
+		return gconv.Int64(result[0]), gconv.Int64(result[1])
+	}
+
+	return
+}
+
+// 发布时间
+// 根据入库时间反推 开始结束时间
+func PublishTime(createtime int64, publishTime string) (startTime, endTime string) {
+	if publishTime == "" {
+		return "", ""
+	}
+	now := time.Unix(createtime, 0)
+	endTime = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Format(date.Date_Full_Layout)
+	if publishTime == "lately-7" { //最近7天
+		startTime = time.Date(now.Year(), now.Month(), now.Day()-7, 0, 0, 0, 0, time.Local).Format(date.Date_Full_Layout)
+	} else if publishTime == "lately-30" { //最近30天
+		startTime = time.Date(now.Year(), now.Month(), now.Day()-30, 0, 0, 0, 0, time.Local).Format(date.Date_Full_Layout)
+	} else if publishTime == "thisyear" { //最近一年
+		startTime = time.Date(now.Year()-1, now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second(), 0, time.Local).Format(date.Date_Full_Layout)
+	} else if publishTime == "threeyear" { //最近三年
+		startTime = time.Date(now.Year()-3, now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second(), 0, time.Local).Format(date.Date_Full_Layout)
+	} else if publishTime == "fiveyear" { //最近五年
+		startTime = time.Date(now.Year()-5, now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second(), 0, time.Local).Format(date.Date_Full_Layout)
+	} else if len(strings.Split(publishTime, "_")) > 1 {
+		startTimeStr := strings.Split(publishTime, "_")[0]
+		endTimeStr := strings.Split(publishTime, "_")[1]
+		if endTimeStr != "" {
+			et, _ := strconv.ParseInt(endTimeStr, 0, 64)
+			etTime := time.Unix(et, 0)
+			endTime = time.Date(etTime.Year(), etTime.Month(), etTime.Day()+1, 0, 0, 0, 0, time.Local).Format(date.Date_Full_Layout)
+		}
+		if startTimeStr != "" {
+			st, _ := strconv.ParseInt(startTimeStr, 0, 64)
+			startTime = time.Unix(st, 0).Format(date.Date_Full_Layout)
+		}
+	}
+	return
+}

+ 123 - 0
telemarketingEtl/entity/dwd_f_userbase_visit_info.go

@@ -0,0 +1,123 @@
+package entity
+
+import "C"
+import (
+	"app.yhyue.com/moapp/jybase/date"
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/os/gtime"
+	"github.com/gogf/gf/v2/util/gconv"
+	"log"
+	"regexp"
+	"sync"
+	"telemarketingEtl/config"
+	"telemarketingEtl/util"
+	"time"
+)
+
+var USERPOOL = 20
+var locks = make([]*sync.Mutex, USERPOOL)
+
+func lock(userid string) *sync.Mutex {
+	n := 0
+	for _, v := range userid {
+		n += int(v)
+	}
+	return locks[n%USERPOOL]
+}
+
+func init() {
+	for i := 0; i < USERPOOL; i++ {
+		locks[i] = &sync.Mutex{}
+	}
+}
+
+// 查看事件
+func VisitInfoAdd(start, end int64) {
+	s_id := util.GetObjectId(start)
+	e_id := util.GetObjectId(end)
+	query := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gte": s_id,
+			"$lt":  e_id,
+		},
+	}
+	RegWx, _ := regexp.Compile("MicroMessenger")
+
+	log.Println("task run", start, end, gtime.NewFromTimeStamp(start))
+	//regWx, _ := regexp.Compile("MicroMessenger")
+	tables := g.Cfg().MustGet(ctx, "logTables").Strings()
+	log.Println("analy tables:", tables)
+	for _, table := range tables {
+		go func(table string) {
+			session := config.MgoLog.GetMgoConn()
+			iter := session.DB("qfw").C(table).Find(query).Iter()
+			count := 0
+			for thisData := map[string]interface{}{}; iter.Next(&thisData); {
+				sPool <- true
+				sWait.Add(1)
+				go func(thisData map[string]interface{}) {
+					defer func() {
+						<-sPool
+						sWait.Done()
+					}()
+					userid := gconv.String(thisData["userid"])
+					if userid == "" {
+						return
+					}
+					if !mongodb.IsObjectIdHex(userid) {
+						userid = GetUserIdByPositionId(userid)
+					}
+					if userid == "" {
+						return
+					}
+					url_ := gconv.String(thisData["url_"])
+					reg := regexp.MustCompile("/article/(content|bdprivate|mailprivate)") //todo 正则
+					contentnum := 0
+					if reg.MatchString(url_) {
+						contentnum = 1
+					}
+					createtime := gconv.Int64(thisData["date"])
+					starttime, endtime := getToday(createtime)
+					craetetimeStr := time.Unix(createtime, 0).Format(date.Date_Full_Layout)
+					platform := APP
+					if table == "jy_logs" {
+						platform = PC
+						if client := gconv.String(thisData["client"]); client != "" {
+							if RegWx.MatchString(client) {
+								platform = WX
+							}
+						}
+					}
+					lock(userid).Lock()
+					defer lock(userid).Unlock()
+					if config.JianyuSubjectdb.CountBySql(`select count(1) from dwd_f_userbase_visit_info where userid = ? and createtime>= ? and createtime <?`, userid, starttime, endtime) > 0 {
+						if contentnum == 0 {
+							config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number = number+1,date =? where userid = ? and createtime>= ? and createtime <?`, craetetimeStr, userid, starttime, endtime)
+						} else {
+							config.JianyuSubjectdb.UpdateOrDeleteBySql(`update dwd_f_userbase_visit_info set number = number+1,contentnum = contentnum + 1 where userid = ? and createtime>= ? and createtime <?`, craetetimeStr, userid, starttime, endtime)
+						}
+					} else {
+						config.JianyuSubjectdb.InsertBySql(`INSERT INTO dwd_f_userbase_visit_info
+														(userid,DATE, number, platform,createtime,contentnum)
+														VALUES (?,?,?,?,?,?)`, userid, craetetimeStr, 1, platform, craetetimeStr, contentnum)
+					}
+					//
+				}(thisData)
+				count++
+				if count%100 == 0 {
+					log.Printf("%s已完成%d条数据\n", table, count)
+				}
+				thisData = map[string]interface{}{}
+			}
+			sWait.Wait()
+		}(table)
+	}
+}
+
+func getToday(createtime int64) (start, end string) {
+	now := time.Unix(createtime, 0)
+	startOfDay := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
+	endOfDay := time.Date(now.Year(), now.Month(), now.Day(), 23, 59, 59, 0, now.Location())
+	return startOfDay.Format(date.Date_Full_Layout), endOfDay.Format(date.Date_Full_Layout)
+}

+ 5 - 0
telemarketingEtl/entity/entity.go

@@ -7,4 +7,9 @@ const (
 	WX
 	//
 	DWD_F_USERBASE_EVENT_INFO = "dwd_f_userbase_event_info"
+	DWD_F_USERBASE_VISIT_INFO = "dwd_f_userbase_visit_info"
+
+	BUYPORTRAIT = "采购单位画像"
+	INFO        = "标讯"
+	ENTPORTRAIT = "企业画像"
 )

+ 2 - 0
telemarketingEtl/go.mod

@@ -7,11 +7,13 @@ require (
 	github.com/gogf/gf/contrib/drivers/mysql/v2 v2.3.3
 	github.com/gogf/gf/v2 v2.3.3
 	go.mongodb.org/mongo-driver v1.9.1
+	gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
 )
 
 require (
 	github.com/BurntSushi/toml v1.1.0 // indirect
 	github.com/clbanning/mxj/v2 v2.5.5 // indirect
+	github.com/dchest/captcha v0.0.0-20200903113550-03f5f0333e1f // indirect
 	github.com/fatih/color v1.13.0 // indirect
 	github.com/fsnotify/fsnotify v1.5.4 // indirect
 	github.com/go-logr/logr v1.2.3 // indirect

+ 4 - 0
telemarketingEtl/go.sum

@@ -95,6 +95,7 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dchest/captcha v0.0.0-20200903113550-03f5f0333e1f h1:q/DpyjJjZs94bziQ7YkBmIlpqbVP7yw179rnzoNVX1M=
 github.com/dchest/captcha v0.0.0-20200903113550-03f5f0333e1f/go.mod h1:QGrK8vMWWHQYQ3QU9bw9Y9OPNfxccGzfb41qjvVeXtY=
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
@@ -822,6 +823,8 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy
 gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df/go.mod h1:LRQQ+SO6ZHR7tOkpBDuZnXENFzX8qRjMDMyPD6BRkCw=
 gopkg.in/h2non/gock.v1 v1.1.2/go.mod h1:n7UGz/ckNChHiK05rDoiC4MYSunEC/lyaUm2WWaDva0=
 gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
@@ -830,6 +833,7 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
 gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 6 - 3
telemarketingEtl/main.go

@@ -1,11 +1,14 @@
 package main
 
 import (
-	"log"
 	_ "telemarketingEtl/config"
-	_ "telemarketingEtl/entity"
+	"telemarketingEtl/timetask"
+	"time"
 )
 
 func main() {
-	log.Println("hello !")
+
+	timetask.TimeTask()
+
+	time.Sleep(999999 * time.Hour)
 }

+ 51 - 0
telemarketingEtl/timetask/task.go

@@ -0,0 +1,51 @@
+package timetask
+
+import (
+	"context"
+	_ "github.com/gogf/gf/contrib/drivers/mysql/v2"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/os/gcron"
+	"github.com/gogf/gf/v2/os/gctx"
+	"telemarketingEtl/entity"
+	"time"
+)
+
+func TimeTask() {
+	//TaskEventInfo()
+	//TaskSearchInfo()
+	TaskVisitInfo()
+	return
+	var (
+		err error
+		ctx = gctx.New()
+	)
+	_, err = gcron.Add(ctx, "1 0 0 * * *", func(ctx context.Context) {
+		g.Log().Print(ctx, "Every day")
+		//Task_clearUserLog()
+	}, "clearUserTask")
+	if err != nil {
+		panic(err)
+	}
+
+}
+
+func TaskEventInfo() {
+	now := time.Now()
+	end := now.Unix()
+	//start := now.Add(-time.Hour * 24).Unix()
+	entity.EventInfoAdd(1677657397, end)
+}
+
+func TaskSearchInfo() {
+	now := time.Now()
+	end := now.Unix()
+	//start := now.Add(-time.Hour * 24).Unix()
+	entity.SearchInfoAdd(1677657397, end)
+}
+
+func TaskVisitInfo() {
+	now := time.Now()
+	end := now.Unix()
+	//start := now.Add(-time.Hour * 24).Unix()
+	entity.VisitInfoAdd(1677657397, end)
+}

+ 0 - 24
telemarketingEtl/util/task.go

@@ -1,24 +0,0 @@
-package util
-
-import (
-	"context"
-	_ "github.com/gogf/gf/contrib/drivers/mysql/v2"
-	"github.com/gogf/gf/v2/frame/g"
-	"github.com/gogf/gf/v2/os/gcron"
-	"github.com/gogf/gf/v2/os/gctx"
-)
-
-func TimeTask() {
-	var (
-		err error
-		ctx = gctx.New()
-	)
-	_, err = gcron.Add(ctx, "1 0 0 * * *", func(ctx context.Context) {
-		g.Log().Print(ctx, "Every day")
-		//Task_clearUserLog()
-	}, "clearUserTask")
-	if err != nil {
-		panic(err)
-	}
-
-}

+ 38 - 0
telemarketingEtl/util/util.go

@@ -1,11 +1,49 @@
 package util
 
 import (
+	"app.yhyue.com/moapp/jybase/encrypt"
+	"app.yhyue.com/moapp/jybase/mongodb"
 	"fmt"
 	"go.mongodb.org/mongo-driver/bson/primitive"
+	"gopkg.in/mgo.v2/bson"
+	"net/url"
+	"strings"
+	"time"
 )
 
 func MongoId(t int64) primitive.ObjectID {
 	id, _ := primitive.ObjectIDFromHex(fmt.Sprintf("%x0000000000000000", t))
 	return id
 }
+
+// 递归解密
+func Unescape(s string) string {
+	t, err := url.QueryUnescape(s)
+	if err != nil {
+		return s
+	}
+	if t == s {
+		return s
+	}
+	return Unescape(t)
+}
+
+func DecodeId(eid string) string {
+	if eid == "" {
+		return ""
+	}
+	return encrypt.DecodeArticleId2ByCheck(eid)[0]
+}
+
+func SpitObjectId(s string) string {
+	sArr := strings.Split(s, `("`)
+	s = strings.Split(sArr[1], `")`)[0]
+	return s
+}
+
+// return ObjectID("6424fefc0000000000000000")
+func GetObjectId(timestamp int64) primitive.ObjectID {
+	t := time.Unix(timestamp, 0)
+	tid := bson.NewObjectIdWithTime(t)
+	return mongodb.StringTOBsonId(SpitObjectId(tid.String()))
+}