main.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package main
  2. import (
  3. "dataflow/util"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "github.com/nats-io/nats.go"
  8. )
  9. var (
  10. conf util.Conf
  11. url string
  12. ThreadsLimit int
  13. Step, Subject string
  14. )
  15. func init() {
  16. conf = util.GetConf()
  17. url = conf.Config.Natsurl
  18. if url == "" {
  19. url = nats.DefaultURL
  20. }
  21. Step = conf.Config.Process.Step
  22. Subject = conf.Config.Process.Subject + "." + Step
  23. ThreadsLimit = conf.Config.Threads
  24. fmt.Println("url:", url, "subject", Subject)
  25. }
  26. func main() {
  27. nc, _ := nats.Connect(url)
  28. defer nc.Drain()
  29. for i := 0; i < ThreadsLimit; i++ {
  30. go func() {
  31. //订阅消息
  32. nc.QueueSubscribe(Subject, Step, func(msg *nats.Msg) {
  33. data := &util.MsgInfo{}
  34. err := json.Unmarshal(msg.Data, &data)
  35. if err != nil {
  36. log.Println(err)
  37. } else {
  38. //数据处理加工--分类
  39. data.Data["toptype"] = "招标"
  40. data.Data["subtype"] = "招标"
  41. fmt.Println(data.Id, "消息加工完成")
  42. //消息回写
  43. bs, _ := json.Marshal(data)
  44. //time.Sleep(1 * time.Second)
  45. msg.Respond(bs)
  46. }
  47. })
  48. }()
  49. }
  50. ck := make(chan int, 0)
  51. ck <- 0
  52. }