Преглед изворни кода

Merge branch 'feature/v1.2.16' into dev/v1.2.16_rjj

renjiaojiao пре 1 година
родитељ
комит
48de001a0c
5 измењених фајлова са 217 додато и 1 уклоњено
  1. 1 0
      entity/message.go
  2. 14 0
      rpc/etc/message.yaml
  3. 183 0
      rpc/internal/common/task.go
  4. 6 1
      rpc/internal/config/config.go
  5. 13 0
      rpc/message.go

+ 1 - 0
entity/message.go

@@ -16,6 +16,7 @@ var Mysql *mysql.Mysql
 var BaseMysql *mysql.Mysql
 var MessageColumn []map[string]interface{}
 var MQFW m.MongodbSim
+var Bidding m.MongodbSim
 var GmailAuth []*mail.GmailAuth
 var SurvivalTime int
 var RollingTiming int64

+ 14 - 0
rpc/etc/message.yaml

@@ -17,6 +17,15 @@ Mongodb:
   Password:
   Collection:
   Collection_back:
+Bidding:
+  Address: 192.168.3.206:27002
+  Size: 10
+  DbName: mixdata
+  ReplSet:
+  UserName: jyDevGroup
+  Password: jy@DevGroup
+  Collection: bidding
+  Collection_back: bidding_back
 DataSource:
     DbName: jianyu
     Address: 192.168.3.217:4000
@@ -75,5 +84,10 @@ Clickhouse:
   MaxIdleConns: 5
   MaxOpenConns: 30
 GlobMsgLoadTime: "0 0/5 * * * *"
+
+FreeIntelTime: "0 0 10 ? * MON"
+PayIntelTime: "0 0 10 * * ?"
+FreePushNumber: 1
+PayPushNumber: 5
 EquityInfoMsgType: 13 #营销权益消息需要特殊处理
 NewUserMsgTitle: 做任务赚好礼

+ 183 - 0
rpc/internal/common/task.go

@@ -4,8 +4,13 @@ import (
 	"app.yhyue.com/moapp/MessageCenter/entity"
 	"app.yhyue.com/moapp/MessageCenter/rpc/internal/config"
 	"app.yhyue.com/moapp/jybase/common"
+	"app.yhyue.com/moapp/jybase/encrypt"
+	"context"
 	"fmt"
 	"github.com/robfig/cron/v3"
+	"log"
+	"strings"
+	"time"
 )
 
 var GlobMsgMap map[int]map[string]interface{}
@@ -15,6 +20,9 @@ func LoadTask() {
 	LoadMsgOnTime()
 	c := cron.New(cron.WithSeconds())
 	c.AddFunc(config.ConfigJson.GlobMsgLoadTime, LoadMsgOnTime)
+	c.AddFunc(config.ConfigJson.FreeIntelTime, FreeIntelUserPush) //免费用户推送
+	c.AddFunc(config.ConfigJson.PayIntelTime, PayIntelUserPush)   //付费用户推送
+
 	go c.Start()
 	defer c.Stop()
 }
@@ -31,3 +39,178 @@ func LoadMsgOnTime() {
 	}
 	fmt.Println("GlobMsgMap len", len(GlobMsgMap))
 }
