123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- package common
- import (
- "app.yhyue.com/moapp/MessageCenter/entity"
- "app.yhyue.com/moapp/MessageCenter/rpc/internal/config"
- "app.yhyue.com/moapp/jybase/common"
- "app.yhyue.com/moapp/jybase/encrypt"
- "app.yhyue.com/moapp/jybase/redis"
- "context"
- "fmt"
- "github.com/robfig/cron/v3"
- "log"
- "strings"
- "time"
- )
// GlobMsgMap caches rows of message_send_log keyed by the row's id.
// LoadMsgOnTime replaces it wholesale whenever its query returns rows.
var GlobMsgMap map[int]map[string]interface{}
- func LoadTask() {
- // 每隔10分钟执行一次
- LoadMsgOnTime()
- c := cron.New(cron.WithSeconds())
- c.AddFunc(config.ConfigJson.GlobMsgLoadTime, LoadMsgOnTime)
- c.AddFunc(config.ConfigJson.FreeIntelTime, FreeIntelUserPush) //免费用户推送
- c.AddFunc(config.ConfigJson.PayIntelTime, PayIntelUserPush) //付费用户推送
- go c.Start()
- defer c.Stop()
- }
- func LoadMsgOnTime() {
- fmt.Println("开始执行")
- msgMap := make(map[int]map[string]interface{})
- m := entity.Mysql.SelectBySql("SELECT id,msg_type,title,content,send_time,menu_name,link,group_id,sign FROM message_send_log WHERE send_status = 4 AND isdel = 1 ORDER BY send_time DESC limit 2000")
- if m != nil && len(*m) > 0 {
- for _, val := range *m {
- msgMap[common.IntAll(val["id"])] = val
- }
- GlobMsgMap = msgMap
- }
- fmt.Println("GlobMsgMap len", len(GlobMsgMap))
- }
- func FreeIntelUserPush() {
- data := messageData(config.ConfigJson.FreePushNumber)
- if data == nil || len(*data) == 0 {
- return
- }
- users := IntelUser(true)
- PushData(users, data)
- }
- func PayIntelUserPush() {
- data := messageData(config.ConfigJson.PayPushNumber)
- if data == nil || len(*data) == 0 {
- return
- }
- users := IntelUser(false)
- PushData(users, data)
- }
- func PushData(users []string, data *[]map[string]interface{}) {
- log.Printf("需推送用户:%d\n", len(users))
- var ids []int64
- if data != nil {
- for _, m := range *data {
- _id := common.InterfaceToStr(m["_id"])
- var link []string
- link = append(link, fmt.Sprintf("/swordfish/page_big_pc/business_detail/%s", encrypt.EncodeArticleId2ByCheck(_id)))
- mobLink := fmt.Sprintf("/jy_mobile/business/detail/%s", encrypt.EncodeArticleId2ByCheck(_id))
- link = append(link, mobLink)
- link = append(link, mobLink)
- link = append(link, mobLink)
- iData := map[string]interface{}{
- "msg_type": 9,
- "title": "您有一条专属商机情报",
- "content": fmt.Sprintf("【商机情报】%s", common.ObjToString(m["title"])),
- "send_mode": 1,
- "send_time": time.Now().Format("2006-01-02 15:04:05"),
- "send_status": 4,
- "update_time": time.Now().Format("2006-01-02 15:04:05"),
- "createtime": time.Now().Format("2006-01-02 15:04:05"),
- "link": strings.Join(link, ","),
- "isdel": 1,
- "send_userid": "商机情报定时推送",
- "sign": 0,
- "group_id": 5,
- }
- id := entity.Mysql.Insert("message_send_log", iData)
- ids = append(ids, id)
- }
- }
- if len(ids) > 0 {
- var bits []string
- for _, i2 := range ids {
- bits = append(bits, fmt.Sprintf("toUInt64(%d)", i2))
- }
- err := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([%s])) where group_id = %d`, strings.Join(bits, ","), 5))
- if err != nil {
- log.Println("message_summary err=== ", err.Error())
- return
- }
- var userIds []string
- for k, user := range users {
- userIds = append(userIds, user)
- if len(userIds) == 1000 || k == len(users)-1 {
- UpdateBatch(userIds, ids)
- userIds = []string{}
- }
- }
- }
- }
- // 用户
- func IntelUser(isFree bool) []string {
- mUser := make(map[string]bool)
- data := entity.Mysql.SelectBySql(`SELECT a.phone as phone FROM entniche_user a INNER JOIN entniche_info b on b.status = 1 and a.power = 1 and a.ent_id = b.id and a.phone != ''`)
- if data != nil { //统计商机管理用户
- for _, m := range *data {
- mUser[common.InterfaceToStr(m["phone"])] = true
- }
- }
- var uData []string
- switch isFree {
- case false:
- user, ok := entity.MQFW.Find("user", map[string]interface{}{
- "i_appid": 2,
- "$or": []map[string]interface{}{
- {
- "i_vip_status": map[string]interface{}{"$gt": 0},
- },
- {
- "i_member_status": map[string]interface{}{"$gt": 0},
- },
- },
- }, "", `{"_id":1,"s_phone":1,"s_m_phone":1}`, false, -1, -1)
- if ok && user != nil && len(*user) > 0 {
- for _, m := range *user {
- phone := common.If(common.InterfaceToStr(m["s_phone"]) == "", common.InterfaceToStr(m["s_m_phone"]), common.InterfaceToStr(m["s_phone"])).(string)
- uData = append(uData, common.InterfaceToStr(m["_id"]))
- if mUser[phone] { //获取剩余商机管理用户
- delete(mUser, phone)
- }
- }
- if len(mUser) > 0 { //统计剩余商机管理用户
- var (
- phones []string
- count int
- )
- for phone := range mUser {
- count++
- phones = append(phones, phone)
- if len(phones) == 100 || count == len(mUser) {
- user1, ok1 := entity.MQFW.Find("user", map[string]interface{}{
- "i_appid": 2,
- "$or": []map[string]interface{}{
- {
- "s_phone": map[string]interface{}{
- "$in": phones,
- },
- },
- {
- "s_m_phone": map[string]interface{}{
- "$in": phones,
- },
- },
- },
- }, "", `{"_id":1}`, false, -1, -1)
- if ok1 && user1 != nil && len(*user1) > 0 {
- for _, m := range *user1 {
- uData = append(uData, common.InterfaceToStr(m["_id"]))
- }
- }
- phones = []string{}
- }
- }
- }
- }
- case true:
- sess := entity.MQFW.GetMgoConn()
- defer entity.MQFW.DestoryMongoConn(sess)
- iter := sess.DB("qfw").C("user").Find(map[string]interface{}{
- "i_appid": 2,
- }).Select(map[string]interface{}{"i_vip_status": 1, "i_member_status": 1, "_id": 1, "s_phone": 1, "s_m_phone": 1}).Iter()
- for m := make(map[string]interface{}); iter.Next(&m); {
- if common.IntAll(m["i_vip_status"]) <= 0 && common.IntAll(m["i_member_status"]) <= 0 {
- phone := common.If(common.InterfaceToStr(m["s_phone"]) == "", common.InterfaceToStr(m["s_m_phone"]), common.InterfaceToStr(m["s_phone"])).(string)
- if !mUser[phone] {
- uData = append(uData, common.InterfaceToStr(m["_id"]))
- }
- }
- m = map[string]interface{}{}
- }
- }
- return uData
- }
- // 获取推送消息
- func messageData(number int) *[]map[string]interface{} {
- now := time.Now().AddDate(0, 0, -1)
- starttime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
- enttime := time.Date(now.Year(), now.Month(), now.Day(), 24, 0, 0, 0, time.Local).Unix()
- query := map[string]interface{}{
- "yucetime": map[string]interface{}{"$gt": starttime, "$lt": enttime},
- }
- data, _ := entity.Bidding.Find("project_forecast", query, `{"yucetime": -1}`, `{"title":1,"_id":1}`, false, 0, number)
- return data
- }
- func UpdateBatch(ids []string, msgLogId []int64) {
- str := fmt.Sprintf(`'%s'`, strings.Join(ids, `','`))
- var bits []string
- for _, i2 := range msgLogId {
- bits = append(bits, fmt.Sprintf("toUInt64(%d)", i2))
- }
- log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, strings.Join(bits, ","), str))
- err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, strings.Join(bits, ","), str))
- if err1 != nil {
- log.Printf("批量更新message_user_summary出错:%s", err1)
- return
- }
- for _, id := range ids {
- keyString := fmt.Sprintf(MsgCountKey, id, 5)
- redis.Del(redisModule, keyString)
- }
- }
|