123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- package main
- import (
- "dataflow/util"
- "fmt"
- "log"
- "strings"
- "time"
- "app.yhyue.com/moapp/jybase/mongodb"
- "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
- )
- var (
- conf util.Conf
- ThreadsLimit int
- Bidding *mongodb.MongodbSim
- jn *jnats.Jnats
- ProMsgInfo = make(chan *util.MsgInfo, 50000)
- ErrMsgInfo = make(chan *util.MsgInfo, 50000)
- )
- func init() {
- conf = util.GetConf()
- jn = jnats.NewJnats(conf.Config.Natsurl)
- Bidding = mongodb.NewMgoWithUser(conf.Config.Mongodb.Addr, conf.Config.Mongodb.Dbname, conf.Config.Mongodb.Username, conf.Config.Mongodb.Password, conf.Config.Mongodb.Dbsize)
- go PushInfo()
- go ErrBulkSave()
- }
- func main() {
- // 5秒定时 考虑增加流程暂停
- //消息体
- // for i := 0; i < 2; i++ {
- // //消息体
- // info := &util.MsgInfo{
- // Id: fmt.Sprint(i),
- // Data: map[string]interface{}{
- // "title": "测试消息" + fmt.Sprint(i),
- // "detail": "信息源码",
- // "time": time.Now().Unix(),
- // },
- // }
- // ProMsgInfo <- info
- // }
- data, ok := Bidding.FindById("bidding", "6007c2af7cdc9beb5638ede5", nil)
- if ok && data != nil {
- // datastr, _ := bson.Marshal(*data)
- info := &util.MsgInfo{
- Id: fmt.Sprint("1"),
- Data: *data,
- Stime: time.Now().Unix(),
- }
- ProMsgInfo <- info
- }
- // session := Bidding.GetMgoConn()
- // query := map[string]interface{}{}
- // defer Bidding.DestoryMongoConn(session)
- // iter := session.DB("").C("").Find(&query).Sort("-_id").Iter()
- // thisData := map[string]interface{}{}
- // for {
- // if !iter.Next(&thisData) {
- // break
- // }
- // thisData = map[string]interface{}{}
- // }
- select {}
- }
- func PushInfo() {
- step := conf.Config.Process
- subject := step.Subject
- ThreadsLimit = conf.Config.Threads
- for i := 0; i < ThreadsLimit; i++ {
- go func() {
- for {
- select {
- case info := <-ProMsgInfo:
- var err error
- for _, st := range step.Steps {
- if strings.Contains(st, "|") {
- st1 := strings.Split(st, "|")[0]
- st2 := strings.Split(st, "|")[1]
- if info.Extend.File.IsFile == 1 {
- info, err = sendMsg(jn, subject, st1, info)
- if err != nil {
- break
- }
- } else {
- info, err = sendMsg(jn, subject, st2, info)
- if err != nil {
- break
- }
- }
- } else {
- info, err = sendMsg(jn, subject, st, info)
- if err != nil {
- break
- }
- }
- }
- }
- }
- }()
- }
- }
- func sendMsg(jn *jnats.Jnats, subject, st string, info *util.MsgInfo) (*util.MsgInfo, error) {
- newinfo, err := util.SendRequest(jn, subject, st, info, 5*time.Minute)
- if err != nil {
- log.Println("err", info.Id, err)
- // reMsg(info, subject, st) //消息重发 是否需要?
- util.Send("发布消息失败告警" + info.Id + err.Error()) //告警 频次考虑控制
- info.CurrSetp = st //保存错误
- ErrMsgInfo <- info
- return info, err
- } else {
- if newinfo.Err != "" {
- util.Send("消息返回错误告警" + info.Id + newinfo.Err) //告警
- info.CurrSetp = st //保存错误
- ErrMsgInfo <- info
- } else {
- log.Println("消息发送成功")
- info = newinfo
- ErrMsgInfo <- info
- }
- return info, nil
- }
- }
- func reMsg(msg *util.MsgInfo, subject, sts string) {
- isOk := false
- for _, st := range conf.Config.Process.Steps {
- if !isOk {
- if st == sts {
- isOk = true
- newinfo, err := util.SendRequest(jn, subject, st, msg, 5*time.Minute)
- if err != nil {
- log.Println("err", msg.Id, err)
- util.Send("消息重发失败告警" + msg.Id + err.Error()) //告警
- break
- } else {
- msg = newinfo
- }
- }
- } else {
- newinfo, err := util.SendRequest(jn, subject, st, msg, 5*time.Minute)
- if err != nil {
- log.Println("err", msg.Id, err)
- util.Send("发布消息失败告警" + msg.Id + err.Error()) //告警
- ErrMsgInfo <- msg //保存错误
- break
- } else {
- msg = newinfo
- }
- }
- }
- }
- // 错误数据保存
- func ErrBulkSave() {
- for {
- select {
- case data := <-ErrMsgInfo:
- log.Println("接收到错误数据")
- // dataMap := map[string]interface{}{}
- // bson.Unmarshal(data.Data, &dataMap)
- tmp := map[string]interface{}{
- "id": data.Id,
- "currsetp": data.CurrSetp,
- "nextsetp": data.NextSetp,
- "err": data.Err,
- "extend": data.Extend,
- "comeintime": time.Now().Unix(),
- "sTime": data.Stime,
- "eTime": data.Etime,
- "data": data.Data,
- }
- tmpid := Bidding.Save("msg_err", tmp)
- if tmpid != "" {
- log.Println("保存成功")
- } else {
- log.Println("保存失败~~~")
- }
- }
- }
- }
|