package handler

import (
	"encoding/json"
	"fmt"
	"time"

	"app.yhyue.com/moapp/jybase/common"
	"app.yhyue.com/moapp/jybase/go-logger/logger"
	. "app.yhyue.com/moapp/message/db"
	"app.yhyue.com/moapp/message/handler/activity"
	"app.yhyue.com/moapp/message/model"
	"github.com/nsqio/go-nsq"
)

var (
	// funcMap routes a message's E_code to the function that processes it.
	// Codes that are not listed here are rejected by HandleMessage.
	funcMap = map[string]func(msg *model.Message){
		// New-user registration activity.
		"jyapp_wx_register":    activity.NewUserActivity,
		"jyapp_phone_register": activity.NewUserActivity,
		"jypc_phone_register":  activity.NewUserActivity,
		"jywx_subscribe_new":   activity.NewUserActivity,
		// Invite-to-register activity.
		"jywx_subscribe_invite":  activity.InviteActivity,        // reward for the inviter
		"jywx_subscribe_invited": activity.InvitedActivity,       // reward for the invitee
		"jywx_report_invited":    activity.ReportInvitedActivity, // invitee reward for the year-end report
		// Detail-page ("third-level page") sharing activity.
		"jydocs_doc_open":    activity.ShareOpenDetail, // a shared document-library detail page was opened
		"jyweb_article_open": activity.ShareOpenDetail, // a shared bid-notice detail page was opened
		// Other.
		"jywx_activity_message":  VarPush.Jywx_activity_message,
		"jywx_exhibition_active": activity.ExhibitionActive,
	}
)

// Handler consumes NSQ messages; its HandleMessage method has the signature
// go-nsq expects from a consumer handler.
type Handler struct {
}

// HandleMessage decodes the JSON body of m into a model.Message, validates
// it, dispatches it to the function registered for its E_code in funcMap and
// finally records the message in the "nsq_logs" collection.
//
// It returns a non-nil error only when the body is not valid JSON; every
// other outcome (empty body, null body, missing fields, unknown code,
// successful dispatch) returns nil.
func (h *Handler) HandleMessage(m *nsq.Message) error {
	// NOTE(review): common.Catch presumably recovers panics raised by the
	// dispatched activity function — confirm against jybase/common.
	defer common.Catch()

	if len(m.Body) == 0 {
		// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
		return nil
	}

	var msg *model.Message
	if err := json.Unmarshal(m.Body, &msg); err != nil {
		logger.Error(err)
		// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
		return err
	}
	logger.Info("接收到消息", fmt.Sprintf("%+v", msg))

	// A JSON `null` body unmarshals without error but leaves msg nil;
	// guard here so the field accesses below cannot dereference a nil pointer.
	if msg == nil {
		logger.Info("message body decoded to null, skipping")
		return nil
	}

	switch {
	case msg.E_code == "":
		logger.Info("缺少参数e_code", fmt.Sprintf("%+v", msg))
		return nil
	case msg.E_time == 0:
		logger.Info("缺少参数e_time", fmt.Sprintf("%+v", msg))
		return nil
	}

	process, known := funcMap[msg.E_code]
	if !known {
		logger.Info("无效的code值", msg)
		return nil
	}
	process(msg)

	// Only messages that were actually dispatched are written to the log.
	Mgo_Log.Save("nsq_logs", map[string]interface{}{
		"createtime": time.Now().Unix(),
		"body":       msg,
		"type":       "consumer", // producer or consumer
	})
	return nil
}