// Command main is a NATS queue-group consumer for a single processing step.
// It reads its settings through util.GetConf, subscribes ThreadsLimit times to
// Subject using Step as the queue group name, and enriches every decoded
// message with fixed "area" and "city" fields.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"dataflow/util"

	"github.com/nats-io/nats.go"
)

var (
	// conf is the configuration returned by util.GetConf.
	conf util.Conf
	// url is the NATS server URL; init falls back to nats.DefaultURL when the
	// configured value is empty.
	url string
	// ThreadsLimit is the number of queue subscriptions main creates.
	ThreadsLimit int
	// Subject is the subscribed subject: the configured subject, a dot, and Step.
	Subject string
	// Step is the configured processing step; it doubles as the queue group name.
	Step string
)

// init copies the relevant configuration values into the package-level
// variables and prints the resolved URL and subject.
func init() {
	conf = util.GetConf()

	url = conf.Config.Natsurl
	if url == "" {
		url = nats.DefaultURL
	}

	Step = conf.Config.Process.Step
	Subject = conf.Config.Process.Subject + "." + Step
	ThreadsLimit = conf.Config.Threads

	fmt.Println("url:", url, "subject", Subject)
}

// handleMsg decodes one message payload as a util.MsgInfo and applies the
// extraction step to it. Payloads that cannot be decoded, or that carry no
// data map, are logged and skipped.
func handleMsg(msg *nats.Msg) {
	// Decode into a value rather than through a pointer-to-pointer: a JSON
	// "null" payload would otherwise reset the pointer to nil and the field
	// accesses below would dereference it.
	var data util.MsgInfo
	if err := json.Unmarshal(msg.Data, &data); err != nil {
		log.Println(err)
		return
	}

	// Writing to a nil map panics, which would take the whole process down
	// from inside the subscription callback.
	if data.Data == nil {
		log.Println(data.Id, "message has no data map, skipped")
		return
	}

	// Data processing -- extraction.
	data.Data["area"] = "河南"
	data.Data["city"] = "郑州"
	fmt.Println(data.Id, "消息加工完成")

	// Write the message back (currently disabled):
	//bs, _ := json.Marshal(data)
	//time.Sleep(1 * time.Second)
	//msg.Respond(bs)
}

// main connects to NATS, registers the queue subscriptions and then blocks
// until the process receives SIGINT/SIGTERM or the connection is closed.
// On a signal the connection is drained so in-flight messages are finished
// before the process exits.
func main() {
	// closed is closed once the client reports the connection as fully closed.
	// Drain only starts the shutdown, so this is what main waits on afterwards.
	closed := make(chan struct{})
	var closeOnce sync.Once

	nc, err := nats.Connect(url, nats.ClosedHandler(func(*nats.Conn) {
		closeOnce.Do(func() { close(closed) })
	}))
	if err != nil {
		log.Fatalf("connect to NATS at %q: %v", url, err)
	}

	// Subscribe to messages. QueueSubscribe does not block, so the
	// subscriptions are created inline and their errors can be checked.
	for i := 0; i < ThreadsLimit; i++ {
		if _, err := nc.QueueSubscribe(Subject, Step, handleMsg); err != nil {
			nc.Close()
			log.Fatalf("subscribe to %q (queue %q): %v", Subject, Step, err)
		}
	}

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)

	select {
	case s := <-sig:
		log.Printf("received %v, draining connection", s)
		if err := nc.Drain(); err != nil {
			log.Printf("drain: %v", err)
			// Fall back to a hard close so the closed callback still fires.
			nc.Close()
		}
		<-closed
	case <-closed:
		log.Fatal("NATS connection closed unexpectedly")
	}
}