12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- package main
- import (
- "github.com/nats-io/nats.go"
- "go.mongodb.org/mongo-driver/bson"
- "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
- qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "sync"
- )
- type MsgInfo struct {
- Id string //消息唯一id
- CurrSetp string //当前步骤
- NextSetp string //下个步骤,特殊流程增加
- IsEnd int //当前流程后结束 1-结束
- Data map[string]interface{} //数据内容
- Err string //错误信息 有错误会告警并终止流程
- Stime int64
- Etime int64
- Extend Extend
- }
- type Extend struct {
- Repeat MsgRepeat
- }
// MsgRepeat records the ids involved in a duplicate replacement.
type MsgRepeat struct {
	// SId is the original id.
	SId string
	// RId is the id that was replaced.
	RId string
}
// FlowTask maps a subtype string to the buffered chan FlowInfo that
// queues messages of that subtype for its worker goroutine.
var FlowTask = new(sync.Map)
- type FlowInfo struct {
- msg *nats.Msg
- msgInfo *MsgInfo
- }
- // 增量判重使用...
- func initRepeatNats() {
- jn = jnats.NewJnats("192.168.3.240:19090")
- jn.SubZip("dataprocess.repeat", func(msg *nats.Msg) {
- msgInfo := &MsgInfo{}
- err := bson.Unmarshal(msg.Data, &msgInfo)
- if err != nil {
- msgInfo.Err = err.Error()
- bs, _ := bson.Marshal(msgInfo)
- msg.Respond(bs)
- } else {
- subtype := qu.ObjToString(msgInfo.Data["subtype"])
- if ch, ok := FlowTask.Load(subtype); ok && ch != nil {
- ch.(chan FlowInfo) <- FlowInfo{msg, msgInfo}
- } else {
- c := make(chan FlowInfo, 3000)
- FlowTask.Store(subtype, c)
- flowsChanRepeat(c)
- c <- FlowInfo{msg, msgInfo}
- }
- }
- })
- }
- func flowsChanRepeat(ch chan FlowInfo) {
- go func() {
- for {
- select {
- case info := <-ch:
- increaseFlowRepeat(info.msgInfo)
- bs, _ := bson.Marshal(info.msgInfo)
- info.msg.Respond(bs)
- }
- }
- }()
- }
|