1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- package main
- import (
- "dataflow/util"
- "encoding/json"
- "fmt"
- "log"
- "time"
- "github.com/nats-io/nats.go"
- mgo "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- )
- var (
- conf util.Conf
- url string
- DataChan = make(chan map[string]interface{}, 50000)
- Step, Subject string
- DB *mgo.MongodbSim
- )
- func init() {
- conf = util.GetConf()
- url = conf.Config.Natsurl
- if url == "" {
- url = nats.DefaultURL
- }
- Step = conf.Config.Process.Step
- Subject = conf.Config.Process.Subject + "." + Step
- fmt.Println("url:", url, "subject", Subject)
- DB = &mgo.MongodbSim{
- MongodbAddr: conf.Config.Mongodb.Addr,
- DbName: conf.Config.Mongodb.Dbname,
- Size: 2,
- }
- DB.InitPool()
- }
- func main() {
- nc, _ := nats.Connect(url)
- defer nc.Drain()
- //订阅消息--数据处理加工--保存
- nc.QueueSubscribe(Subject, Step, func(msg *nats.Msg) {
- data := &util.MsgInfo{}
- err := json.Unmarshal(msg.Data, &data)
- if err != nil {
- log.Println(err)
- } else {
- DataChan <- data.Data
- //time.Sleep(1 * time.Second)
- //消息回写
- msg.Respond(msg.Data)
- }
- })
- go bulkSave()
- ck := make(chan int, 0)
- ck <- 0
- }
- // 批量保存方法
- func bulkSave() {
- list := []map[string]interface{}{}
- for {
- select {
- case data, ok := <-DataChan:
- if ok {
- list = append(list, data)
- if len(list) >= 20 {
- save(list)
- list = []map[string]interface{}{}
- }
- }
- case <-time.After(5 * time.Second): // 设置超时时间秒
- if len(list) > 0 {
- save(list)
- list = []map[string]interface{}{}
- }
- default:
- }
- }
- }
- func save(data []map[string]interface{}) {
- //log.Println("保存数据")
- DB.SaveBulk("msg", data...)
- }
|