xuzhiheng 2 years ago
parent
commit
2b8bb726e0
4 changed files with 205 additions and 19 deletions
  1. 39 12
      clueSync/config.go
  2. 10 1
      clueSync/config.json
  3. 17 6
      clueSync/main.go
  4. 139 0
      clueSync/subscribe.go

+ 39 - 12
clueSync/config.go

@@ -1,19 +1,27 @@
 package main
 
+import (
+	"log"
+
+	"app.yhyue.com/moapp/jybase/common"
+)
+
 type (
 	Config struct {
-		CornExp1      string `json:"cornexp1"`
-		CornExp2      string `json:"cornexp2"`
-		CornExp3      string `json:"cornexp3"`
-		CornExp4      string `json:"cornexp4"`
-		CornExp5      string `json:"cornexp5"`
-		LastOrderId   int    `json:"lastOrderId"`
-		LastUserId    string `json:"lastUserId"`
-		LastId        string `json:"lastId"`
-		LastOrderTime string `json:"lastOrderTime"`
-		LastUserTime  string `json:"lastUserTime"`
-		CountLimit    int64  `json:"countLimit"`
-		TiDb          struct {
+		CornExp1        string `json:"cornexp1"`
+		CornExp2        string `json:"cornexp2"`
+		CornExp3        string `json:"cornexp3"`
+		CornExp4        string `json:"cornexp4"`
+		CornExp5        string `json:"cornexp5"`
+		CornExp6        string `json:"cornexp6"`
+		LastOrderId     int    `json:"lastOrderId"`
+		LastUserId      string `json:"lastUserId"`
+		LastId          string `json:"lastId"`
+		LastOrderTime   string `json:"lastOrderTime"`
+		LastUserTime    string `json:"lastUserTime"`
+		LastSubscribeId string `json:"lastSubscribeId"`
+		CountLimit      int64  `json:"countLimit"`
+		TiDb            struct {
 			Host        string `json:"host"`
 			Port        int    `json:"port"`
 			Database    string `json:"database"`
@@ -48,9 +56,28 @@ type (
 			DbName  string `json:"dbName"`
 			DbSize  int    `json:"dbSize"`
 		} `json:"mgo"`
+		MgoLog struct {
+			Address  string `json:"address"`
+			DbName   string `json:"dbName"`
+			DbSize   int    `json:"dbSize"`
+			User     string `json:"user"`
+			Password string `json:"password"`
+		} `json:"mgoLog"`
 		Es struct {
 			Address string `json:"address"`
 			DbSize  int    `json:"dbSize"`
 		} `json:"es"`
 	}
 )
+
+var AreaCode = map[string]string{}
+
+func InitArea() {
+	info := TiDb.Find("d_area_code", nil, "", "", -1, -1)
+	if info != nil && len(*info) > 0 {
+		for _, m := range *info {
+			AreaCode[common.ObjToString(m["name"])] = common.ObjToString(m["code"])
+		}
+	}
+	log.Println("AreaCodeLen ", len(AreaCode))
+}

+ 10 - 1
clueSync/config.json

@@ -4,14 +4,16 @@
 	"cornexp3": "0 */5 * * * ?",
 	"cornexp4": "0 0 0 /1 * ? ",
 	"cornexp5": "0 */5 * * * ?",
+	"cornexp6": "0 */10 * * * ?",
 	"lastOrderId": 172463,
 	"lastUserId": "6440f0e37242fb7cbc685b7b",
 	"lastId": "6440faf7bd4ea1862af84c56",
 	"lastOrderTime": "2023-04-23 00:00:00",
 	"lastUserTime": "2023-04-23 00:00:00",
+	"lastSubscribeId": "64473e36c572141d78ec7a03",
 	"countLimit": 1000,
 	"tiDb": {
-		"host": "127.0.0.1",
+		"host": "192.168.3.149",
 		"port": 4000,
 		"database": "jianyu_subjectdb_test",
 		"user": "root",
@@ -45,6 +47,13 @@
 		"dbName": "qfw",
 		"dbSize": 20
 	},
+	"mgoLog": {
+		"address": "192.168.3.206:27090",
+		"dbName": "qfw",
+		"dbSize": 20,
+		"user": "admin",
+		"password": "123456"
+	},
 	"es": {
 		"address": "http://192.168.3.206:9800",
 		"dbSize": 20

+ 17 - 6
clueSync/main.go

@@ -13,12 +13,13 @@ import (
 )
 
 var (
-	cfg   = new(Config)
-	Mysql *mysql.Mysql
-	TiDb  *mysql.Mysql
-	Base  *mysql.Mysql
-	Es    *elastic.Elastic
-	Mgo   *mongodb.MongodbSim
+	cfg    = new(Config)
+	Mysql  *mysql.Mysql
+	TiDb   *mysql.Mysql
+	Base   *mysql.Mysql
+	Es     *elastic.Elastic
+	Mgo    *mongodb.MongodbSim
+	MgoLog *mongodb.MongodbSim
 	// configFile = flag.String("c", "./config.yaml", "配置文件")
 	mode = flag.Int("m", 1, "")
 )
@@ -61,6 +62,8 @@ func main() {
 	}
 	Es.InitElasticSize()
 	Mgo = mongodb.NewMgo(cfg.Mgo.Address, cfg.Mgo.DbName, cfg.Mgo.DbSize)
+	MgoLog = mongodb.NewMgoWithUser(cfg.MgoLog.Address, cfg.MgoLog.DbName, cfg.MgoLog.User, cfg.MgoLog.Password, cfg.MgoLog.DbSize)
+	InitArea()
 	if *mode == 1 {
 		// 未支付订单 30分钟一次
 		orders()
@@ -115,5 +118,13 @@ func main() {
 		})
 		e.Start()
 		select {}
+	} else if *mode == 7 {
+		subscribeAddSync()
+		e := cron.New()
+		e.AddFunc(cfg.CornExp6, func() {
+			subscribeAddSync()
+		})
+		e.Start()
+		select {}
 	}
 }

+ 139 - 0
clueSync/subscribe.go

@@ -0,0 +1,139 @@
+package main
+
+import (
+	"strings"
+	"time"
+
+	"log"
+
+	"app.yhyue.com/moapp/jybase/common"
+	"app.yhyue.com/moapp/jybase/mongodb"
+)
+
+func subscribeAddSync() {
+	log.Println("订阅数据定时任务开始")
+	session := MgoLog.GetMgoConn()
+	lastId := cfg.LastSubscribeId
+	defer func() {
+		MgoLog.DestoryMongoConn(session)
+	}()
+	query := map[string]interface{}{}
+	if lastId != "" {
+		query["_id"] = map[string]interface{}{"$gt": mongodb.StringTOBsonId(lastId)}
+	}
+	// query["_id"] = mongodb.StringTOBsonId("64473e36c572141d78ec7a03")
+	log.Println("query :", query)
+
+	iter := session.DB(cfg.MgoLog.DbName).C("ovipjy_log").Find(&query).Sort("_id").Iter()
+	thisData := map[string]interface{}{}
+
+	for {
+		if !iter.Next(&thisData) {
+			break
+		}
+		cfg.LastSubscribeId = mongodb.BsonIdToSId(thisData["_id"])
+		FormatSubscribeData(thisData)
+	}
+	common.WriteSysConfig(&cfg)
+	log.Println("订阅数据定时任务结束")
+}
+
+func FormatSubscribeData(data map[string]interface{}) {
+	types, stype := common.ObjToString(data["type"]), ""
+	// createtime := common.Int64All(data["createtime"])
+	userId := common.ObjToString(data["userid"])
+	if !mongodb.IsObjectIdHex(userId) {
+		userMapping := TiDb.FindOne("dwd_f_userbase_id_mapping", map[string]interface{}{"position_id": userId}, "", "")
+		if userMapping != nil && len(*userMapping) > 0 {
+			userId = common.ObjToString((*userMapping)["userid"])
+		}
+	}
+	areaCodes := ""
+	keywords := ""
+	keyArrs := []string{}
+	if types == "o_member_jy" {
+		stype = "大会员订阅"
+	} else if types == "o_vipjy" {
+		stype = "超级订阅"
+	} else if types == "o_jy" {
+		stype = "免费订阅"
+	}
+	if types != "" {
+		if jy, ok := data[types].(map[string]interface{}); ok {
+			if area, oks := jy["o_area"].(map[string]interface{}); oks {
+				areaArr := []string{}
+				for k, _ := range area {
+					areaArr = append(areaArr, AreaCode[k])
+				}
+				if len(area) == 0 {
+					areaArr = append(areaArr, AreaCode["全国"])
+				}
+				areaCodes = strings.Join(areaArr, ",")
+			}
+			if types == "o_jy" {
+				akey, aok := jy["a_key"].([]map[string]interface{})
+				if !aok {
+					akeys, _ := jy["a_key"].([]interface{})
+					akey = common.ObjArrToMapArr(akeys)
+				}
+				for _, v := range akey {
+					keysArr, asok := v["key"].([]string)
+					if !asok {
+						keysArr_s, _ := v["key"].([]interface{})
+						keysArr = common.ObjArrToStringArr(keysArr_s)
+					}
+					for _, key := range keysArr {
+						keyArrs = append(keyArrs, key)
+					}
+				}
+			} else {
+				items, aok := jy["a_items"].([]map[string]interface{})
+				if !aok {
+					itemss, _ := jy["a_items"].([]interface{})
+					items = common.ObjArrToMapArr(itemss)
+				}
+				for _, v := range items {
+					akey, iok := v["a_key"].([]map[string]interface{})
+					if !iok {
+						akeys, _ := v["a_key"].([]interface{})
+						akey = common.ObjArrToMapArr(akeys)
+					}
+					for _, v := range akey {
+						keysArr, asok := v["key"].([]string)
+						if !asok {
+							keysArr_s, _ := v["key"].([]interface{})
+							keysArr = common.ObjArrToStringArr(keysArr_s)
+						}
+						for _, key := range keysArr {
+							keyArrs = append(keyArrs, key)
+						}
+					}
+				}
+			}
+			if len(keyArrs) > 0 {
+				keywords = strings.Join(keyArrs, ",")
+			}
+		}
+		start := time.Now().Format("2006-01-02") + " 00:00:00"
+		end := time.Now().Format("2006-01-02") + " 23:59:59"
+		nowTime := time.Now().Format("2006-01-02 15:04:05")
+		subscribeData := TiDb.SelectBySql(`select * from dwd_f_userbase_subscribe_info where userid = "` + userId + `" and updatetime >= "` + start + `" and updatetime <= "` + end + `"`)
+		if subscribeData != nil && len(*subscribeData) > 0 {
+			TiDb.Update("dwd_f_userbase_subscribe_info", map[string]interface{}{"id": common.IntAll((*subscribeData)[0]["id"])}, map[string]interface{}{
+				"updatetime":         nowTime,
+				"subscribe_areas":    areaCodes,
+				"subscribe_keywords": keywords,
+				"member_type":        stype,
+				"userid":             userId,
+			})
+		} else {
+			TiDb.Insert("dwd_f_userbase_subscribe_info", map[string]interface{}{
+				"userid":             userId,
+				"updatetime":         nowTime,
+				"subscribe_areas":    areaCodes,
+				"subscribe_keywords": keywords,
+				"member_type":        stype,
+			})
+		}
+	}
+}