浏览代码

nats消息更新

mxs 1 年之前
父节点
当前提交
955c505c77
共有 3 个文件被更改,包括 6 次插入3 次删除
  1. 2 1
      config.json
  2. 2 2
      main.go
  3. 2 0
      nats.go

+ 2 - 1
config.json

@@ -2,7 +2,8 @@
 	"webport": "8010",
 	"natsurl": "192.168.3.240:19090",
 	"natsthreads": 11,
-	"subscribe": "file",
+	"subscribe": "dataprocess",
+	"step": "file",
 	"api": "http://172.17.145.179:19281/_send/_mail",
 	"to": "maxiaoshan@topnet.net.cn,zhangjinkun@topnet.net.cn",
 	"osssite": {

+ 2 - 2
main.go

@@ -37,7 +37,7 @@ func main() {
 // SubscribeNats nats订阅
 func SubscribeNats() {
 	//先消费,带压缩
-	Jnats.SubZip(Subscribe, func(msg *nats.Msg) {
+	Jnats.SubZip(Subscribe+"."+Step, func(msg *nats.Msg) {
 		NatsThreads <- true
 		go func(msg *nats.Msg) {
 			defer func() {
@@ -52,7 +52,7 @@ func SubscribeNats() {
 			} else {
 				//处理数据
 				data.Stime = time.Now().Unix()
-				data.CurrSetp = Subscribe
+				data.CurrSetp = Step
 				DealFile(data.Data)
 				data.Etime = time.Now().Unix()
 			}

+ 2 - 0
nats.go

@@ -9,6 +9,7 @@ var (
 	NatsUrl     string
 	Jnats       *jnats.Jnats
 	Subscribe   string
+	Step        string
 	NatsThreads chan bool
 )
 
@@ -46,4 +47,5 @@ func InitNats() {
 	NatsThreads = make(chan bool, cu.IntAllDef(Config["natsthreads"], 10))
 	Jnats = jnats.NewJnats(NatsUrl)
 	Subscribe = cu.ObjToString(Config["subscribe"])
+	Step = cu.ObjToString(Config["step"])
 }