+
+func FreeIntelUserPush() {
+	data := messageData(config.ConfigJson.FreePushNumber)
+	if data == nil || len(*data) == 0 {
+		return
+	}
+	users := IntelUser(true)
+	log.Println(len(users), data)
+	PushData(users, data)
+}
+
+func PayIntelUserPush() {
+	data := messageData(config.ConfigJson.PayPushNumber)
+	if data == nil || len(*data) == 0 {
+		return
+	}
+	users := IntelUser(false)
+	log.Println(len(users), data)
+	PushData(users, data)
+}
+
+func PushData(users []string, data *[]map[string]interface{}) {
+	log.Printf("需推送用户:%d\n", len(users))
+	var ids []int64
+	if data != nil {
+		for _, m := range *data {
+			_id := common.InterfaceToStr(m["_id"])
+			var link []string
+			link = append(link, fmt.Sprintf("/swordfish/page_big_pc/business_detail/%s", encrypt.EncodeArticleId2ByCheck(_id)))
+			mobLink := fmt.Sprintf("/jy_mobile/business/detail/%s", encrypt.EncodeArticleId2ByCheck(_id))
+			link = append(link, mobLink)
+			link = append(link, mobLink)
+			link = append(link, mobLink)
+
+			iData := map[string]interface{}{
+				"msg_type":    9,
+				"title":       "您有一条专属商机情报",
+				"content":     fmt.Sprintf("【商机情报】%s", common.ObjToString(m["title"])),
+				"send_mode":   1,
+				"send_time":   time.Now().Format("2006-01-02 15:04:05"),
+				"send_status": 4,
+				"update_time": time.Now().Format("2006-01-02 15:04:05"),
+				"createtime":  time.Now().Format("2006-01-02 15:04:05"),
+				"link":        strings.Join(link, ","),
+				"isdel":       1,
+				"send_userid": "商机情报定时推送",
+				"sign":        0,
+				"group_id":    5,
+			}
+			id := entity.Mysql.Insert("message_send_log", iData)
+			ids = append(ids, id)
+		}
+	}
+
+	if len(ids) > 0 {
+		var userIds []string
+		for k, user := range users {
+			userIds = append(userIds, user)
+			if len(userIds) == 1000 || k == len(users)-1 {
+				UpdateBatch(userIds, ids)
+				userIds = []string{}
+			}
+		}
+
+	}
+}
+
+// 用户
+func IntelUser(isFree bool) []string {
+	mUser := make(map[string]bool)
+	data := entity.Mysql.SelectBySql(`SELECT a.phone as phone FROM entniche_user a INNER JOIN entniche_info b on  b.status = 1 and a.power = 1 and a.ent_id = b.id and a.phone != ''`)
+	if data != nil { //统计商机管理用户
+		for _, m := range *data {
+			mUser[common.InterfaceToStr(m["phone"])] = true
+		}
+	}
+	var uData []string
+	switch isFree {
+	case false:
+		user, ok := entity.MQFW.Find("user", map[string]interface{}{
+			"i_appid": 2,
+			"$or": []map[string]interface{}{
+				{
+					"i_vip_status": map[string]interface{}{"$gt": 0},
+				},
+				{
+					"i_member_status": map[string]interface{}{"$gt": 0},
+				},
+			},
+		}, "", `{"_id":1,"s_phone":1,"s_m_phone":1}`, false, -1, -1)
+		if ok && user != nil && len(*user) > 0 {
+			for _, m := range *user {
+				phone := common.If(common.InterfaceToStr(m["s_phone"]) == "", common.InterfaceToStr(m["s_m_phone"]), common.InterfaceToStr(m["s_phone"])).(string)
+				uData = append(uData, common.InterfaceToStr(m["_id"]))
+				if mUser[phone] { //获取剩余商机管理用户
+					delete(mUser, phone)
+				}
+			}
+			if len(mUser) > 0 { //统计剩余商机管理用户
+				var (
+					phones []string
+					count  int
+				)
+
+				for phone := range mUser {
+					count++
+					phones = append(phones, phone)
+					if len(phones) == 100 || count == len(mUser) {
+						user1, ok1 := entity.MQFW.Find("user", map[string]interface{}{
+							"i_appid": 2,
+							"$or": []map[string]interface{}{
+								{
+									"s_phone": map[string]interface{}{
+										"$in": phones,
+									},
+								},
+								{
+									"s_m_phone": map[string]interface{}{
+										"$in": phones,
+									},
+								},
+							},
+						}, "", `{"_id":1}`, false, -1, -1)
+						if ok1 && user1 != nil && len(*user1) > 0 {
+							for _, m := range *user1 {
+								uData = append(uData, common.InterfaceToStr(m["_id"]))
+							}
+						}
+						phones = []string{}
+					}
+				}
+			}
+		}
+
+	case true:
+		sess := entity.MQFW.GetMgoConn()
+		defer entity.MQFW.DestoryMongoConn(sess)
+		iter := sess.DB("qfw").C("user").Find(map[string]interface{}{
+			"i_appid": 2,
+		}).Select(map[string]interface{}{"i_vip_status": 1, "i_member_status": 1, "_id": 1, "s_phone": 1, "s_m_phone": 1}).Iter()
+		for m := make(map[string]interface{}); iter.Next(&m); {
+			if common.IntAll(m["i_vip_status"]) <= 0 && common.IntAll(m["i_member_status"]) <= 0 {
+				phone := common.If(common.InterfaceToStr(m["s_phone"]) == "", common.InterfaceToStr(m["s_m_phone"]), common.InterfaceToStr(m["s_phone"])).(string)
+				if !mUser[phone] {
+					uData = append(uData, common.InterfaceToStr(m["_id"]))
+				}
+			}
+			m = map[string]interface{}{}
+		}
+	}
+	return uData
+}
+
+// 获取推送消息
+func messageData(number int) *[]map[string]interface{} {
+	query := map[string]interface{}{
+		"publishtime": map[string]interface{}{"$gt": time.Now().AddDate(0, 0, -1).Unix()},
+	}
+	data, _ := entity.Bidding.Find("project_forecast", query, map[string]interface{}{"publishtime": -1}, `{"title":1,"_id":1}`, false, 0, number)
+	return data
+}
+
+func UpdateBatch(ids []string, msgLogId []int64) {
+	str := fmt.Sprintf(`'%s'`, strings.Join(ids, `','`))
+	var bits []string
+	for _, i2 := range msgLogId {
+		bits = append(bits, fmt.Sprintf("toUInt64(%d)", i2))
+	}
+	log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, strings.Join(bits, ","), str))
+	err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, strings.Join(bits, ","), str))
+	if err1 != nil {
+		log.Printf("批量更新message_user_summary出错:%s", err1)
+		return
+	}
+}

