package common

import (
	"context"
	"fmt"
	"log"
	"strings"
	"time"

	"app.yhyue.com/moapp/MessageCenter/entity"
	"app.yhyue.com/moapp/MessageCenter/rpc/internal/config"
	"app.yhyue.com/moapp/jybase/common"
	"app.yhyue.com/moapp/jybase/encrypt"
	"app.yhyue.com/moapp/jybase/redis"
	"github.com/robfig/cron/v3"
)

// Literals shared by the scheduled business-intel push.
const (
	// intelPushSender is the send_userid written on rows created by the paid
	// push and used by FreeMessageData to find those rows again.
	intelPushSender = "商机情报定时推送"
	// intelPushGroupID is the group_id of intel messages; it is also the group
	// component of the per-user message-count cache key.
	intelPushGroupID = 5
	// intelTimeLayout is the textual timestamp format written to
	// message_send_log.
	intelTimeLayout = "2006-01-02 15:04:05"
)

// GlobMsgMap holds the message_send_log rows most recently loaded by
// LoadMsgOnTime, keyed by row id.
var GlobMsgMap map[int]map[string]interface{}

// LoadTask registers the periodic jobs and starts the cron scheduler.
//
// The scheduler runs in its own goroutine and is intentionally never stopped:
// the jobs have to keep firing after LoadTask returns. (The previous
// `go c.Start()` + `defer c.Stop()` pair stopped, or raced with, the scheduler
// as soon as this function returned.)
func LoadTask() {
	// Runs the free-user statistics once at startup; the result is discarded.
	// NOTE(review): the original comment here says "run every 10 minutes";
	// the real schedules are whatever the config specs below contain.
	StatisticalUser(true)

	c := cron.New(cron.WithSeconds())
	jobs := []struct {
		name string
		spec string
		run  func()
	}{
		{"LoadMsgOnTime", config.ConfigJson.GlobMsgLoadTime, LoadMsgOnTime},
		{"FreeIntelUserPush", config.ConfigJson.FreeIntelTime, FreeIntelUserPush}, // free-user push
		{"PayIntelUserPush", config.ConfigJson.PayIntelTime, PayIntelUserPush},    // paid-user push
	}
	for _, j := range jobs {
		// An invalid spec would otherwise leave the job silently unscheduled.
		if _, err := c.AddFunc(j.spec, j.run); err != nil {
			log.Printf("cron: cannot schedule %s with spec %q: %v", j.name, j.spec, err)
		}
	}
	c.Start()
}

// LoadMsgOnTime reloads GlobMsgMap from message_send_log (send_status = 4,
// isdel = 1, newest first, limited by config MsgLogLimit). When the query
// yields no rows the previous map is kept.
func LoadMsgOnTime() {
	fmt.Println("开始执行")
	rows := entity.Mysql.SelectBySql("SELECT id,msg_type,title,content,send_time,menu_name,link,group_id,sign FROM message_send_log WHERE send_status = 4 AND isdel = 1 ORDER BY send_time DESC limit ?", config.ConfigJson.MsgLogLimit)
	if rows != nil && len(*rows) > 0 {
		loaded := make(map[int]map[string]interface{}, len(*rows))
		for _, row := range *rows {
			loaded[common.IntAll(row["id"])] = row
		}
		GlobMsgMap = loaded
	}
	fmt.Println("GlobMsgMap len", len(GlobMsgMap))
}

// FreeIntelUserPush pushes today's already-created intel messages to free
// users. It does nothing when no such messages exist.
func FreeIntelUserPush() {
	ids := FreeMessageData(config.ConfigJson.FreePushNumber)
	if len(ids) == 0 {
		return
	}
	PushData(StatisticalUser(true), ids, nil)
}

// PayIntelUserPush creates intel messages from yesterday's forecasts and
// pushes them to paid users. It does nothing when there are no forecasts.
func PayIntelUserPush() {
	data := messageData(config.ConfigJson.PayPushNumber)
	if data == nil || len(*data) == 0 {
		return
	}
	PushData(StatisticalUser(false), nil, data)
}

