// Command main is a NATS queue worker: it receives JSON messages on a
// subject, buffers their payloads, and writes them to MongoDB in batches.
package main

import (
	"dataflow/util"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
	mgo "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

const (
	// batchSize is the number of buffered records that triggers an immediate save.
	batchSize = 20
	// flushInterval is how often a partially filled batch is written out.
	flushInterval = 5 * time.Second
	// collection is the MongoDB collection that receives the records.
	collection = "msg"
)

var (
	conf util.Conf
	url  string
	// DataChan carries decoded message payloads from the NATS handler to bulkSave.
	DataChan = make(chan map[string]interface{}, 50000)
	// Step is the processing step name; it is also used as the queue group name.
	// Subject is the NATS subject subscribed to ("<configured subject>.<Step>").
	Step, Subject string
	// DB is the MongoDB handle used by save.
	DB *mgo.MongodbSim
)

// init loads the configuration, derives the NATS URL and subject, and
// initialises the MongoDB connection pool.
func init() {
	conf = util.GetConf()

	url = conf.Config.Natsurl
	if url == "" {
		url = nats.DefaultURL
	}

	Step = conf.Config.Process.Step
	Subject = conf.Config.Process.Subject + "." + Step
	fmt.Println("url:", url, "subject", Subject)

	DB = &mgo.MongodbSim{
		MongodbAddr: conf.Config.Mongodb.Addr,
		DbName:      conf.Config.Mongodb.Dbname,
		Size:        2,
	}
	DB.InitPool()
}

// main connects to NATS, starts the batch writer, subscribes to Subject as a
// member of queue group Step, and then blocks until the process is killed.
func main() {
	nc, err := nats.Connect(url)
	if err != nil {
		// Without a connection nothing below can work; the original ignored
		// this error and would panic on the nil connection.
		log.Fatalf("connect to nats %q: %v", url, err)
	}

	// Start the consumer before subscribing so DataChan is drained from the
	// first message on.
	go bulkSave()

	// Subscribe -> decode -> queue for saving -> reply.
	_, err = nc.QueueSubscribe(Subject, Step, func(msg *nats.Msg) {
		var info util.MsgInfo
		if err := json.Unmarshal(msg.Data, &info); err != nil {
			log.Println(err)
			return
		}
		DataChan <- info.Data

		// Echo the original message back to the sender.
		if err := msg.Respond(msg.Data); err != nil {
			log.Printf("respond on %q: %v", msg.Subject, err)
		}
	})
	if err != nil {
		nc.Close()
		log.Fatalf("subscribe to %q (queue %q): %v", Subject, Step, err)
	}

	// Block forever; the worker runs until the process is terminated.
	select {}
}

// bulkSave drains DataChan and persists the records in batches. A batch is
// written as soon as it holds batchSize records, and any partial batch is
// written every flushInterval. If DataChan is closed, the pending records are
// flushed and the function returns.
func bulkSave() {
	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	pending := make([]map[string]interface{}, 0, batchSize)
	flush := func() {
		if len(pending) == 0 {
			return
		}
		save(pending)
		// Allocate a fresh slice rather than reusing the backing array, in
		// case the storage layer keeps a reference to the saved batch.
		pending = make([]map[string]interface{}, 0, batchSize)
	}

	for {
		// No default branch: the loop must block here, otherwise it spins and
		// the periodic flush can never be selected.
		select {
		case data, ok := <-DataChan:
			if !ok {
				flush()
				return
			}
			pending = append(pending, data)
			if len(pending) >= batchSize {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}

// save writes one batch of records to the MongoDB collection.
func save(data []map[string]interface{}) {
	DB.SaveBulk(collection, data...)
}