main.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  8. "net"
  9. "regexp"
  10. )
  11. var (
  12. Sysconfig map[string]interface{}
  13. MgoMix, Mgo *mongodb.MongodbSim
  14. dbname string
  15. CollQy, CollSave string
  16. lastId int64 // company_change 开始读取的ID
  17. ChangeMap []map[string]interface{}
  18. timeReg, _ = regexp.Compile(`^[\d]{4}-[\d]{1,2}-[\d]{1,2}`)
  19. localPort string // 本地监听端口
  20. UdpClient udp.UdpClient
  21. )
  22. func init() {
  23. util.ReadConfig(&Sysconfig)
  24. //util.ReadConfig("./test.json", &Sysconfig)
  25. localPort = Sysconfig["local_port"].(string) //udp 本地监听地址
  26. UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
  27. dbname = util.ObjToString(Sysconfig["dbName"])
  28. MgoMix = &mongodb.MongodbSim{
  29. MongodbAddr: util.ObjToString(Sysconfig["dbServer"]),
  30. Size: util.IntAll(Sysconfig["dbSize"]),
  31. DbName: dbname,
  32. UserName: util.ObjToString(Sysconfig["uname"]),
  33. Password: util.ObjToString(Sysconfig["upwd"]),
  34. }
  35. MgoMix.InitPool()
  36. Mgo = &mongodb.MongodbSim{
  37. MongodbAddr: util.ObjToString(Sysconfig["company_server"]), // 172.17.4.181:27001
  38. Size: 10,
  39. DbName: util.ObjToString(Sysconfig["company_db"]),
  40. }
  41. Mgo.InitPool()
  42. CollQy = Sysconfig["coll_qy"].(string) //qyxy_std,全量同步的时候用到
  43. CollSave = Sysconfig["coll_change"].(string) //qyxy_change,
  44. lastId = util.Int64All(Sysconfig["lastId"])
  45. ChangeMap = util.ObjArrToMapArr(Sysconfig["changeType"].([]interface{}))
  46. initChangeMap()
  47. }
  48. func initChangeMap() {
  49. for _, v := range ChangeMap {
  50. list := v["change_keyword"].([]interface{})
  51. var regList []string
  52. if len(list) > 0 {
  53. for _, v1 := range list {
  54. reg := ".*" + util.ObjToString(v1) + ".*"
  55. regList = append(regList, reg)
  56. }
  57. v["change_key_reg"] = regList
  58. } else {
  59. v["change_key_reg"] = []string{".*"}
  60. }
  61. }
  62. }
  63. func main() {
  64. UdpClient.Listen(processUdpMsg)
  65. util.Debug("Udp服务监听======= port:", localPort)
  66. ch := make(chan bool, 1)
  67. <-ch
  68. }
  69. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  70. switch act {
  71. case udp.OP_TYPE_DATA:
  72. var mapInfo map[string]interface{}
  73. err := json.Unmarshal(data, &mapInfo)
  74. util.Debug("processUdpMsg mapinfo :=>", mapInfo)
  75. //拿到同步信号,开始同步数据
  76. if _, ok := mapInfo["start"]; ok {
  77. if start_id, ok := mapInfo["start_id"]; ok {
  78. if util.Int64All(start_id) > 0 {
  79. lastId = util.Int64All(start_id)
  80. }
  81. }
  82. go IncData() //增量数据
  83. }
  84. if err != nil {
  85. util.Debug("Unmarshal err :=>", err)
  86. }
  87. default:
  88. fmt.Println("qyxy_listen_data_new :=====")
  89. }
  90. }