package main import ( "dataflow/util" "fmt" "log" "time" "app.yhyue.com/moapp/jybase/mongodb" "jygit.jydev.jianyu360.cn/BP/jynats/jnats" ) var ( conf util.Conf ThreadsLimit int Bidding *mongodb.MongodbSim jn *jnats.Jnats ProMsgInfo = make(chan *util.MsgInfo, 50000) ErrMsgInfo = make(chan *util.MsgInfo, 50000) ) func init() { conf = util.GetConf() jn = jnats.NewJnats(conf.Config.Natsurl) Bidding = mongodb.NewMgoWithUser(conf.Config.Mongodb.Addr, conf.Config.Mongodb.Dbname, conf.Config.Mongodb.Username, conf.Config.Mongodb.Password, conf.Config.Mongodb.Dbsize) go PushInfo() go ErrBulkSave() } func main() { // 5秒定时 考虑增加流程暂停 //消息体 // for i := 0; i < 2; i++ { // //消息体 // info := &util.MsgInfo{ // Id: fmt.Sprint(i), // Data: map[string]interface{}{ // "title": "测试消息" + fmt.Sprint(i), // "detail": "信息源码", // "time": time.Now().Unix(), // }, // } // ProMsgInfo <- info // } data, ok := Bidding.FindById("bidding", "6007c2af7cdc9beb5638ede5", nil) if ok && data != nil { // datastr, _ := bson.Marshal(*data) info := &util.MsgInfo{ Id: fmt.Sprint("1"), Data: *data, Stime: time.Now().Unix(), } ProMsgInfo <- info } // session := Bidding.GetMgoConn() // query := map[string]interface{}{} // defer Bidding.DestoryMongoConn(session) // iter := session.DB("").C("").Find(&query).Sort("-_id").Iter() // thisData := map[string]interface{}{} // for { // if !iter.Next(&thisData) { // break // } // thisData = map[string]interface{}{} // } select {} } func PushInfo() { step := conf.Config.Process subject := step.Subject ThreadsLimit = conf.Config.Threads for i := 0; i < ThreadsLimit; i++ { go func() { for { select { case info := <-ProMsgInfo: var err error for _, st := range step.Steps { info, err = sendMsg(jn, subject, st, info) if err != nil { break } } } } }() } } func sendMsg(jn *jnats.Jnats, subject, st string, info *util.MsgInfo) (*util.MsgInfo, error) { newinfo, err := util.SendRequest(jn, subject, st, info, 5*time.Minute) if err != nil { log.Println("err", info.Id, err) // reMsg(info, subject, st) //消息重发 是否需要? util.Send("发布消息失败告警" + info.Id + err.Error()) //告警 频次考虑控制 info.CurrSetp = st //保存错误 ErrMsgInfo <- info return info, err } else { if newinfo.Err != "" { util.Send("消息返回错误告警" + info.Id + newinfo.Err) //告警 info.CurrSetp = st //保存错误 ErrMsgInfo <- info } else { log.Println("消息发送成功") info = newinfo ErrMsgInfo <- info } return info, nil } } func reMsg(msg *util.MsgInfo, subject, sts string) { isOk := false for _, st := range conf.Config.Process.Steps { if !isOk { if st == sts { isOk = true newinfo, err := util.SendRequest(jn, subject, st, msg, 5*time.Minute) if err != nil { log.Println("err", msg.Id, err) util.Send("消息重发失败告警" + msg.Id + err.Error()) //告警 break } else { msg = newinfo } } } else { newinfo, err := util.SendRequest(jn, subject, st, msg, 5*time.Minute) if err != nil { log.Println("err", msg.Id, err) util.Send("发布消息失败告警" + msg.Id + err.Error()) //告警 ErrMsgInfo <- msg //保存错误 break } else { msg = newinfo } } } } // 错误数据保存 func ErrBulkSave() { for { select { case data := <-ErrMsgInfo: log.Println("接收到错误数据") // dataMap := map[string]interface{}{} // bson.Unmarshal(data.Data, &dataMap) tmp := map[string]interface{}{ "id": data.Id, "currsetp": data.CurrSetp, "nextsetp": data.NextSetp, "err": data.Err, "extend": data.Extend, "comeintime": time.Now().Unix(), "sTime": data.Stime, "eTime": data.Etime, "data": data.Data, } tmpid := Bidding.Save("msg_err", tmp) if tmpid != "" { log.Println("保存成功") } else { log.Println("保存失败~~~") } } } }