main.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "go.uber.org/zap"
  6. utils "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  11. "net"
  12. "os"
  13. )
  14. var (
  15. Sysconfig map[string]interface{} //配置文件
  16. Mgo *mongodb.MongodbSim
  17. Dbname string
  18. Dbcoll string
  19. Es *elastic.Elastic
  20. Index string
  21. //Itype string
  22. EsFields []string
  23. //Updatetime int64
  24. localPort string // 本地监听端口
  25. UdpClient udp.UdpClient
  26. )
  27. var EsSaveCache = make(chan map[string]interface{}, 5000)
  28. var SP = make(chan bool, 5)
  29. func init() {
  30. utils.ReadConfig(&Sysconfig)
  31. //utils.ReadConfig("test.json", &Sysconfig)
  32. Dbname = Sysconfig["dbname"].(string) //
  33. Dbcoll = Sysconfig["dbcoll"].(string) //qyxy_std
  34. Mgo = &mongodb.MongodbSim{
  35. MongodbAddr: Sysconfig["mgodb"].(string),
  36. Size: utils.IntAllDef(Sysconfig["dbsize"], 5),
  37. DbName: Dbname,
  38. UserName: Sysconfig["uname"].(string),
  39. Password: Sysconfig["upwd"].(string),
  40. }
  41. Mgo.InitPool()
  42. //es
  43. econf := Sysconfig["elastic"].(map[string]interface{})
  44. Index = econf["index"].(string)
  45. //Itype = econf["itype"].(string)
  46. Es = &elastic.Elastic{
  47. S_esurl: econf["addr"].(string),
  48. I_size: utils.IntAllDef(econf["pool"], 12),
  49. Username: econf["username"].(string),
  50. Password: econf["password"].(string),
  51. }
  52. Es.InitElasticSize()
  53. EsFields = utils.ObjArrToStringArr(econf["esfields"].([]interface{}))
  54. //Updatetime = utils.Int64All(Sysconfig["updatetime"])
  55. localPort = Sysconfig["local_port"].(string) //udp 本地监听地址
  56. UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
  57. InitLog()
  58. }
  59. func InitLog() {
  60. err := log.InitLog(
  61. log.Path("./logs/log.out"),
  62. log.Level("info"),
  63. log.Compress(true),
  64. log.MaxSize(10),
  65. log.MaxBackups(10),
  66. log.MaxAge(7),
  67. log.Format("json"),
  68. )
  69. if err != nil {
  70. fmt.Printf("InitLog failed: %v\n", err)
  71. os.Exit(1)
  72. }
  73. }
  74. func main() {
  75. UdpClient.Listen(processUdpMsg)
  76. log.Info("main", zap.String("Udp服务监听", localPort))
  77. //go StdAll()
  78. go SaveEs()
  79. ch := make(chan bool, 1)
  80. <-ch
  81. }
  82. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  83. switch act {
  84. case udp.OP_TYPE_DATA:
  85. var mapInfo map[string]interface{}
  86. err := json.Unmarshal(data, &mapInfo)
  87. if err != nil {
  88. log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
  89. }
  90. log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo))
  91. if mapInfo != nil {
  92. //相应UDP回答
  93. key := utils.ObjToString(mapInfo["key"])
  94. if key == "" {
  95. key = "udpok"
  96. }
  97. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  98. }
  99. if tasktype, ok := mapInfo["stype"].(string); ok {
  100. switch tasktype {
  101. case "stdall":
  102. go StdAll()
  103. default:
  104. fmt.Println("tasktype", tasktype)
  105. }
  106. } else {
  107. //拿到同步信号,开始同步数据
  108. if _, ok := mapInfo["start"]; ok {
  109. var start_time, end_time int64
  110. if _, ok2 := mapInfo["start_time"]; ok2 {
  111. start_time = utils.Int64All(mapInfo["start_time"])
  112. end_time = utils.Int64All(mapInfo["end_time"])
  113. }
  114. var q map[string]interface{}
  115. if start_time > 0 {
  116. if end_time > 0 {
  117. q = map[string]interface{}{
  118. "updatetime": map[string]interface{}{
  119. "$gte": start_time,
  120. "$lte": end_time,
  121. },
  122. }
  123. } else {
  124. q = map[string]interface{}{
  125. "updatetime": map[string]interface{}{
  126. "$gte": start_time,
  127. },
  128. }
  129. }
  130. go StdAdd(q) //读取qyxy_std 数据,放入es 数组
  131. } else {
  132. fmt.Println("参数 start_time 为0")
  133. }
  134. }
  135. }
  136. default:
  137. log.Info("processUdpMsg", zap.String("mapinfo", string(data)))
  138. }
  139. }