handler.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package handler
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "time"
  6. "app.yhyue.com/moapp/jybase/common"
  7. "app.yhyue.com/moapp/jybase/go-logger/logger"
  8. . "app.yhyue.com/moapp/message/db"
  9. "app.yhyue.com/moapp/message/model"
  10. "github.com/nsqio/go-nsq"
  11. )
  12. var (
  13. funcMap = map[string]func(msg *model.Message){
  14. "jyapp_wx_register": VarPoints.Jy_user_new,
  15. "jyapp_phone_register": VarPoints.Jy_user_new,
  16. "jypc_phone_register": VarPoints.Jy_user_new,
  17. "jywx_subscribe_new": VarPoints.Jy_user_new,
  18. "jywx_subscribe_invite": VarPoints.Jywx_subscribe_invite,
  19. "jywx_subscribe_invited": VarPoints.Jywx_subscribe_invited,
  20. "jydocs_doc_open": VarPoints.Jydocs_doc_open,
  21. "jyweb_article_open": VarPoints.Jyweb_article_open,
  22. "jywx_activity_message": VarPush.Jywx_activity_message,
  23. }
  24. )
  // Handler is the receiver type for the NSQ HandleMessage method. It has no
  // fields, so its zero value is ready to use.
  type Handler struct{}
  27. //
  28. func (h *Handler) HandleMessage(m *nsq.Message) error {
  29. defer common.Catch()
  30. if len(m.Body) == 0 {
  31. // Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
  32. return nil
  33. }
  34. var msg *model.Message
  35. err := json.Unmarshal(m.Body, &msg)
  36. if err != nil {
  37. logger.Error(err)
  38. return err
  39. }
  40. logger.Info("接收到消息", fmt.Sprintf("%+v", msg))
  41. if msg.E_code == "" {
  42. logger.Info("缺少参数e_code", fmt.Sprintf("%+v", msg))
  43. return nil
  44. } else if msg.E_time == 0 {
  45. logger.Info("缺少参数e_time", fmt.Sprintf("%+v", msg))
  46. return nil
  47. }
  48. f, f_ok := funcMap[msg.E_code]
  49. if f_ok {
  50. f(msg)
  51. } else {
  52. logger.Info("无效的code值", msg)
  53. return nil
  54. }
  55. Mgo_Log.Save("nsq_logs", map[string]interface{}{
  56. "createtime": time.Now().Unix(),
  57. "body": msg,
  58. "type": "consumer", //producer or consumer
  59. })
  60. // Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
  61. return nil
  62. }