// Command main is a test driver: it enqueues 100 sample messages, pushes each
// one through the configured processing steps over NATS, and stores messages
// whose step request failed in the "msg_err" MongoDB collection.
package main

import (
	"fmt"
	"log"
	"time"

	"dataflow/util"

	"github.com/nats-io/nats.go"
	mgo "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

var (
	// conf is the configuration returned by util.GetConf in init.
	conf util.Conf
	// url is the NATS server address; init falls back to nats.DefaultURL
	// when the configured value is empty.
	url string
	// ThreadsLimit is the number of worker goroutines started by main.
	ThreadsLimit int
	// DB is the MongoDB handle used to persist failed messages.
	DB *mgo.MongodbSim
	// ProMsgInfo queues messages waiting to be processed by the workers.
	ProMsgInfo = make(chan *util.MsgInfo, 50000)
	// ErrMsgInfo queues messages whose step request failed; ErrBulkSave
	// drains it.
	ErrMsgInfo = make(chan *util.MsgInfo, 50000)
)

// init reads the configuration, resolves the NATS URL, initializes the
// MongoDB pool and starts the background goroutine that saves failed messages.
func init() {
	conf = util.GetConf()
	ThreadsLimit = conf.Config.Threads
	url = conf.Config.Natsurl
	if url == "" {
		url = nats.DefaultURL
	}
	log.Println("url", url)
	DB = &mgo.MongodbSim{
		MongodbAddr: conf.Config.Mongodb.Addr,
		DbName:      conf.Config.Mongodb.Dbname,
		Size:        2,
	}
	DB.InitPool()
	go ErrBulkSave()
	//go ErrPublish()
	//go SendAlarm()
}

// main connects to NATS, enqueues 100 test messages and starts ThreadsLimit
// workers. Each worker sends a message through every configured step in
// order, feeding the result of one step into the next. It never returns.
func main() {
	nc, err := nats.Connect(url)
	if err != nil {
		// Without a connection nc is nil and every later use would panic,
		// so stop here with a clear message instead.
		log.Fatalln("nats connect", url, err)
	}
	defer nc.Drain()

	step := conf.Config.Process
	subject := step.Subject

	// Build the test message bodies.
	for i := 0; i < 100; i++ {
		info := &util.MsgInfo{
			Id: fmt.Sprint(i),
			Data: map[string]interface{}{
				"title":  "测试消息" + fmt.Sprint(i),
				"detail": "信息源码",
			},
		}
		ProMsgInfo <- info
	}

	for i := 0; i < ThreadsLimit; i++ {
		go func() {
			for info := range ProMsgInfo {
				// Publish the message step by step.
				cur := info
				for _, st := range step.Steps {
					next, err := util.SendRequest(nc, subject, st, cur, 1*time.Second)
					if err != nil {
						// TODO: 1. resend the message 2. raise an alarm
						log.Println("err", cur.Id, err)
						// Hand over the message as it was when the step failed
						// and skip the remaining steps.
						ErrMsgInfo <- cur
						break
					}
					cur = next
				}
			}
		}()
	}

	// Block forever so the workers keep running.
	select {}
}

// ErrPublish resends failed messages.
func ErrPublish() {
	// TODO: read msg_err and send the messages again
}

// SendAlarm raises an alarm for failed messages.
func SendAlarm() {
	// TODO
}

// ErrBulkSave drains ErrMsgInfo and writes the failed messages to the
// "msg_err" collection in batches. A batch is written as soon as it holds 20
// records, and any partial batch is written every 5 seconds so records do not
// wait indefinitely. If ErrMsgInfo is closed, the pending batch is written
// and the function returns.
func ErrBulkSave() {
	const (
		batchSize     = 20
		flushInterval = 5 * time.Second
	)

	batch := make([]map[string]interface{}, 0, batchSize)

	// flush writes the pending records, if any, and starts a new batch.
	flush := func() {
		if len(batch) == 0 {
			return
		}
		DB.SaveBulk("msg_err", batch...)
		// Allocate a fresh slice rather than reusing the backing array.
		// NOTE(review): SaveBulk may retain the slice it was given - confirm
		// before switching to batch[:0].
		batch = make([]map[string]interface{}, 0, batchSize)
	}

	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	// The select has no default case on purpose: it must block until a
	// message or a tick arrives, otherwise the loop spins and the timed
	// flush never gets a chance to run.
	for {
		select {
		case data, ok := <-ErrMsgInfo:
			if !ok {
				flush()
				return
			}
			batch = append(batch, map[string]interface{}{
				"id":         data.Id,
				"currsetp":   data.CurrSetp,
				"comeintime": time.Now().Unix(),
				"data":       data.Data,
			})
			if len(batch) >= batchSize {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}