package entity import ( util "app.yhyue.com/moapp/jybase/common" "app.yhyue.com/moapp/jybase/encrypt" . "app.yhyue.com/moapp/jybase/mongodb" "context" "fmt" "github.com/gogf/gf/v2/util/gconv" "github.com/zeromicro/go-zero/core/logx" "go.mongodb.org/mongo-driver/bson/primitive" "regexp" "sort" "strconv" "strings" "time" . "userBehaviorTask/config" ) type Task struct { } // 搜索结果汇总 var SearchInfo = make(map[string][]map[string]interface{}) // 三级页信息汇总 var ContentInfo = make(map[string][]map[string]interface{}) // 职位信息获取 var positionUser = map[string]string{} // 三级页详情获取 var biddingInfo map[string]map[string]interface{} // 数据库中数据初始化 var subInItInfo map[string]map[string]string // 最终整合数据处理 var subInfo = make(map[string]map[string]string) var ( ArticleId = regexp.MustCompile(".*article/content/(.*)\\.html") NologinId = regexp.MustCompile(".*nologin/content/(.*)\\.html") ) func (t *Task) Run() { go util.SimpleCrontab(false, DbConf.StatisticTime, func() { now := time.Now() start := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) end := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local) t.ShuaKu(start.Unix(), end.Unix()) }) } func (t *Task) ShuaKu(start, end int64) { //职位标识与雇员标识处理 UserHandle() //数据库数据初始化 subRecommend() SearchInfo = make(map[string][]map[string]interface{}) ContentInfo = make(map[string][]map[string]interface{}) subInfo = make(map[string]map[string]string) searchHandle(start, end) biddingInfo = map[string]map[string]interface{}{} ContentHandle(start, end) //批量处理入库 for userId, v := range SearchInfo { subInfo[userId] = map[string]string{ "searchfor": gconv.String(v), } } for userId, v := range ContentInfo { //排序 sort.Slice(v, func(i, j int) bool { return v[i]["datetimeInt"].(int64) > v[j]["datetimeInt"].(int64) }) if len(v) > DbConf.InfoCount { ContentInfo[userId] = v[:DbConf.InfoCount-1] } if _, ok := subInfo[userId]; ok { subInfo[userId]["browse"] = gconv.String(v) } else { subInfo[userId] = map[string]string{ "browse": gconv.String(v), } } } updateTime := time.Now().Format("2006-01-02 15:04:05") for userId, v := range subInfo { searchfor := "" browse := "" if _, ok := subInItInfo[userId]; ok { //汇总数据存在 data := subInItInfo[userId] if v["searchfor"] == "" && data["searchfor"] != "" { searchfor = data["searchfor"] } else { searchfor = v["searchfor"] } if v["browse"] == "" && data["browse"] != "" { browse = data["browse"] } else { browse = v["browse"] } //修改操作 sql := fmt.Sprintf(`alter table sub_recommend_rule UPDATE browse ='%s' ,searchfor='%s',update_time='%s' where userid = '%s'`, browse, searchfor, updateTime, userId) err := ClickhouseConn.Exec(context.Background(), sql) if err != nil { logx.Error(err) } } else { //需要新增汇总数据 searchfor := v["searchfor"] browse := v["browse"] sql := fmt.Sprintf("INSERT INTO sub_recommend_rule (userid, searchfor, browse, update_time) values ('%s','%s','%s','%s')", userId, searchfor, browse, updateTime) err := ClickhouseConn.Exec(context.Background(), sql) if err != nil { logx.Error(err) } } } SearchInfo = make(map[string][]map[string]interface{}) ContentInfo = make(map[string][]map[string]interface{}) subInfo = make(map[string]map[string]string) biddingInfo = map[string]map[string]interface{}{} positionUser = map[string]string{} subInItInfo = map[string]map[string]string{} } func UserHandle() { positionUser = map[string]string{} sqlStr := "SELECT a.id, b.phone, a.ent_id FROM base_position a INNER JOIN base_user b ON a.type = 1 AND a.user_id = b.id" BaseServiceMysql.SelectByBath(10, func(l *[]map[string]interface{}) bool { for _, value := range *l { positionId := gconv.String(value["id"]) phone := gconv.String(value["phone"]) ent_id := gconv.Int64(value["ent_id"]) entUser := JianYuMysql.FindOne("entniche_user", map[string]interface{}{ "ent_id": ent_id, "phone": phone, }, "id", "") if entUser == nil { continue } entUserId := gconv.String((*entUser)["id"]) positionUser[positionId] = entUserId } return true }, sqlStr) } // 搜索条件查询 func searchHandle(start, end int64) { /*startTime := primitive.NewObjectIDFromTimestamp(time.Unix(start, 0)) endTime := primitive.NewObjectIDFromTimestamp(time.Unix(end, 0))*/ startTime := strconv.FormatInt(start, 16) + "0000000000000000" endTime := strconv.FormatInt(end, 16) + "0000000000000000" startTimeId, _ := primitive.ObjectIDFromHex(startTime) endTimeId, _ := primitive.ObjectIDFromHex(endTime) logx.Info("搜索条件start。。。", start, end, startTime, endTime) sess := MgoLog.GetMgoConn() defer MgoLog.DestoryMongoConn(sess) it := sess.DB("qfw").C("jy_search_log").Find(map[string]interface{}{ "_id": map[string]interface{}{ "$gte": startTimeId, "$lt": endTimeId, }, }).Sort("createtime").Select(map[string]interface{}{}).Iter() var numb int64 for m := make(map[string]interface{}); it.Next(&m); { numb++ if numb%1000 == 0 { logx.Info("搜索条件跑了", numb) } userId := gconv.String(m["s_userid"]) if !IsObjectIdHex(userId) { //职位标识替换为企业用户表示 if positionUser[userId] == "" { continue } userId = positionUser[userId] } if SearchInfo[userId] != nil { if len(SearchInfo[userId]) >= DbConf.SearchCount { continue } } wordsMode := gconv.String(m["wordsMode"]) key := []string{} key1 := gconv.Strings(util.If(gconv.String(m["search_word"]) == "", []string{}, strings.Split(gconv.String(m["search_word"]), " "))) key2 := gconv.Strings(util.If(gconv.String(m["additionalWords"]) == "", []string{}, strings.Split(gconv.String(m["additionalWords"]), ","))) if wordsMode == "包含所有" { key1 = append(key1, key2...) if len(key1) > 0 { key = append(key, strings.Join(key1, "+")) } } else { //任意一个 if len(key1) > 0 { key = append(key, strings.Join(key1, "+")) } if len(key2) > 0 { key = append(key, key2...) } } if userId == "6291ca5e31e4ba4956a74a25" { logx.Info(111) } if len(key) == 0 { continue } searchMap := map[string]interface{}{ "winnerTel": util.If(gconv.String(m["search_winnerTel"]) == "y", 1, 0), //0:不限 1:有中标单位联系方式 "selectType": m["search_selectType"], //搜索范围 "fileExists": gconv.Int64(m["fileExists"]), //0:不限 1:有附件 -1:无附件 "notkeys": strings.Split(gconv.String(m["exclusionWords"]), " "), //排除词 "area": strings.Split(gconv.String(m["search_area"]), ","), "city": strings.Split(gconv.String(m["search_city"]), ","), "keys": key, //关键词 "buyerClass": strings.Split(gconv.String(m["search_buyerClass"]), ","), //采购单位行业 "buyerTel": util.If(gconv.String(m["search_buyerTel"]) == "y", 1, 0), //0:不限 1:有采购单位联系方式 "price": m["search_price"], //金额范围,可能没有开始金额,也可能没有结束金额 "topType": strings.Split(gconv.String(m["search_topType"]), ","), //一级信息类型 "subType": strings.Split(gconv.String(m["search_subType"]), ","), //二级信息类型 "industry": strings.Split(gconv.String(m["search_industry"]), ","), //行业分类 "datetime": time.Unix(gconv.Int64(m["createtime"]), 0).Format("2006-01-02 15:04:05"), //搜索时间 } if _, ok := SearchInfo[userId]; ok { SearchInfo[userId] = append(SearchInfo[userId], searchMap) } else { SearchInfo[userId] = []map[string]interface{}{ searchMap, } } } } // 三级页浏览记录查询 func ContentHandle(start, end int64) { var AppContentInfo = make(map[string][]map[string]interface{}) var numb int64 startTime := strconv.FormatInt(start, 16) + "0000000000000000" endTime := strconv.FormatInt(end, 16) + "0000000000000000" startTimeId, _ := primitive.ObjectIDFromHex(startTime) endTimeId, _ := primitive.ObjectIDFromHex(endTime) logx.Info("三级页浏览数据start。。。", start, end, startTime, endTime) sess := MgoLog.GetMgoConn() defer MgoLog.DestoryMongoConn(sess) //jylog it := sess.DB("qfw").C("jy_logs").Find(map[string]interface{}{ "_id": map[string]interface{}{ "$gte": startTimeId, "$lt": endTimeId, }, }).Sort("_id").Select(map[string]interface{}{ "date": 1, "url": 1, "userid": 1, }).Iter() for m := make(map[string]interface{}); it.Next(&m); { numb++ if numb%1000 == 0 { logx.Info("pc三级页跑了", numb) } InformationHandle(m, ContentInfo) } //jyapp numb = 0 sess1 := MgoLog.GetMgoConn() defer MgoLog.DestoryMongoConn(sess1) it1 := sess1.DB("qfw").C("jyapp_logs").Find(map[string]interface{}{ "_id": map[string]interface{}{ "$gte": startTimeId, "$lt": endTimeId, }, }).Sort("_id").Select(map[string]interface{}{ "date": 1, "url": 1, "userid": 1, }).Iter() for m := make(map[string]interface{}); it1.Next(&m); { numb++ if numb%1000 == 0 { logx.Info("app三级页记录跑了", numb) } InformationHandle(m, AppContentInfo) } //两个map合成一个处理 for userId, v := range AppContentInfo { if ContentInfo[userId] == nil { ContentInfo[userId] = v } else { ContentInfo[userId] = append(ContentInfo[userId], v...) } } } // 资讯数据处理 func InformationHandle(m map[string]interface{}, data map[string][]map[string]interface{}) { userId := gconv.String(m["userid"]) if !IsObjectIdHex(userId) { //职位标识替换为企业用户表示 if positionUser[userId] == "" { return } userId = positionUser[userId] } if data[userId] != nil { if len(data[userId]) >= DbConf.InfoCount { return } } fu1 := ArticleId.FindStringSubmatch(gconv.String(m["url"])) fu2 := NologinId.FindStringSubmatch(gconv.String(m["url"])) infoId := "" if len(fu1) > 1 { if len(fu1[1]) > 10 { infoId = encrypt.DecodeArticleId2ByCheck(fu1[1])[0] } } else if len(fu2) > 0 { if len(fu2[1]) > 10 { infoId = encrypt.DecodeArticleId2ByCheck(fu2[1])[0] } } if infoId == "" { return } //分类标签,bidding表gov_classify.root,过滤掉只有一级的,取最后一级 rootStr := "" area := "" city := "" district := "" projectname := "" if _, ok := biddingInfo[infoId]; !ok { bidding, _ := MgoBidding.FindById("bidding", infoId, `{"gov_classify":1,"area":1,"projectname":1,"city":1,"district":1}`) biddingInfo[infoId] = map[string]interface{}{} if bidding == nil { return } gov_classify := gconv.Map((*bidding)["gov_classify"]) area = gconv.String((*bidding)["area"]) projectname = gconv.String((*bidding)["projectname"]) district = gconv.String((*bidding)["district"]) city = gconv.String((*bidding)["city"]) if gov_classify != nil { root := strings.Split(gconv.String(gov_classify["root"]), "/") if len(root) == 0 { return } else { for i := len(root) - 1; i >= 1; i-- { if root[i] != "" { rootStr = root[i] biddingInfo[infoId] = map[string]interface{}{ "area": area, "city": city, "projectname": projectname, "district": district, "rootStr": rootStr, } break } } } } } else { rootStr = gconv.String(biddingInfo[infoId]["rootStr"]) area = gconv.String(biddingInfo[infoId]["area"]) city = gconv.String(biddingInfo[infoId]["city"]) district = gconv.String(biddingInfo[infoId]["district"]) projectname = gconv.String(biddingInfo[infoId]["projectname"]) } if rootStr == "" { return } searchMap := map[string]interface{}{ "infoid": infoId, //信息id "classify": rootStr, //分类标签,bidding表gov_classify.root,过滤掉只有一级的,取最后一级 "datetimeInt": gconv.Int64(m["date"]), //访问时间 "datetime": time.Unix(gconv.Int64(m["date"]), 0).Format("2006-01-02 15:04:05"), //搜索时间 "area": area, "city": city, "projectname": projectname, "district": district, } if _, ok := data[userId]; ok { data[userId] = append(data[userId], searchMap) } else { data[userId] = []map[string]interface{}{ searchMap, } } } type Recommend struct { UserId string `ch:"userid"` SearchFor string `ch:"searchfor"` Browse string `ch:"browse"` } // subRecommend初始化 func subRecommend() { subInItInfo = map[string]map[string]string{} rData1, err := ClickhouseConn.Query(context.Background(), `SELECT userid, searchfor, browse FROM sub_recommend_rule order by userid`) if err != nil { logx.Error("汇总表查询出错:", err) return } for rData1.Next() { data := Recommend{} rData1.ScanStruct(&data) userid := data.UserId subInItInfo[userid] = map[string]string{ "searchfor": data.SearchFor, "browse": data.Browse, } } }