123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- package main
import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/nats-io/nats.go"

	"dataflow/util"
)
- var (
- conf util.Conf
- url string
- ThreadsLimit int
- Step, Subject string
- )
- func init() {
- conf = util.GetConf()
- url = conf.Config.Natsurl
- if url == "" {
- url = nats.DefaultURL
- }
- Step = conf.Config.Process.Step
- Subject = conf.Config.Process.Subject + "." + Step
- ThreadsLimit = conf.Config.Threads
- fmt.Println("url:", url, "subject", Subject)
- }
- func main() {
- nc, _ := nats.Connect(url)
- defer nc.Drain()
- for i := 0; i < ThreadsLimit; i++ {
- go func() {
- //订阅消息
- nc.QueueSubscribe(Subject, Step, func(msg *nats.Msg) {
- data := &util.MsgInfo{}
- err := json.Unmarshal(msg.Data, &data)
- if err != nil {
- log.Println(err)
- } else {
- //数据处理加工--分类
- data.Data["toptype"] = "招标"
- data.Data["subtype"] = "招标"
- fmt.Println(data.Id, "消息加工完成")
- //消息回写
- bs, _ := json.Marshal(data)
- //time.Sleep(1 * time.Second)
- msg.Respond(bs)
- }
- })
- }()
- }
- ck := make(chan int, 0)
- ck <- 0
- }
|