// Package points consumes event messages from NSQ and converts them into
// integral (points) rewards through the jyPoints RPC service.
package points

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"app.yhyue.com/moapp/jyPoints/rpc/integral"
	"app.yhyue.com/moapp/jyPoints/rpc/integralclient"
	"app.yhyue.com/moapp/jybase/common"
	"app.yhyue.com/moapp/jybase/date"
	"app.yhyue.com/moapp/jybase/go-logger/logger"
	"app.yhyue.com/moapp/jybase/redis"
	. "app.yhyue.com/moapp/message/client"
	. "app.yhyue.com/moapp/message/config"
	. "app.yhyue.com/moapp/message/db"
	"app.yhyue.com/moapp/message/model"
	"github.com/nsqio/go-nsq"
)

const (
	// Redis_Main is the name of the redis instance used for the per-user
	// daily "article open" counters.
	Redis_Main = "main"
)

// Points is the NSQ message handler that awards points for user events.
type Points struct {
}

// funcMap dispatches an event code (Message.E_code) to the function that
// awards points for it. Codes that are not listed here are ignored.
var funcMap = map[string]func(msg *model.Message){
	"jyapp_wx_register":      jy_user_new,
	"jyapp_phone_register":   jy_user_new,
	"jypc_phone_register":    jy_user_new,
	"jywx_subscribe_new":     jy_user_new,
	"jywx_subscribe_invite":  jywx_subscribe_invite,
	"jywx_subscribe_invited": jywx_subscribe_invited,
	"jydocs_doc_open":        jydocs_doc_open,
	"jyweb_article_open":     jyweb_article_open,
}

// HandleMessage processes one NSQ message carrying a points event.
//
// The body is decoded as a JSON model.Message. Messages with an empty body,
// a missing e_code or e_time, or an unknown e_code are logged and dropped.
// For a known e_code the matching handler from funcMap is invoked and the
// message is recorded in the "nsq_logs" collection.
//
// Returning nil makes NSQ mark the message as processed (FIN); returning a
// non-nil error makes NSQ re-queue it (REQ). Only a JSON decoding failure is
// reported as an error; a failure inside a points handler is logged by the
// handler and does not cause a re-queue.
func (p *Points) HandleMessage(m *nsq.Message) error {
	// NOTE(review): common.Catch presumably recovers from panics so that one
	// bad message cannot take down the consumer; confirm in jybase/common.
	defer common.Catch()
	if len(m.Body) == 0 {
		// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
		return nil
	}
	// Decode into a pre-allocated value: unmarshalling the JSON literal
	// `null` into a nil *model.Message would leave the pointer nil and the
	// field checks below would dereference it.
	msg := new(model.Message)
	if err := json.Unmarshal(m.Body, msg); err != nil {
		logger.Error(err)
		return err
	}
	logger.Info("接收到消息", fmt.Sprintf("%+v", msg))

	// Both fields are mandatory; without them the event cannot be handled.
	switch {
	case msg.E_code == "":
		logger.Info("缺少参数e_code", fmt.Sprintf("%+v", msg))
		return nil
	case msg.E_time == 0:
		logger.Info("缺少参数e_time", fmt.Sprintf("%+v", msg))
		return nil
	}

	handler, ok := funcMap[msg.E_code]
	if !ok {
		logger.Info("无效的code值", msg)
		return nil
	}
	handler(msg)

	Mgo_Log.Save("nsq_logs", map[string]interface{}{
		"createtime": time.Now().Unix(),
		"body":       msg,
		"type":       "consumer", // producer or consumer
	})
	// The message has been handled; nil tells NSQ to finish (FIN) it.
	return nil
}

// jy_user_new awards the configured Jywx_subscribe_new points (point type
// 1002) for a new-user event.
func jy_user_new(msg *model.Message) {
	integralHarvest(msg, Config.DocPoints.Jywx_subscribe_new, 1002)
}

// jywx_subscribe_invite awards the configured Jywx_subscribe_invite points
// (point type 1003) to the inviting user.
func jywx_subscribe_invite(msg *model.Message) {
	integralHarvest(msg, Config.DocPoints.Jywx_subscribe_invite, 1003)
}

// jywx_subscribe_invited awards the configured Jywx_subscribe_invited points
// (point type 1002) to the invited user.
func jywx_subscribe_invited(msg *model.Message) {
	integralHarvest(msg, Config.DocPoints.Jywx_subscribe_invited, 1002)
}

// jydocs_doc_open awards points for opening a document-library article page.
func jydocs_doc_open(msg *model.Message) {
	jy_open(msg, Config.DocPoints.Open.Jydocs_doc_open)
}

// jyweb_article_open awards points for opening a bidding-information article
// page.
func jyweb_article_open(msg *model.Message) {
	jy_open(msg, Config.DocPoints.Open.Jyweb_article_open)
}

// jy_open awards point points (point type 1005) for an "open" event, limited
// to Config.DocPoints.Open.Max rewarded opens per user per day.
//
// The limit is enforced with a redis counter keyed by the user id. All
// "open" event kinds share the same counter. While the counter is within the
// limit its expiry is (re)set to the next local midnight, so the count
// starts over each day.
func jy_open(msg *model.Message, point int64) {
	key := fmt.Sprintf("jypoints_share_article_open_%s", msg.E_userId)
	if redis.Incr(Redis_Main, key) > Config.DocPoints.Open.Max {
		logger.Info(fmt.Sprintf("%+v", msg), "超过一天最大次数", Config.DocPoints.Open.Max, "限制,不再增加积分")
		return
	}
	now := time.Now()
	// time.Date normalises Day()+1 past the end of a month, so this is always
	// 00:00:00 local time of the following day.
	nextMidnight := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, time.Local)
	redis.SetExpire(Redis_Main, key, int(nextMidnight.Unix()-now.Unix()))
	integralHarvest(msg, point, 1005)
}

// integralHarvest asks the integral RPC service to add point points of type
// pointType to the user named in msg. The points are given an end date one
// year after the event time (msg.E_time, Unix seconds).
//
// It reports whether the service accepted the request (response code 1).
// RPC errors and non-1 response codes are logged and reported as false.
func integralHarvest(msg *model.Message, point, pointType int64) bool {
	expiry := time.Unix(msg.E_time, 0).AddDate(1, 0, 0)
	req := &integral.Req{
		UserId:        msg.E_userId,
		AppId:         Config.DocPoints.AppId,
		PointType:     pointType,
		Point:         point,
		EndDate:       date.FormatDate(&expiry, date.Date_Full_Layout),
		OperationType: false,
	}
	resp, err := integralclient.NewIntegral(ZrpcClient).IntegralHarvest(context.Background(), req)
	if err != nil {
		logger.Info(fmt.Sprintf("%+v", msg), "IntegralHarvest Resp error", err)
		return false
	}
	if resp.Code != 1 {
		logger.Info(fmt.Sprintf("%+v", msg), "增加", point, "积分失败", "Code", resp.Code, "Message", resp.Message)
		return false
	}
	logger.Info(fmt.Sprintf("%+v", msg), "已成功增加", point, "积分")
	return true
}