Jelajahi Sumber

更新用户订阅;添加存量更新

wcc 1 tahun lalu
induk
melakukan
665d0292a3
4 mengubah file dengan 90 tambahan dan 14 penghapusan
  1. 16 6
      user_subscribe/config.toml
  2. 1 1
      user_subscribe/go.mod
  3. 4 2
      user_subscribe/go.sum
  4. 69 5
      user_subscribe/main.go

+ 16 - 6
user_subscribe/config.toml

@@ -1,13 +1,23 @@
 [mongo]
 
-    host = "172.17.4.187:27090"
-    #host = "192.168.3.207:29099"
-    db = "sales_leads"
+#    host = "172.17.4.187:27090"
+#    #host = "192.168.3.207:29099"
+#    db = "sales_leads"
+#    coll = "user"               ## 这里用的测试表,正式环境应该用user
+#    savecoll = "user_subscribe" ## 保存结果表
+#    username = "jianyu"
+#    password = "jylog2020_123"
+#    lastID = "65652ead6fa316c8dd469993"     ## 最后获取的用户ID,默认为空
+
+
+#    host = "172.17.4.187:27090"
+    host = "192.168.3.206:27080" ## 测试环境,用户数据
+    db = "qfw"
     coll = "user"               ## 这里用的测试表,正式环境应该用user
     savecoll = "user_subscribe" ## 保存结果表
-    username = "jianyu"
-    password = "jylog2020_123"
-    lastID = "65652ead6fa316c8dd469993"     ## 最后获取的用户ID,默认为空
+    username = ""
+    password = ""
+    lastID = ""     ## 最后获取的用户ID,默认为空
 
 
 [cron] ## 定时任务

+ 1 - 1
user_subscribe/go.mod

@@ -6,7 +6,7 @@ require (
 	github.com/robfig/cron/v3 v3.0.1
 	github.com/spf13/viper v1.15.0
 	go.mongodb.org/mongo-driver v1.11.1
-	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230712115659-b418d6181de3
+	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20240108061147-857f0a039c16
 )
 
 require (

+ 4 - 2
user_subscribe/go.sum

@@ -1,5 +1,3 @@
-jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20221205033056-885644941005 h1:AEEi+8ao9pTVqPIh6uVvjxBby/i43fFj7DwVo+feDAE=
-jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20221205033056-885644941005/go.mod h1:9PlRUNzirlF/LL1W7fA7koCudxJe3uO5nshDWlCnGo8=
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
@@ -917,8 +915,12 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
 honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
 honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20221205033056-885644941005 h1:AEEi+8ao9pTVqPIh6uVvjxBby/i43fFj7DwVo+feDAE=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20221205033056-885644941005/go.mod h1:9PlRUNzirlF/LL1W7fA7koCudxJe3uO5nshDWlCnGo8=
 jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230712115659-b418d6181de3 h1:kgtSaRR/hRunxM6Kxi66REk7f2PqN1u56j/V+8FfPW8=
 jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230712115659-b418d6181de3/go.mod h1:1Rp0ioZBhikjXHYYXmnzL6RNfvTDM/2XvRB+vuPLurI=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20240108061147-857f0a039c16 h1:K6hILkGcqNRmTO1nsrpZMqG2iupJETmnmZmKt77SsNY=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20240108061147-857f0a039c16/go.mod h1:1Rp0ioZBhikjXHYYXmnzL6RNfvTDM/2XvRB+vuPLurI=
 k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
 rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=

+ 69 - 5
user_subscribe/main.go

@@ -1,7 +1,6 @@
 package main
 
 import (
-	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
 	"encoding/json"
 	"fmt"
 	"github.com/robfig/cron/v3"
@@ -9,6 +8,7 @@ import (
 	wlog "github.com/wcc4869/common_utils/log"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
 	"net"
 	"strings"
 	"time"
@@ -48,7 +48,7 @@ func init() {
 
 }
 
-//processUdpMsg 处理udp
+// processUdpMsg 处理udp
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	switch act {
 	case udp.OP_TYPE_DATA:
@@ -98,7 +98,7 @@ func InitConfig() {
 
 }
 
-//InitLog 初始化日志
+// InitLog 初始化日志
 func InitLog() {
 	logConfig := &wlog.Options{}
 	err := viper.UnmarshalKey("log", logConfig)
@@ -110,7 +110,6 @@ func InitLog() {
 }
 
 func main() {
-
 	local, _ := time.LoadLocation("Asia/Shanghai")
 	c := cron.New(cron.WithLocation(local), cron.WithSeconds())
 	c.AddFunc(GF.Spec, cpUser)
@@ -122,7 +121,7 @@ func main() {
 
 }
 
-//cpUser 处理用户订阅词,然后拷贝到新表,提供分类使用
+// cpUser 处理用户订阅词,然后拷贝到新表,提供分类使用
 func cpUser() {
 	wlog.Info("cpUser", wlog.String("start", ""))
 	var nuId string
@@ -209,3 +208,68 @@ func cpUser() {
 	wlog.Info("处理用户订阅词", wlog.Int("over", count))
 
 }
+
+// reAllUser 重新更新用户订阅词,打标签
+func reAllUser() {
+	sess := Mgo.GetMgoConn()
+	defer Mgo.DestoryMongoConn(sess)
+	//
+	query := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(nil).Select(nil).Sort("_id").Iter()
+	count := 0
+
+	gtid := ""
+	lteid := ""
+	//var saveUserPool = make([]map[string]interface{}, 0)
+	var sendMsg bool
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		idStr := mongodb.BsonIdToSId(tmp["_id"])
+		if count == 0 {
+			gtid = idStr
+		} else {
+			lteid = idStr
+		}
+		if count%1000 == 0 {
+			wlog.Info("reAllUser", wlog.Int("current", count))
+			wlog.Info("reAllUser", wlog.Any("_id", tmp["_id"]))
+		}
+		keys := GetUserKeys(tmp)
+		update := map[string]interface{}{}
+		if len(keys) > 0 {
+			tags := []string{}
+			for _, v := range keys {
+				tag := toString(v)
+				tags = append(tags, tag)
+			}
+
+			//用户关键词
+			update["key_list"] = strings.Join(tags, ",")
+			update["i_appid"] = tmp["i_appid"]
+			update["_id"] = tmp["_id"]
+
+			err := Mgo.InsertOrUpdate(GF.Mongo.DB, GF.Mongo.SaveColl, update)
+			if err != nil {
+				wlog.Info("reAllUser", wlog.String("id 更新错误", idStr))
+				wlog.Error(idStr, err)
+			}
+
+			sendMsg = true
+		}
+
+	}
+
+	wlog.Info("reAllUser", wlog.String("gtid", gtid), wlog.String("lteid", lteid))
+
+	//调用udp 处理用户行业分类
+	udpData := map[string]interface{}{
+		"gtid":  gtid,
+		"lteid": lteid,
+		"stype": "yonghuhangye",
+	}
+
+	if sendMsg {
+		wlog.Info("reAllUser", wlog.Any("udpData", udpData))
+		SendUdpMsg(udpData, classficationAddr)
+	}
+
+	wlog.Info("处理用户订阅词", wlog.Int("over", count))
+}