// Package test contains manual integration tests that publish event messages
// to NSQ. The tests talk to hard-coded NSQ and MongoDB endpoints.
package test

import (
	"encoding/json"
	"log"
	"testing"
	"time"

	"app.yhyue.com/moapp/jybase/mongodb"
	"app.yhyue.com/moapp/message/model"
	"github.com/nsqio/go-nsq"
)

// Test_Producer publishes a single "jyweb_article_open" event to the
// "jy_event" topic and then stores a copy of the message in the "nsq_logs"
// MongoDB collection.
//
// Any failure to create the producer, encode the message or publish it is
// reported through t so the test cannot pass silently.
func Test_Producer(t *testing.T) {
	// NOTE(review): the address and credentials below are hard-coded; confirm
	// they only ever point at a development environment.
	mgoLog := &mongodb.MongodbSim{
		MongodbAddr: "192.168.3.206:27090",
		Size:        5,
		DbName:      "qfw",
		UserName:    "admin",
		Password:    "123456",
		ReplSet:     "",
	}
	mgoLog.InitPool()
	log.Println("初始化 mongodb")

	// Instantiate a producer.
	producer, err := nsq.NewProducer("192.168.3.240:4260", nsq.NewConfig())
	if err != nil {
		t.Fatalf("creating nsq producer: %v", err)
	}
	// Gracefully stop the producer on every exit path, including failures
	// reported with t.Fatalf below.
	defer producer.Stop()

	/* Values accepted for E_code:
	 * jyweb_article_open     opened a bidding-information third-level page
	 * jydocs_doc_open        opened a document-library third-level page
	 * jyapp_wx_register      app registration via WeChat
	 * jyapp_phone_register   app registration via phone number
	 * jypc_phone_register    PC registration via phone number
	 * jywx_subscribe_new     a new WeChat user followed the account
	 * jywx_subscribe_invite  sent an invitation that produced a new user
	 * jywx_subscribe_invited was invited and became a new user
	 */
	msg := &model.Message{
		E_code:   "jyweb_article_open",
		E_userId: "5d6378301c298a5aac7b5402",
		E_time:   time.Now().Unix(), // Unix seconds, e.g. 1605223065
		E_app:    "jywx_node",
	}

	body, err := json.Marshal(msg)
	if err != nil {
		t.Fatalf("encoding message: %v", err)
	}

	// Synchronously publish a single message to the specified topic.
	// Messages can also be sent asynchronously and/or in batches.
	if err := producer.Publish("jy_event", body); err != nil {
		t.Fatalf("publishing to jy_event: %v", err)
	}
	log.Println("消息发布成功")

	// Save the message to the log database.
	// NOTE(review): the result of Save is not inspected here — confirm whether
	// it can signal a failed write.
	mgoLog.Save("nsq_logs", map[string]interface{}{
		"createtime": time.Now().Unix(),
		"body":       msg,
		"type":       "producer", // producer or consumer
	})
}

// Test_Producer2 publishes a single "jywx_exhibition_active" event carrying
// an E_body payload to the "jy_event" topic.
//
// The test stops immediately when the producer cannot be created, so Publish
// is never called on a nil producer, and a publish failure fails the test.
func Test_Producer2(t *testing.T) {
	producer, err := nsq.NewProducer("192.168.3.240:4260", nsq.NewConfig())
	if err != nil {
		t.Fatalf("初始化 nsq 异常 %v\n", err)
	}
	// Stop the producer whether or not publishing succeeds.
	defer producer.Stop()

	body, err := json.Marshal(&model.Message{
		E_code:   "jywx_exhibition_active",
		E_userId: "641aa7bcac1c8e639bc15cdb",
		E_time:   time.Now().Unix(),
		E_app:    "jywx_node1",
		E_body: map[string]interface{}{
			"welfareType":    0,
			"welfareContent": "001671fe-8890-11eb-8699-0050568f51e7,0025722a-886e-11eb-8699-0050568f51e7",
		},
	})
	if err != nil {
		t.Fatalf("encoding message: %v", err)
	}

	if err := producer.Publish("jy_event", body); err != nil {
		t.Fatalf("消息发布异常%v\n", err)
	}
}