package push import ( "encoding/json" "fmt" "time" "app.yhyue.com/moapp/jybase/common" "app.yhyue.com/moapp/jybase/go-logger/logger" . "app.yhyue.com/moapp/message/db" "app.yhyue.com/moapp/message/model" "app.yhyue.com/moapp/message/util" "github.com/nsqio/go-nsq" ) type Push struct { } var funcMap = map[string]func(msg *model.Message){ "jywx_activity_message": jywx_activity_message, } //处理文库积分 func (p *Push) HandleMessage(m *nsq.Message) error { defer common.Catch() if len(m.Body) == 0 { // Returning nil will automatically send a FIN command to NSQ to mark the message as processed. return nil } var msg *model.Message err := json.Unmarshal(m.Body, &msg) if err != nil { logger.Error(err) return err } logger.Info("接收到消息", fmt.Sprintf("%+v", msg)) if msg.E_code == "" { logger.Info("缺少参数e_code", fmt.Sprintf("%+v", msg)) return nil } else if msg.E_time == 0 { logger.Info("缺少参数e_time", fmt.Sprintf("%+v", msg)) return nil } f, f_ok := funcMap[msg.E_code] if f_ok { f(msg) } else { logger.Info("无效的code值", msg) } Mgo_Log.Save("nsq_logs", map[string]interface{}{ "createtime": time.Now().Unix(), "body": msg, "type": "consumer", //producer or consumer }) // Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message. return nil } func jywx_activity_message(msg *model.Message) { if len(msg.E_body) == 0 { return } util.StartPush(msg.E_body) /* phone := qu.ObjToString(mess["phone"]) product_type := qu.ObjToString(mess["product_type"]) order_code := qu.ObjToString(mess["order_code"]) buy_time := qu.ObjToString(mess["buy_time"]) end_time := qu.ObjToString(mess["end_time"]) */ // util.StartPush("", map[string]interface{}{ // "phone": "18624906090", // "product_type": "超级订阅一个月", // "order_code": "1234567890", // "buy_time": "2021-10-19 22:22:22", // "end_time": "2021-11-19 23:59:59", // }) // return }