nats.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package main
  2. import (
  3. "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
  4. cu "jygit.jydev.jianyu360.cn/data_capture/myself_util/commonutil"
  5. )
  6. var (
  7. NatsUrl string
  8. Jnats *jnats.Jnats
  9. Subscribe string
  10. NatsThreads chan bool
  11. )
  12. type MsgInfo struct {
  13. Id string //消息唯一id
  14. CurrSetp string //当前步骤
  15. NextSetp string //下个步骤,特殊流程增加
  16. IsEnd int //当前流程后结束 1-结束
  17. Data map[string]interface{} //数据内容
  18. Extend struct { //有需要按照示例增加
  19. Extract struct { //抽取
  20. }
  21. Repeat struct { //判重
  22. SId string //原始id
  23. RId string //被替换id
  24. }
  25. MgoSave struct { //mgo保存更新
  26. SType string //更新u 保存s
  27. col string //表
  28. }
  29. EsSave struct { //es保存更新
  30. SType string //更新u 保存s
  31. Index string //索引
  32. }
  33. }
  34. Err string //错误信息 有错误会告警并终止流程
  35. Stime int64
  36. Etime int64
  37. }
  38. // InitNats 初始化nats
  39. func InitNats() {
  40. NatsUrl = cu.ObjToString(Config["natsurl"])
  41. NatsThreads = make(chan bool, cu.IntAllDef(Config["natsthreads"], 10))
  42. Jnats = jnats.NewJnats(NatsUrl)
  43. Subscribe = cu.ObjToString(Config["subscribe"])
  44. }