// flows.go

package main
import (
	"sync"

	"github.com/nats-io/nats.go"
	"go.mongodb.org/mongo-driver/bson"
	"jygit.jydev.jianyu360.cn/BP/jynats/jnats"
	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
)
  9. type MsgInfo struct {
  10. Id string //消息唯一id
  11. CurrSetp string //当前步骤
  12. NextSetp string //下个步骤,特殊流程增加
  13. IsEnd int //当前流程后结束 1-结束
  14. Data map[string]interface{} //数据内容
  15. Err string //错误信息 有错误会告警并终止流程
  16. Stime int64
  17. Etime int64
  18. Extend Extend
  19. }
  20. type Extend struct {
  21. Repeat MsgRepeat
  22. }
  23. type MsgRepeat struct {
  24. SId string //原始id
  25. RId string //被替换id
  26. }
  27. var FlowTask = &sync.Map{}
  28. type FlowInfo struct {
  29. msg *nats.Msg
  30. msgInfo *MsgInfo
  31. }
  32. // 增量判重使用...
  33. func initRepeatNats() {
  34. jn = jnats.NewJnats("192.168.3.240:19090")
  35. jn.SubZip("dataprocess.repeat", func(msg *nats.Msg) {
  36. msgInfo := &MsgInfo{}
  37. err := bson.Unmarshal(msg.Data, &msgInfo)
  38. if err != nil {
  39. msgInfo.Err = err.Error()
  40. bs, _ := bson.Marshal(msgInfo)
  41. msg.Respond(bs)
  42. } else {
  43. subtype := qu.ObjToString(msgInfo.Data["subtype"])
  44. if ch, ok := FlowTask.Load(subtype); ok && ch != nil {
  45. ch.(chan FlowInfo) <- FlowInfo{msg, msgInfo}
  46. } else {
  47. c := make(chan FlowInfo, 3000)
  48. FlowTask.Store(subtype, c)
  49. flowsChanRepeat(c)
  50. c <- FlowInfo{msg, msgInfo}
  51. }
  52. }
  53. })
  54. }
  55. func flowsChanRepeat(ch chan FlowInfo) {
  56. go func() {
  57. for {
  58. select {
  59. case info := <-ch:
  60. increaseFlowRepeat(info.msgInfo)
  61. bs, _ := bson.Marshal(info.msgInfo)
  62. info.msg.Respond(bs)
  63. }
  64. }
  65. }()
  66. }