Jelajahi Sumber

feat:xiugai

wangchuanjin 1 tahun lalu
induk
melakukan
2c1f6287e2

+ 6 - 3
subrecommend/config.json

@@ -1,7 +1,7 @@
 {
 	"testIds":["5d6378301c298a5aac7b5402"],
-	"testQuery":{"i_userid":99999},
-	"testUserIds":[99999],
+	"testQuery":{"i_userid":68100},
+	"testUserIds":[68100],
 	"matchPoolSize":60,
 	"savePoolSize":5,
 	"matchDuration":60,
@@ -13,5 +13,8 @@
 	"similarity": 0.5,
 	"searchLimitDay":15,
 	"browseLimitDay":15,
-	"linkWordSize": 5
+	"linkWordSize": 5,
+	"saveBatch":10000,
+	"checkMutationSec":3,
+	"mutationMax":0
 }

+ 3 - 0
subrecommend/config/config.go

@@ -20,6 +20,9 @@ type config struct {
 	SearchLimitDay   int                    `json:"searchLimitDay"`
 	BrowseLimitDay   int                    `json:"browseLimitDay"`
 	LinkWordSize     int                    `json:"linkWordSize"`
+	SaveBatch        int                    `json:"saveBatch"`
+	CheckMutationSec int                    `json:"checkMutationSec"`
+	MutationMax      int64                  `json:"mutationMax"`
 }
 
 type taskConfig struct {

+ 26 - 3
subrecommend/db/db.go

@@ -1,6 +1,8 @@
 package public
 
 import (
+	"context"
+	"fmt"
 	"log"
 
 	util "app.yhyue.com/moapp/jybase/common"
@@ -8,13 +10,13 @@ import (
 	"app.yhyue.com/moapp/jybase/mysql"
 	"app.yhyue.com/moapp/jybase/redis"
 	. "bp.jydev.jianyu360.cn/BaseService/pushpkg/p"
-	_ "github.com/ClickHouse/clickhouse-go/v2"
+	"github.com/ClickHouse/clickhouse-go/v2"
 )
 
 var (
 	DbConf      *dbConf
 	Tidb        *mysql.Mysql
-	Clickhouse  *mysql.Mysql
+	Clickhouse  clickhouse.Conn
 	Mgo         *m.MongodbSim
 	Mgo_Bidding *m.MongodbSim
 )
@@ -82,7 +84,28 @@ func init() {
 		}
 		if DbConf.Mysql.Clickhouse != nil {
 			log.Println("初始化 mysql clickhouse")
-			Clickhouse = mysql.NewInit(mysql.CLICKHOUSE, DbConf.Mysql.Clickhouse.Address, DbConf.Mysql.Clickhouse.MaxOpenConns, DbConf.Mysql.Clickhouse.MaxIdleConns)
+			var err error
+			Clickhouse, err = clickhouse.Open(&clickhouse.Options{
+				Addr: []string{DbConf.Mysql.Clickhouse.Address},
+				Auth: clickhouse.Auth{
+					Database: DbConf.Mysql.Clickhouse.DbName,
+					Username: DbConf.Mysql.Clickhouse.UserName,
+					Password: DbConf.Mysql.Clickhouse.PassWord,
+				},
+				Debugf: func(format string, v ...any) {
+					fmt.Printf(format+"\n", v...)
+				},
+				Compression: &clickhouse.Compression{
+					Method: clickhouse.CompressionLZ4,
+				},
+				MaxOpenConns: DbConf.Mysql.Clickhouse.MaxOpenConns,
+				MaxIdleConns: DbConf.Mysql.Clickhouse.MaxIdleConns,
+			})
+			if err != nil {
+				log.Println(err)
+			} else if err := Clickhouse.Ping(context.Background()); err != nil {
+				log.Println(err)
+			}
 		}
 	}
 }

+ 4 - 0
subrecommend/job/matchjob.go

@@ -47,6 +47,10 @@ func (m *MatchJob) Execute() {
 		behaviorCacheDay = nowDay
 		behaviorCacheData = LoadAllBehavior()
 	}
+	if AllRecommend = LoadAllRecommend(); AllRecommend == nil {
+		logger.Error("加载推荐数据异常")
+		return
+	}
 	(&PersonMatch{Datas: datas, AllBehavior: behaviorCacheData}).Start()
 	(&EntMatch{Datas: datas, AllBehavior: behaviorCacheData}).Start()
 	TaskConfig.Pici = endTime

+ 68 - 10
subrecommend/matcher/matcher.go

@@ -1,13 +1,16 @@
 package matcher
 
 import (
+	"context"
 	"fmt"
+	"log"
 	"sort"
 	"strings"
 	. "subrecommend/config"
 	. "subrecommend/db"
 	. "subrecommend/util"
 	"sync"
+	"time"
 
 	. "app.yhyue.com/moapp/jybase/common"
 	. "app.yhyue.com/moapp/jybase/date"
@@ -180,6 +183,12 @@ func (m *MatchOther) MergeInfos(userMap *map[*UserInfo]*SortList) {
 	}
 }
 
