main_test.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package test
  2. import (
  3. "encoding/json"
  4. "log"
  5. "testing"
  6. "time"
  7. "app.yhyue.com/moapp/jybase/mongodb"
  8. "app.yhyue.com/moapp/message/model"
  9. "github.com/nsqio/go-nsq"
  10. )
  11. func Test_Producer(t *testing.T) {
  12. Mgo_Log := &mongodb.MongodbSim{
  13. MongodbAddr: "192.168.3.206:27090",
  14. Size: 5,
  15. DbName: "qfw",
  16. UserName: "admin",
  17. Password: "123456",
  18. ReplSet: "",
  19. }
  20. Mgo_Log.InitPool()
  21. log.Println("初始化 mongodb")
  22. // Instantiate a producer.
  23. config := nsq.NewConfig()
  24. producer, err := nsq.NewProducer("192.168.3.240:4260", config)
  25. if err != nil {
  26. log.Println(err)
  27. return
  28. }
  29. /*E_code对应的值:
  30. * jyweb_article_open 打开招投标信息三级页
  31. * jydocs_doc_open 打开文库三级页
  32. * jyapp_wx_register app微信注册
  33. * jyapp_phone_register app手机号注册
  34. * jypc_phone_register pc端手机号注册
  35. * jywx_subscribe_new 微信新用户关注
  36. * jywx_subscribe_invite 已邀请并产生了新用户
  37. * jywx_subscribe_invited 被邀请产生新用户
  38. */
  39. msg := &model.Message{
  40. E_code: "jyweb_article_open",
  41. E_userId: "5d6378301c298a5aac7b5402",
  42. E_time: time.Now().Unix(), //1605223065
  43. E_app: "jywx_node",
  44. }
  45. // Synchronously publish a single message to the specified topic.
  46. // Messages can also be sent asynchronously and/or in batches.
  47. b, _ := json.Marshal(msg)
  48. err = producer.Publish("jy_event", b)
  49. if err != nil {
  50. log.Println(err)
  51. return
  52. }
  53. log.Println("消息发布成功")
  54. //保存消息到日志库
  55. Mgo_Log.Save("nsq_logs", map[string]interface{}{
  56. "createtime": time.Now().Unix(),
  57. "body": msg,
  58. "type": "producer", //producer or consumer
  59. })
  60. // Gracefully stop the producer.
  61. producer.Stop()
  62. }
  63. func Test_Producer2(t *testing.T) {
  64. NSQProducer, NsqErr := nsq.NewProducer("192.168.3.240:4260", nsq.NewConfig())
  65. if NsqErr != nil {
  66. log.Printf("初始化 nsq 异常 %v\n", NsqErr)
  67. }
  68. err := func() error {
  69. b, _ := json.Marshal(&model.Message{
  70. E_code: "jywx_exhibition_active",
  71. E_userId: "641aa7bcac1c8e639bc15cdb",
  72. E_time: time.Now().Unix(),
  73. E_app: "jywx_node1",
  74. E_body: map[string]interface{}{
  75. "welfareType": 0,
  76. "welfareContent": "001671fe-8890-11eb-8699-0050568f51e7,0025722a-886e-11eb-8699-0050568f51e7",
  77. },
  78. })
  79. if err := NSQProducer.Publish("jy_event", b); err != nil {
  80. log.Printf("消息发布异常%v\n", err)
  81. return err
  82. }
  83. return nil
  84. }()
  85. NSQProducer.Stop()
  86. if err != nil {
  87. //log.Println("nsq队列写入失败-->", jy.Jyweb_article_open, se.Decode4Hex(from_userid))
  88. }
  89. }