package main import ( "app.yhyue.com/moapp/jybase/mongodb" "fmt" "go.mongodb.org/mongo-driver/bson" "testing" "time" ) func Test_NatsClient(t *testing.T) { mgo := mongodb.MongodbSim{ MongodbAddr: "192.168.3.166:27082", DbName: "qfw", Size: 1, } mgo.InitPool() tmp, _ := mgo.FindById("bidding", "65578e020687916fae514d34", nil) msg := &MsgInfo{ Id: "1", //消息唯一id CurrSetp: "test", //当前步骤 Data: *tmp, //数据内容 } msgByte, _ := bson.Marshal(msg) resp, err := Jnats.PubReqZip(Subscribe, msgByte, 100*time.Second) if err != nil { fmt.Println("发布回执异常:", err) return } respMsg := &MsgInfo{} if bson.Unmarshal(resp.Data, &respMsg) == nil { fmt.Println(respMsg.CurrSetp) fmt.Println(respMsg.Data["detail"]) } else { fmt.Println("解析数据失败") } } // 并发流程测试 func Test_NatsClients(t *testing.T) { mgo := mongodb.MongodbSim{ MongodbAddr: "192.168.3.166:27082", DbName: "qfw", Size: 1, } mgo.InitPool() list, _ := mgo.Find("bidding", nil, nil, nil, false, 0, 10) for i, l := range *list { go func(index int, tmp map[string]interface{}) { msg := &MsgInfo{ Id: fmt.Sprint(index), //消息唯一id CurrSetp: "test", //当前步骤 Data: tmp, //数据内容 } fmt.Println("发布:", index) msgByte, _ := bson.Marshal(msg) resp, err := Jnats.PubReqZip(Subscribe, msgByte, 10*time.Second) if err != nil { fmt.Println("发布回执异常:", err) return } respMsg := &MsgInfo{} if bson.Unmarshal(resp.Data, &respMsg) == nil { fmt.Println("回应:", index, respMsg.Id) } else { fmt.Println("解析数据失败") } }(i, l) } select {} }