nats.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package main
  2. import (
  3. "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
  4. cu "jygit.jydev.jianyu360.cn/data_capture/myself_util/commonutil"
  5. )
  6. var (
  7. NatsUrl string
  8. Jnats *jnats.Jnats
  9. Subscribe string
  10. Step string
  11. NatsThreads chan bool
  12. )
  13. type MsgInfo struct {
  14. Id string //消息唯一id
  15. CurrSetp string //当前步骤
  16. NextSetp string //下个步骤,特殊流程增加
  17. IsEnd int //当前流程后结束 1-结束
  18. Data map[string]interface{} //数据内容
  19. Extend struct { //有需要按照示例增加
  20. Extract struct { //抽取
  21. }
  22. Repeat struct { //判重
  23. SId string //原始id
  24. RId string //被替换id
  25. }
  26. MgoSave struct { //mgo保存更新
  27. SType string //更新u 保存s
  28. col string //表
  29. }
  30. EsSave struct { //es保存更新
  31. SType string //更新u 保存s
  32. Index string //索引
  33. }
  34. }
  35. Err string //错误信息 有错误会告警并终止流程
  36. Stime int64
  37. Etime int64
  38. }
  39. // InitNats 初始化nats
  40. func InitNats() {
  41. NatsUrl = cu.ObjToString(Config["natsurl"])
  42. NatsThreads = make(chan bool, cu.IntAllDef(Config["natsthreads"], 10))
  43. Jnats = jnats.NewJnats(NatsUrl)
  44. Subscribe = cu.ObjToString(Config["subscribe"])
  45. Step = cu.ObjToString(Config["step"])
  46. }