nats.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. package main
  2. import (
  3. "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
  4. cu "jygit.jydev.jianyu360.cn/data_capture/myself_util/commonutil"
  5. )
  6. var (
  7. NatsUrl string
  8. Jnats *jnats.Jnats
  9. Subscribe string
  10. )
  11. type MsgInfo struct {
  12. Id string //消息唯一id
  13. CurrSetp string //当前步骤
  14. NextSetp string //下个步骤,特殊流程增加
  15. IsEnd int //当前流程后结束 1-结束
  16. Data map[string]interface{} //数据内容
  17. Extend struct { //有需要按照示例增加
  18. Extract struct { //抽取
  19. }
  20. Repeat struct { //判重
  21. SId string //原始id
  22. RId string //被替换id
  23. }
  24. MgoSave struct { //mgo保存更新
  25. SType string //更新u 保存s
  26. col string //表
  27. }
  28. EsSave struct { //es保存更新
  29. SType string //更新u 保存s
  30. Index string //索引
  31. }
  32. }
  33. Err string //错误信息 有错误会告警并终止流程
  34. Stime int64
  35. Etime int64
  36. }
  37. // InitNats 初始化nats
  38. func InitNats() {
  39. NatsUrl = cu.ObjToString(Config["natsurl"])
  40. Jnats = jnats.NewJnats(NatsUrl)
  41. Subscribe = cu.ObjToString(Config["subscribe"])
  42. }