123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- package main
- import (
- "dataflow/util"
- "fmt"
- "log"
- "time"
- "github.com/nats-io/nats.go"
- mgo "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- )
- var (
- conf util.Conf
- url string
- ThreadsLimit int
- DB *mgo.MongodbSim
- ProMsgInfo = make(chan *util.MsgInfo, 50000)
- ErrMsgInfo = make(chan *util.MsgInfo, 50000)
- )
- func init() {
- conf = util.GetConf()
- ThreadsLimit = conf.Config.Threads
- url = conf.Config.Natsurl
- if url == "" {
- url = nats.DefaultURL
- }
- log.Println("url", url)
- DB = &mgo.MongodbSim{
- MongodbAddr: conf.Config.Mongodb.Addr,
- DbName: conf.Config.Mongodb.Dbname,
- Size: 2,
- }
- DB.InitPool()
- go ErrBulkSave()
- //go ErrPublish()
- //go SendAlarm()
- }
- func main() {
- nc, _ := nats.Connect(url)
- defer nc.Drain()
- step := conf.Config.Process
- subject := step.Subject
- //消息体
- for i := 0; i < 100; i++ {
- //消息体
- info := &util.MsgInfo{
- Id: fmt.Sprint(i),
- Data: map[string]interface{}{
- "title": "测试消息" + fmt.Sprint(i),
- "detail": "信息源码",
- },
- }
- ProMsgInfo <- info
- }
- for i := 0; i < ThreadsLimit; i++ {
- go func() {
- for {
- info := <-ProMsgInfo
- //按照步骤发布消息
- for _, st := range step.Steps {
- newinfo, err := util.SendRequest(nc, subject, st, info, 1*time.Second)
- if err != nil {
- //TODO 1、消息重发 2、告警
- log.Println("err", info.Id, err)
- ErrMsgInfo <- info
- break
- } else {
- info = newinfo
- //log.Println(info.Id, st, info.Etime-info.Stime)
- }
- }
- }
- }()
- }
- ck := make(chan int, 0)
- ck <- 0
- }
- // 错误消息重发
- func ErrPublish() {
- // TODO 读取msg_err重新发送
- }
- // 错误告警
- func SendAlarm() {
- // TODO
- }
- // 错误数据保存
- func ErrBulkSave() {
- elist := []map[string]interface{}{}
- for {
- select {
- case data, ok := <-ErrMsgInfo:
- if ok {
- tmp := map[string]interface{}{}
- tmp["id"] = data.Id
- tmp["currsetp"] = data.CurrSetp
- tmp["comeintime"] = time.Now().Unix()
- tmp["data"] = data.Data
- elist = append(elist, tmp)
- if len(elist) >= 20 {
- DB.SaveBulk("msg_err", elist...)
- elist = []map[string]interface{}{}
- }
- }
- case <-time.After(5 * time.Second): // 设置超时时间秒
- if len(elist) > 0 {
- DB.SaveBulk("msg_err", elist...)
- elist = []map[string]interface{}{}
- }
- default:
- }
- }
- }
|