push.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package push
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "time"
  6. "app.yhyue.com/moapp/jybase/common"
  7. "app.yhyue.com/moapp/jybase/go-logger/logger"
  8. . "app.yhyue.com/moapp/message/db"
  9. "app.yhyue.com/moapp/message/model"
  10. "app.yhyue.com/moapp/message/util"
  11. "github.com/nsqio/go-nsq"
  12. )
  13. type Push struct {
  14. }
  15. var funcMap = map[string]func(msg *model.Message){
  16. "jywx_activity_message": jywx_activity_message,
  17. }
  18. //处理文库积分
  19. func (p *Push) HandleMessage(m *nsq.Message) error {
  20. defer common.Catch()
  21. if len(m.Body) == 0 {
  22. // Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
  23. return nil
  24. }
  25. var msg *model.Message
  26. err := json.Unmarshal(m.Body, &msg)
  27. if err != nil {
  28. logger.Error(err)
  29. return err
  30. }
  31. logger.Info("接收到消息", fmt.Sprintf("%+v", msg))
  32. if msg.E_code == "" {
  33. logger.Info("缺少参数e_code", fmt.Sprintf("%+v", msg))
  34. return nil
  35. } else if msg.E_time == 0 {
  36. logger.Info("缺少参数e_time", fmt.Sprintf("%+v", msg))
  37. return nil
  38. }
  39. f, f_ok := funcMap[msg.E_code]
  40. if f_ok {
  41. f(msg)
  42. } else {
  43. logger.Info("无效的code值", msg)
  44. }
  45. Mgo_Log.Save("nsq_logs", map[string]interface{}{
  46. "createtime": time.Now().Unix(),
  47. "body": msg,
  48. "type": "consumer", //producer or consumer
  49. })
  50. // Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
  51. return nil
  52. }
  53. func jywx_activity_message(msg *model.Message) {
  54. if len(msg.E_body) == 0 {
  55. return
  56. }
  57. util.StartPush(msg.E_body)
  58. /*
  59. phone := qu.ObjToString(mess["phone"])
  60. product_type := qu.ObjToString(mess["product_type"])
  61. order_code := qu.ObjToString(mess["order_code"])
  62. buy_time := qu.ObjToString(mess["buy_time"])
  63. end_time := qu.ObjToString(mess["end_time"])
  64. */
  65. // util.StartPush("", map[string]interface{}{
  66. // "phone": "18624906090",
  67. // "product_type": "超级订阅一个月",
  68. // "order_code": "1234567890",
  69. // "buy_time": "2021-10-19 22:22:22",
  70. // "end_time": "2021-11-19 23:59:59",
  71. // })
  72. // return
  73. }