// PushData delivers intel messages to users.
//
// With an empty ids slice it takes the paid path: one message_send_log row is
// inserted per entry of data, then the new ids are merged into the ClickHouse
// bitmaps for the intel group and for each user in users. With a non-empty ids
// slice it takes the free path: ids are merged into every row of
// message_user_summary and the count cache of each user is cleared.
// The completion line is logged only when no ClickHouse error occurred.
func PushData(users []string, ids []int64, data *[]map[string]interface{}) {
	log.Printf("需推送用户:%d\n", len(users))
	var ok bool
	if len(ids) == 0 {
		ok = pushPaidIntel(users, data)
	} else {
		ok = pushFreeIntel(users, ids)
	}
	if !ok {
		return
	}
	log.Println("PushData完成")
}

// pushPaidIntel inserts one message row per forecast and flags the new ids for
// the given users. It returns false when the group bitmap update failed.
func pushPaidIntel(users []string, data *[]map[string]interface{}) bool {
	const batchSize = 1000

	if data == nil {
		return true
	}
	var ids []int64
	for _, m := range *data {
		articleID := encrypt.EncodeArticleId2ByCheck(common.InterfaceToStr(m["_id"]))
		pcLink := fmt.Sprintf("/swordfish/page_big_pc/business_detail/%s", articleID)
		mobLink := fmt.Sprintf("/jy_mobile/business/detail/%s", articleID)
		// The stored link has four comma-separated entries: the PC link
		// followed by the mobile link three times.
		// NOTE(review): presumably one slot per client type — confirm.
		links := []string{pcLink, mobLink, mobLink, mobLink}

		now := time.Now().Format(intelTimeLayout)
		logID := entity.Mysql.Insert("message_send_log", map[string]interface{}{
			"msg_type":    9,
			"title":       "您有一条专属商机情报",
			"content":     fmt.Sprintf("【商机情报】%s", common.ObjToString(m["title"])),
			"send_mode":   1,
			"send_time":   now,
			"send_status": 4,
			"update_time": now,
			"createtime":  now,
			"link":        strings.Join(links, ","),
			"isdel":       1,
			"send_userid": intelPushSender,
			"sign":        0,
			"group_id":    intelPushGroupID,
		})
		// A non-positive id means the insert did not produce a row; feeding it
		// to toUInt64 would set a bogus bit in every bitmap.
		if logID <= 0 {
			log.Printf("message_send_log insert failed for forecast %v", m["_id"])
			continue
		}
		ids = append(ids, logID)
	}
	if len(ids) == 0 {
		return true
	}

	bitmap := joinBitmapIDs(ids)
	stmt := fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([%s])) where group_id = %d`, bitmap, intelPushGroupID)
	if err := entity.ClickhouseConn.Exec(context.Background(), stmt); err != nil {
		log.Println("message_summary err=== ", err.Error())
		return false
	}
	// Update users in chunks so a single statement's IN list stays bounded.
	for start := 0; start < len(users); start += batchSize {
		end := start + batchSize
		if end > len(users) {
			end = len(users)
		}
		UpdateBatch(users[start:end], bitmap)
	}
	return true
}

// pushFreeIntel merges ids into allMsg of every message_user_summary row and
// clears the count cache of users. It returns false when the update failed.
func pushFreeIntel(users []string, ids []int64) bool {
	stmt := fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where 1=1`, joinBitmapIDs(ids))
	log.Println(stmt)
	if err := entity.ClickhouseConn.Exec(context.Background(), stmt); err != nil {
		log.Printf("批量更新message_user_summary出错:%s", err)
		return false
	}
	clearMsgCountCache(users)
	return true
}

// joinBitmapIDs renders ids as a comma-separated list of toUInt64(...) terms
// for use inside a ClickHouse bitmapBuild([...]) call.
func joinBitmapIDs(ids []int64) string {
	terms := make([]string, 0, len(ids))
	for _, id := range ids {
		terms = append(terms, fmt.Sprintf("toUInt64(%d)", id))
	}
	return strings.Join(terms, ",")
}

