123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- package test
- import (
- "encoding/json"
- "log"
- "testing"
- "time"
- "app.yhyue.com/moapp/jybase/mongodb"
- "app.yhyue.com/moapp/message/model"
- "github.com/nsqio/go-nsq"
- )
- func Test_Producer(t *testing.T) {
- Mgo_Log := &mongodb.MongodbSim{
- MongodbAddr: "192.168.3.206:27090",
- Size: 5,
- DbName: "qfw",
- UserName: "admin",
- Password: "123456",
- ReplSet: "",
- }
- Mgo_Log.InitPool()
- log.Println("初始化 mongodb")
- // Instantiate a producer.
- config := nsq.NewConfig()
- producer, err := nsq.NewProducer("192.168.3.240:4260", config)
- if err != nil {
- log.Println(err)
- return
- }
- /*E_code对应的值:
- * jyweb_article_open 打开招投标信息三级页
- * jydocs_doc_open 打开文库三级页
- * jyapp_wx_register app微信注册
- * jyapp_phone_register app手机号注册
- * jypc_phone_register pc端手机号注册
- * jywx_subscribe_new 微信新用户关注
- * jywx_subscribe_invite 已邀请并产生了新用户
- * jywx_subscribe_invited 被邀请产生新用户
- */
- msg := &model.Message{
- E_code: "jyweb_article_open",
- E_userId: "5d6378301c298a5aac7b5402",
- E_time: time.Now().Unix(), //1605223065
- E_app: "jywx_node",
- }
- // Synchronously publish a single message to the specified topic.
- // Messages can also be sent asynchronously and/or in batches.
- b, _ := json.Marshal(msg)
- err = producer.Publish("jy_event", b)
- if err != nil {
- log.Println(err)
- return
- }
- log.Println("消息发布成功")
- //保存消息到日志库
- Mgo_Log.Save("nsq_logs", map[string]interface{}{
- "createtime": time.Now().Unix(),
- "body": msg,
- "type": "producer", //producer or consumer
- })
- // Gracefully stop the producer.
- producer.Stop()
- }
- func Test_Producer2(t *testing.T) {
- NSQProducer, NsqErr := nsq.NewProducer("192.168.3.240:4260", nsq.NewConfig())
- if NsqErr != nil {
- log.Printf("初始化 nsq 异常 %v\n", NsqErr)
- }
- err := func() error {
- b, _ := json.Marshal(&model.Message{
- E_code: "jywx_exhibition_active",
- E_userId: "641aa7bcac1c8e639bc15cdb",
- E_time: time.Now().Unix(),
- E_app: "jywx_node1",
- E_body: map[string]interface{}{
- "welfareType": 0,
- "welfareContent": "001671fe-8890-11eb-8699-0050568f51e7,0025722a-886e-11eb-8699-0050568f51e7",
- },
- })
- if err := NSQProducer.Publish("jy_event", b); err != nil {
- log.Printf("消息发布异常%v\n", err)
- return err
- }
- return nil
- }()
- NSQProducer.Stop()
- if err != nil {
- //log.Println("nsq队列写入失败-->", jy.Jyweb_article_open, se.Decode4Hex(from_userid))
- }
- }
|