main.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package main
  2. import (
  3. "dataflow/util"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "time"
  8. "github.com/nats-io/nats.go"
  9. mgo "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. )
  11. var (
  12. conf util.Conf
  13. url string
  14. DataChan = make(chan map[string]interface{}, 50000)
  15. Step, Subject string
  16. DB *mgo.MongodbSim
  17. )
  18. func init() {
  19. conf = util.GetConf()
  20. url = conf.Config.Natsurl
  21. if url == "" {
  22. url = nats.DefaultURL
  23. }
  24. Step = conf.Config.Process.Step
  25. Subject = conf.Config.Process.Subject + "." + Step
  26. fmt.Println("url:", url, "subject", Subject)
  27. DB = &mgo.MongodbSim{
  28. MongodbAddr: conf.Config.Mongodb.Addr,
  29. DbName: conf.Config.Mongodb.Dbname,
  30. Size: 2,
  31. }
  32. DB.InitPool()
  33. }
  34. func main() {
  35. nc, _ := nats.Connect(url)
  36. defer nc.Drain()
  37. //订阅消息--数据处理加工--保存
  38. nc.QueueSubscribe(Subject, Step, func(msg *nats.Msg) {
  39. data := &util.MsgInfo{}
  40. err := json.Unmarshal(msg.Data, &data)
  41. if err != nil {
  42. log.Println(err)
  43. } else {
  44. DataChan <- data.Data
  45. //time.Sleep(1 * time.Second)
  46. //消息回写
  47. msg.Respond(msg.Data)
  48. }
  49. })
  50. go bulkSave()
  51. ck := make(chan int, 0)
  52. ck <- 0
  53. }
  54. // 批量保存方法
  55. func bulkSave() {
  56. list := []map[string]interface{}{}
  57. for {
  58. select {
  59. case data, ok := <-DataChan:
  60. if ok {
  61. list = append(list, data)
  62. if len(list) >= 20 {
  63. save(list)
  64. list = []map[string]interface{}{}
  65. }
  66. }
  67. case <-time.After(5 * time.Second): // 设置超时时间秒
  68. if len(list) > 0 {
  69. save(list)
  70. list = []map[string]interface{}{}
  71. }
  72. default:
  73. }
  74. }
  75. }
  76. func save(data []map[string]interface{}) {
  77. //log.Println("保存数据")
  78. DB.SaveBulk("msg", data...)
  79. }