client_test.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package main
  2. import (
  3. "app.yhyue.com/moapp/jybase/mongodb"
  4. "fmt"
  5. "go.mongodb.org/mongo-driver/bson"
  6. "testing"
  7. "time"
  8. )
  9. func Test_NatsClient(t *testing.T) {
  10. mgo := mongodb.MongodbSim{
  11. MongodbAddr: "192.168.3.166:27082",
  12. DbName: "qfw",
  13. Size: 1,
  14. }
  15. mgo.InitPool()
  16. tmp, _ := mgo.FindById("bidding", "65578e020687916fae514d34", nil)
  17. msg := &MsgInfo{
  18. Id: "1", //消息唯一id
  19. CurrSetp: "test", //当前步骤
  20. Data: *tmp, //数据内容
  21. }
  22. msgByte, _ := bson.Marshal(msg)
  23. resp, err := Jnats.PubReqZip(Subscribe, msgByte, 100*time.Second)
  24. if err != nil {
  25. fmt.Println("发布回执异常:", err)
  26. return
  27. }
  28. respMsg := &MsgInfo{}
  29. if bson.Unmarshal(resp.Data, &respMsg) == nil {
  30. fmt.Println(respMsg.CurrSetp)
  31. fmt.Println(respMsg.Data["detail"])
  32. } else {
  33. fmt.Println("解析数据失败")
  34. }
  35. }
  36. // 并发流程测试
  37. func Test_NatsClients(t *testing.T) {
  38. mgo := mongodb.MongodbSim{
  39. MongodbAddr: "192.168.3.166:27082",
  40. DbName: "qfw",
  41. Size: 1,
  42. }
  43. mgo.InitPool()
  44. list, _ := mgo.Find("bidding", nil, nil, nil, false, 0, 10)
  45. for i, l := range *list {
  46. go func(index int, tmp map[string]interface{}) {
  47. msg := &MsgInfo{
  48. Id: fmt.Sprint(index), //消息唯一id
  49. CurrSetp: "test", //当前步骤
  50. Data: tmp, //数据内容
  51. }
  52. fmt.Println("发布:", index)
  53. msgByte, _ := bson.Marshal(msg)
  54. resp, err := Jnats.PubReqZip(Subscribe, msgByte, 10*time.Second)
  55. if err != nil {
  56. fmt.Println("发布回执异常:", err)
  57. return
  58. }
  59. respMsg := &MsgInfo{}
  60. if bson.Unmarshal(resp.Data, &respMsg) == nil {
  61. fmt.Println("回应:", index, respMsg.Id)
  62. } else {
  63. fmt.Println("解析数据失败")
  64. }
  65. }(i, l)
  66. }
  67. select {}
  68. }