// clearMsgCountCache deletes the intel-group message-count key of each user.
func clearMsgCountCache(userIDs []string) {
	for _, id := range userIDs {
		redis.Del(redisModule, fmt.Sprintf(MsgCountKey, id, intelPushGroupID))
	}
}

// paidStatusQuery matches documents whose i_vip_status or i_member_status is
// greater than zero.
func paidStatusQuery() map[string]interface{} {
	return map[string]interface{}{
		"$or": []map[string]interface{}{
			{"i_vip_status": map[string]interface{}{"$gt": 0}},    // subscription
			{"i_member_status": map[string]interface{}{"$gt": 0}}, // membership
		},
	}
}

// userPhone returns s_phone of a user document, falling back to s_m_phone when
// s_phone is empty.
func userPhone(m map[string]interface{}) string {
	if phone := common.InterfaceToStr(m["s_phone"]); phone != "" {
		return phone
	}
	return common.InterfaceToStr(m["s_m_phone"])
}

// addPhones adds every non-empty "phone" value of rows to set.
func addPhones(set map[string]bool, rows *[]map[string]interface{}) {
	if rows == nil {
		return
	}
	for _, row := range *rows {
		if phone := common.InterfaceToStr(row["phone"]); phone != "" {
			set[phone] = true
		}
	}
}

// StatisticalUser returns the user ids a push should target.
//
// It first collects the phones of enterprise accounts: entniche_user rows with
// power = 1 in an enterprise whose status = 1, plus the entniche_user rows
// referenced by paying ent_user documents. With isFree == false the result is
// the ids of paying "user" documents plus the "user" documents owning one of
// the remaining enterprise phones. With isFree == true the result is the ids
// of i_appid = 2 users that are neither paying nor owners of an enterprise
// phone. The order of the returned ids is unspecified.
func StatisticalUser(isFree bool) []string {
	const batchSize = 100

	entPhones := make(map[string]bool)

	// Business-management users.
	addPhones(entPhones, entity.Mysql.SelectBySql(`SELECT a.phone as phone FROM entniche_user a INNER JOIN entniche_info b on b.status = 1 and a.power = 1 and a.ent_id = b.id and a.phone != ''`))
	log.Printf("获取商机管理用户:%d\n", len(entPhones))

	// Enterprise members / subscribers: resolve their phones in batches.
	entPayUser, ok := entity.MQFW.Find("ent_user", paidStatusQuery(), nil, `{"i_userid":1}`, false, -1, -1)
	if ok && entPayUser != nil {
		var batch []string
		resolve := func() {
			if len(batch) == 0 {
				return
			}
			idSQL := fmt.Sprintf(`SELECT phone FROM entniche_user WHERE id in (%s)`, strings.Join(batch, `,`))
			addPhones(entPhones, entity.Mysql.SelectBySql(idSQL))
			batch = nil
		}
		for _, m := range *entPayUser {
			// An empty id would produce "in (1,,2)" and lose the whole batch.
			if id := common.InterfaceToStr(m["i_userid"]); id != "" {
				batch = append(batch, id)
			}
			if len(batch) == batchSize {
				resolve()
			}
		}
		resolve()
	}
	log.Printf("获取商机管理+企业付费用户:%d\n", len(entPhones))

	var ids []string
	if isFree {
		ids = freeUserIDs(entPhones)
	} else {
		ids = paidUserIDs(entPhones, batchSize)
	}
	log.Printf("用户类型:%v,用户数:%d", isFree, len(ids))
	return ids
}