+ 6 - 1
rpc/internal/config/config.go

@@ -12,6 +12,7 @@ type Config struct {
 	//Mysql           string
 	RedisAddr       string `json:"RedisAddr"`
 	Mongodb         *mgoConf
+	Bidding         *mgoConf
 	SurvivalTime    int
 	SaveConcurrency int       // 消息保存并发数
 	WxWebdomain     string    `json:"WxWebdomain"`
@@ -23,11 +24,15 @@ type Config struct {
 		Pwd  string `json:"pwd"`
 		User string `json:"user"`
 	} `json:"mail"`
-	TidbEng           string `json:"Tidb"`
 	Registedate       int64
+	TidbEng           string  `json:"Tidb"`
 	ClassSearchList   []int64 `json:"ClassSearchList"` // 需要按照messageclass 查询的groupId
 	Clickhouse        *CHouseConfig
 	GlobMsgLoadTime   string `json:"GlobMsgLoadTime"`
+	FreeIntelTime     string `json:"FreeIntelTime"`
+	PayIntelTime      string `json:"PayIntelTime"`
+	FreePushNumber    int    `json:"FreePushNumber"`
+	PayPushNumber     int    `json:"PayPushNumber"`
 	EquityInfoMsgType int64  `json:"EquityInfoMsgType"` // 营销权益消息需要特殊处理的消息类型
 	NewUserMsgTitle   string `json:"NewUserMsgTitle"`
 }

+ 13 - 0
rpc/message.go

@@ -105,6 +105,19 @@ func init() {
 		}
 		entity.MQFW.InitPool()
 	}
+	// 初始化mongo
+	if config.ConfigJson.Bidding != nil {
+		log.Println("初始化 mongodb Bidding")
+		entity.Bidding = m.MongodbSim{
+			MongodbAddr: config.ConfigJson.Bidding.Address,
+			Size:        config.ConfigJson.Bidding.Size,
+			DbName:      config.ConfigJson.Bidding.DbName,
+			UserName:    config.ConfigJson.Bidding.UserName,
+			Password:    config.ConfigJson.Bidding.Password,
+			ReplSet:     config.ConfigJson.Bidding.ReplSet,
+		}
+		entity.Bidding.InitPool()
+	}
 	// 初始化发送邮件
 	for _, v := range config.ConfigJson.Mail {
 		entity.GmailAuth = append(entity.GmailAuth, &mail.GmailAuth{