package main

import (
	"log"
	"sync"

	"github.com/nats-io/nats.go"
	"go.mongodb.org/mongo-driver/bson"
	"jygit.jydev.jianyu360.cn/BP/jynats/jnats"
	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
)

// MsgInfo is the BSON-encoded envelope exchanged over NATS for one message.
//
// NOTE(review): CurrSetp/NextSetp look like misspellings of "Step"; they are
// left untouched because renaming them would change the encoded field names.
type MsgInfo struct {
	Id       string                 // unique id of the message
	CurrSetp string                 // current step
	NextSetp string                 // next step; added for special flows
	IsEnd    int                    // end after the current flow: 1 = end
	Data     map[string]interface{} // data content
	Err      string                 // error information; an error raises an alert and terminates the flow
	Stime    int64                  // presumably the start timestamp — TODO confirm unit with the producer
	Etime    int64                  // presumably the end timestamp — TODO confirm unit with the producer
	Extend   Extend
}

// Extend carries optional extension data attached to a MsgInfo.
type Extend struct {
	Repeat MsgRepeat
}

// MsgRepeat holds the pair of ids involved in a duplicate replacement.
type MsgRepeat struct {
	SId string // original id
	RId string // replaced id
}

// FlowTask maps a subtype (string) to the buffered chan FlowInfo that feeds
// the worker goroutine for that subtype.
var FlowTask = &sync.Map{}

// FlowInfo pairs a received NATS message with its decoded payload so the
// worker can reply on the original message once processing is done.
type FlowInfo struct {
	msg     *nats.Msg
	msgInfo *MsgInfo
}

// initRepeatNats connects to NATS and subscribes to "dataprocess.repeat".
// It is used for incremental duplicate checking.
//
// Each incoming message is decoded from BSON into a MsgInfo. A message that
// fails to decode is answered immediately with the decode error in Err. Any
// other message is queued on the channel belonging to its Data["subtype"], so
// messages of one subtype are handled one at a time, in arrival order, by a
// single worker goroutine.
func initRepeatNats() {
	jn = jnats.NewJnats("192.168.3.240:19090")
	jn.SubZip("dataprocess.repeat", func(msg *nats.Msg) {
		msgInfo := &MsgInfo{}
		// msgInfo is already a pointer; pass it directly rather than a
		// pointer to the pointer.
		if err := bson.Unmarshal(msg.Data, msgInfo); err != nil {
			msgInfo.Err = err.Error()
			replyRepeatMsg(msg, msgInfo)
			return
		}

		// Indexing a nil Data map is safe and yields nil here.
		subtype := qu.ObjToString(msgInfo.Data["subtype"])
		// This send blocks once the subtype's buffer is full, which applies
		// back-pressure to the subscription callback.
		repeatChanFor(subtype) <- FlowInfo{msg, msgInfo}
	})
}

// repeatChanFor returns the work channel registered in FlowTask for subtype.
// On first use it creates the channel and starts its worker goroutine.
//
// The registration uses LoadOrStore so that two concurrent callers asking for
// the same new subtype end up sharing one channel and one worker, instead of
// each starting a worker and one channel being overwritten.
func repeatChanFor(subtype string) chan FlowInfo {
	// Fast path: avoids allocating a fresh buffered channel per message.
	if v, ok := FlowTask.Load(subtype); ok {
		if ch, ok := v.(chan FlowInfo); ok && ch != nil {
			return ch
		}
	}

	ch := make(chan FlowInfo, 3000)
	if v, loaded := FlowTask.LoadOrStore(subtype, ch); loaded {
		if existing, ok := v.(chan FlowInfo); ok && existing != nil {
			// Another caller registered first; use its channel.
			return existing
		}
		// The stored entry is not a usable channel; replace it.
		FlowTask.Store(subtype, ch)
	}
	flowsChanRepeat(ch)
	return ch
}

// replyRepeatMsg encodes msgInfo as BSON and sends it as the reply to msg.
// Encoding and delivery failures are logged instead of being dropped. The
// reply is still attempted after an encoding failure so the requester
// receives a (possibly empty) response.
func replyRepeatMsg(msg *nats.Msg, msgInfo *MsgInfo) {
	bs, err := bson.Marshal(msgInfo)
	if err != nil {
		log.Printf("dataprocess.repeat: marshal reply for %q: %v", msgInfo.Id, err)
	}
	if err := msg.Respond(bs); err != nil {
		log.Printf("dataprocess.repeat: respond for %q: %v", msgInfo.Id, err)
	}
}

// flowsChanRepeat starts a goroutine that consumes ch. For every queued item
// it calls increaseFlowRepeat and then replies on the original NATS message
// with the resulting MsgInfo. The goroutine runs until ch is closed; nothing
// in this file closes it.
func flowsChanRepeat(ch chan FlowInfo) {
	go func() {
		for info := range ch {
			increaseFlowRepeat(info.msgInfo)
			replyRepeatMsg(info.msg, info.msgInfo)
		}
	}()
}