// paidUserIDs returns the ids of paying users plus the users that own one of
// entPhones. Phones already covered by a paying user are removed from
// entPhones so they are not looked up a second time.
func paidUserIDs(entPhones map[string]bool, batchSize int) []string {
	paid := make(map[string]bool)

	payUsers, ok := entity.MQFW.Find("user", paidStatusQuery(), nil, `{"_id":1,"s_phone":1,"s_m_phone":1}`, false, -1, -1)
	if ok && payUsers != nil {
		for _, m := range *payUsers {
			paid[common.InterfaceToStr(m["_id"])] = true
			delete(entPhones, userPhone(m)) // no-op when the phone is not an enterprise phone
		}
	}

	// Look up the remaining enterprise phones in batches.
	var batch []string
	lookup := func() {
		if len(batch) == 0 {
			return
		}
		owners, found := entity.MQFW.Find("user", map[string]interface{}{
			"$or": []map[string]interface{}{
				{"s_phone": map[string]interface{}{"$in": batch}},
				{"s_m_phone": map[string]interface{}{"$in": batch}},
			},
		}, nil, `{"_id":1}`, false, -1, -1)
		if found && owners != nil {
			for _, m := range *owners {
				paid[common.InterfaceToStr(m["_id"])] = true
			}
		}
		batch = nil
	}
	for phone := range entPhones {
		batch = append(batch, phone)
		if len(batch) == batchSize {
			lookup()
		}
	}
	lookup()

	var ids []string
	for id := range paid {
		ids = append(ids, id)
	}
	return ids
}

// freeUserIDs iterates all i_appid = 2 users and returns the ids of those with
// no positive vip/member status whose phone is not in entPhones.
func freeUserIDs(entPhones map[string]bool) []string {
	sess := entity.MQFW.GetMgoConn()
	defer entity.MQFW.DestoryMongoConn(sess)

	iter := sess.DB("qfw").C("user").Find(map[string]interface{}{
		"i_appid": 2,
	}).Select(map[string]interface{}{"_id": 1, "i_vip_status": 1, "i_member_status": 1, "s_phone": 1, "s_m_phone": 1}).Iter()

	var ids []string
	// A fresh map per document keeps fields of the previous document from
	// leaking into the next one.
	for m := make(map[string]interface{}); iter.Next(&m); m = make(map[string]interface{}) {
		if common.IntAll(m["i_vip_status"]) > 0 || common.IntAll(m["i_member_status"]) > 0 {
			continue
		}
		if entPhones[userPhone(m)] {
			continue
		}
		ids = append(ids, common.InterfaceToStr(m["_id"]))
	}
	return ids
}

// messageData returns up to number project_forecast documents (title and _id
// only) whose yucetime lies strictly between yesterday 00:00 and today 00:00
// local time, newest first.
func messageData(number int) *[]map[string]interface{} {
	yesterday := time.Now().AddDate(0, 0, -1)
	dayStart := time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, time.Local)
	dayEnd := dayStart.AddDate(0, 0, 1)

	query := map[string]interface{}{
		"yucetime": map[string]interface{}{"$gt": dayStart.Unix(), "$lt": dayEnd.Unix()},
	}
	data, ok := entity.Bidding.Find("project_forecast", query, `{"yucetime": -1}`, `{"title":1,"_id":1}`, false, 0, number)
	if !ok {
		log.Println("project_forecast query failed")
	}
	return data
}

// FreeMessageData returns the ids of up to number message_send_log rows that
// the scheduled intel push created after today's local midnight, newest first.
func FreeMessageData(number int) []int64 {
	now := time.Now()
	// Use the same textual layout that createtime is written with;
	// time.Time.String() would append a zone suffix ("+0800 CST").
	startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Format(intelTimeLayout)

	rows := entity.Mysql.SelectBySql(`SELECT id FROM message_send_log WHERE createtime > ? and send_userid = ? ORDER BY createtime DESC LIMIT ?`, startTime, intelPushSender, number)

	var ids []int64
	if rows != nil {
		for _, row := range *rows {
			ids = append(ids, common.Int64All(row["id"]))
		}
	}
	return ids
}

// UpdateBatch merges the bitmap terms in bitStr into allMsg of the
// message_user_summary rows whose userId is in ids, then clears the count
// cache of those users. An empty ids slice is a no-op.
func UpdateBatch(ids []string, bitStr string) {
	if len(ids) == 0 {
		return
	}
	inList := fmt.Sprintf(`'%s'`, strings.Join(ids, `','`))
	stmt := fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, bitStr, inList)
	log.Println(stmt)
	if err := entity.ClickhouseConn.Exec(context.Background(), stmt); err != nil {
		log.Printf("批量更新message_user_summary出错:%s", err)
		return
	}
	clearMsgCountCache(ids)
}