main.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package main
  2. import (
  3. "dataflow/util"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "github.com/nats-io/nats.go"
  8. )
  9. var (
  10. conf util.Conf
  11. url string
  12. ThreadsLimit int
  13. Subject string
  14. Step string
  15. )
  16. func init() {
  17. conf = util.GetConf()
  18. url = conf.Config.Natsurl
  19. if url == "" {
  20. url = nats.DefaultURL
  21. }
  22. Step = conf.Config.Process.Step
  23. Subject = conf.Config.Process.Subject + "." + Step
  24. ThreadsLimit = conf.Config.Threads
  25. fmt.Println("url:", url, "subject", Subject)
  26. }
  27. func main() {
  28. nc, _ := nats.Connect(url)
  29. defer nc.Drain()
  30. for i := 0; i < ThreadsLimit; i++ {
  31. go func() {
  32. //订阅消息
  33. nc.QueueSubscribe(Subject, Step, func(msg *nats.Msg) {
  34. data := &util.MsgInfo{}
  35. err := json.Unmarshal(msg.Data, &data)
  36. if err != nil {
  37. log.Println(err)
  38. } else {
  39. //数据处理加工--抽取
  40. data.Data["area"] = "河南"
  41. data.Data["city"] = "郑州"
  42. fmt.Println(data.Id, "消息加工完成")
  43. //消息回写
  44. //bs, _ := json.Marshal(data)
  45. //time.Sleep(1 * time.Second)
  46. //msg.Respond(bs)
  47. }
  48. })
  49. }()
  50. }
  51. ck := make(chan int, 0)
  52. ck <- 0
  53. }