+var (
+	lock         = &sync.Mutex{}
+	saveArray    = [][]string{}
+	AllRecommend = map[string][]uint64{}
+)
+
 //
 func Save(batchIndex int, userMap *map[*UserInfo]*SortList) {
 	logger.Info("第", batchIndex, "批开始保存。。。")
@@ -233,15 +242,16 @@ func Save(batchIndex int, userMap *map[*UserInfo]*SortList) {
 				return
 			}
 			nowFormat := NowFormat(Date_Full_Layout)
-			datas := Clickhouse.SelectBySql(`select bitmapToArray(infoids) as infoids from jianyu.sub_recommend_list where userid=? order by update_time desc limit 1`, ui.Id)
-			if datas == nil {
-				logger.Error(ui.Id, "查询clickhouse失败")
-				return
-			}
-			if len(*datas) == 0 {
-				Clickhouse.InsertBySql(`INSERT INTO jianyu.sub_recommend_list (userid, infoids, area, update_time) values ('` + ui.Id + `',bitmapBuild([` + toUInt64(ids) + `]),'` + area + `','` + nowFormat + `')`)
+			infoids := AllRecommend[ui.Id]
+			if infoids == nil {
+				lock.Lock()
+				saveArray = append(saveArray, []string{fmt.Sprintf("'%s'", ui.Id), `bitmapBuild([` + toUInt64(ids) + `])`, fmt.Sprintf("'%s'", area), fmt.Sprintf("'%s'", nowFormat)})
+				if len(saveArray) == Config.SaveBatch {
+					saveBatch(saveArray)
+					saveArray = [][]string{}
+				}
+				lock.Unlock()
 			} else {
-				infoids, _ := (*datas)[0]["infoids"].([]uint64)
 				intIds := []int{}
 				allInfoIds := map[int]bool{}
 				for _, v := range infoids {
@@ -257,8 +267,9 @@ func Save(batchIndex int, userMap *map[*UserInfo]*SortList) {
 				if len(intIds) > maxSize {
 					intIds = intIds[len(intIds)-maxSize:]
 				}
-				text := `ALTER TABLE jianyu.sub_recommend_list update area='` + area + `',infoids=bitmapBuild([` + toUInt64(intIds) + `]),update_time='` + nowFormat + `' where userid='` + ui.Id + `'`
-				Clickhouse.UpdateOrDeleteBySql(text)
+				if err := Clickhouse.Exec(context.Background(), `ALTER TABLE jianyu.sub_recommend_list update area='`+area+`',infoids=bitmapBuild([`+toUInt64(intIds)+`]),update_time='`+nowFormat+`' where userid='`+ui.Id+`'`); err != nil {
+					logger.Error(err)
+				}
 			}
 		}(k, v)
 		index++
@@ -267,9 +278,56 @@ func Save(batchIndex int, userMap *map[*UserInfo]*SortList) {
 		}
 	}
 	saveWaitGroup.Wait()
+	if len(saveArray) > 0 {
+		saveBatch(saveArray)
+	}
 	logger.Info("第", batchIndex, "批保存。。。", index)
 }
 
+//
+// func selectOne(userId string) ([]uint64, error) {
+// 	row := Clickhouse.QueryRow(context.Background(), `select bitmapToArray(infoids) as infoids from jianyu.sub_recommend_list where userid='`+userId+`' order by update_time desc limit 1`)
+// 	var infoids []uint64
+// 	if err := row.Scan(&infoids); err != nil {
+// 		logger.Error(err)
+// 		return nil, err
+// 	}
+// 	return infoids, nil
+// }
+
+//
+func saveBatch(array [][]string) bool {
+	q := "INSERT INTO jianyu.sub_recommend_list (userid, infoids, area, update_time) values "
+	for k, v := range array {
+		if k != 0 {
+			q += ","
+		}
+		q += fmt.Sprintf("(%s)", strings.Join(v, ","))
+	}
+	log.Println(q)
+	if err := Clickhouse.Exec(context.Background(), q); err != nil {
+		logger.Error(err)
+		return false
+	}
+	return true
+}
+
+//
+func updateBatch() {
+	for {
+		row := Clickhouse.QueryRow(context.Background(), `select count(1) as count from system.mutations where is_done=0`)
+		var count int64
+		if err := row.Scan(&count); err != nil {
+			logger.Error(err)
+		} else if err == nil && count <= Config.MutationMax {
+			break
+		}
+		sleep := time.Duration(Config.CheckMutationSec)
+		logger.Info("监测到mutation", count, "超过限制", Config.MutationMax, "休眠", sleep)
+		time.Sleep(time.Second * sleep)
+	}
+}
+
 //
 func toUInt64(ids []int) string {
 	text := ""

+ 59 - 30
subrecommend/util/util.go

@@ -2,6 +2,7 @@ package util
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
@@ -51,17 +52,27 @@ func LoadAllBehavior() map[string]*Behavior {
 		q += ` where userid in ('` + strings.Join(testIds, "','") + `')`
 	}
 	logger.Info("开始加载用户行为数据。。", q)
-
 	index := 0
 	allBehavior := map[string]*Behavior{}
-	Clickhouse.SelectByBath(1, func(l *[]map[string]interface{}) bool {
+	rows, err := Clickhouse.Query(context.Background(), q)
+	if err != nil {
+		logger.Error(err)
+		return allBehavior
+	}
+	for rows.Next() {
 		index++
 		if index%500 == 0 {
 			logger.Info("加载用户行为数据", index)
 		}
-		searchfor, _ := (*l)[0]["searchfor"].(string)
-		browse, _ := (*l)[0]["browse"].(string)
-		userId, _ := (*l)[0]["userid"].(string)
+		var (
+			userId    string
+			browse    string
+			searchfor string
+		)
+		if err := rows.Scan(&userId, &browse, &searchfor); err != nil {
+			logger.Error(err)
+			continue
+		}
 		searchfors := []*Searchfor{}
 		if searchfor != "" {
 			searchforsTemp := []*Searchfor{}
@@ -101,7 +112,7 @@ func LoadAllBehavior() map[string]*Behavior {
 			}
 		}
 		if len(searchfors) == 0 && len(browses) == 0 {
-			return true
+			continue
 		}
 		area := ""
 		if browses != nil && len(browses) > 0 {
@@ -112,40 +123,59 @@ func LoadAllBehavior() map[string]*Behavior {
 			Browses:    browses,
 			Area:       area,
 		}
-		return true
-	}, q)
+	}
+	rows.Close()
+	if err := rows.Err(); err != nil {
+		logger.Error(err)
+	}
 	logger.Info("加载用户行为数据结束。。", index)
 	return allBehavior
 }
 
 //
-func LoadBehavior(userId string) ([]*Searchfor, []*Browse, string) {
-	rules := Clickhouse.SelectBySql(`select browse,searchfor from jianyu.sub_recommend_rule where userid=? order by update_time desc limit 1`, userId)
-	if rules == nil || len(*rules) == 0 {
-		return nil, nil, ""
-	}
-	searchfor, _ := (*rules)[0]["searchfor"].(string)
-	browse, _ := (*rules)[0]["browse"].(string)
-	if searchfor == "" && browse == "" {
-		return nil, nil, ""
+func LoadAllRecommend() map[string][]uint64 {
+	allRecommend := map[string][]uint64{}
+	q := `select userid,bitmapToArray(infoids) as infoids from jianyu.sub_recommend_list`
+	testIds := []string{}
+	if len(Config.TestIds) > 0 {
+		testIds = append(testIds, Config.TestIds...)
 	}
-	var searchfors []*Searchfor
-	if searchfor != "" {
-		if err := json.Unmarshal([]byte(searchfor), &searchfors); err != nil {
-			logger.Error(userId, err)
+	if len(Config.TestUserIds) > 0 {
+		for _, v := range Config.TestUserIds {
+			testIds = append(testIds, fmt.Sprint(v))
 		}
 	}
-	var browses []*Browse
-	if browse != "" {
-		if err := json.Unmarshal([]byte(browse), &browses); err != nil {
-			logger.Error(userId, err)
+	if len(testIds) > 0 {
+		q += ` where userid in ('` + strings.Join(testIds, "','") + `')`
+	}
+	logger.Info("开始加载用户推荐数据。。", q)
+	index := 0
+	rows, err := Clickhouse.Query(context.Background(), q)
+	if err != nil {
+		logger.Error(err)
+		return nil
+	}
+	for rows.Next() {
+		index++
+		if index%500 == 0 {
+			logger.Info("加载用户推荐数据", index)
 		}
+		var (
+			userId  string
+			infoids []uint64
+		)
+		if err := rows.Scan(&userId, &infoids); err != nil {
+			logger.Error(err)
+			continue
+		}
+		allRecommend[userId] = infoids
 	}
-	area := ""
-	if browses != nil && len(browses) > 0 {
-		area = browses[0].Area
+	rows.Close()
+	if err := rows.Err(); err != nil {
+		logger.Error(err)
 	}
-	return searchfors, browses, area
+	logger.Info("加载用户推荐数据结束。。", index)
+	return allRecommend
 }
 
 //tp 0:企业-商机管理 2:企业-超级订阅/大会员 3:企业-免费 -1:个人-免费 -2:个人-超级订阅 -3:个人-大会员
@@ -265,7 +295,6 @@ func proccessKey(a_key []interface{}) []map[string]interface{} {
 
 //获取相似词
 func GetLikeWord(keys []string) []string {
-	return keys
 	redisKey := fmt.Sprintf("subLikeKey_%s", strings.Join(keys, "+"))
 	b, err := redis.GetNewBytes("pushcache_2_d", redisKey)
 	if err != nil {