main.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/mongodb"
  5. "app.yhyue.com/data_processing/common_utils/udp"
  6. "encoding/json"
  7. "fmt"
  8. "net"
  9. "regexp"
  10. )
  11. var (
  12. Sysconfig map[string]interface{}
  13. MgoMix, Mgo *mongodb.MongodbSim
  14. dbname string
  15. CollQy, CollSave string
  16. lastId int64 // company_change 开始读取的ID
  17. ChangeMap []map[string]interface{}
  18. timeReg, _ = regexp.Compile(`^[\d]{4}-[\d]{1,2}-[\d]{1,2}`)
  19. localPort string // 本地监听端口
  20. UdpClient udp.UdpClient
  21. )
  22. func init() {
  23. util.ReadConfig(&Sysconfig)
  24. localPort = Sysconfig["local_port"].(string) //udp 本地监听地址
  25. UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
  26. dbname = util.ObjToString(Sysconfig["dbName"])
  27. MgoMix = &mongodb.MongodbSim{
  28. MongodbAddr: util.ObjToString(Sysconfig["dbServer"]),
  29. Size: util.IntAll(Sysconfig["dbSize"]),
  30. DbName: dbname,
  31. UserName: util.ObjToString(Sysconfig["uname"]),
  32. Password: util.ObjToString(Sysconfig["upwd"]),
  33. }
  34. MgoMix.InitPool()
  35. Mgo = &mongodb.MongodbSim{
  36. MongodbAddr: util.ObjToString(Sysconfig["company_server"]), // 172.17.4.181:27001
  37. Size: 10,
  38. DbName: util.ObjToString(Sysconfig["company_db"]),
  39. }
  40. Mgo.InitPool()
  41. CollQy = Sysconfig["coll_qy"].(string) //qyxy_std,全量同步的时候用到
  42. CollSave = Sysconfig["coll_change"].(string) //qyxy_change,
  43. lastId = util.Int64All(Sysconfig["lastId"])
  44. ChangeMap = util.ObjArrToMapArr(Sysconfig["changeType"].([]interface{}))
  45. initChangeMap()
  46. }
  47. func initChangeMap() {
  48. for _, v := range ChangeMap {
  49. list := v["change_keyword"].([]interface{})
  50. var regList []string
  51. if len(list) > 0 {
  52. for _, v1 := range list {
  53. reg := ".*" + util.ObjToString(v1) + ".*"
  54. regList = append(regList, reg)
  55. }
  56. v["change_key_reg"] = regList
  57. } else {
  58. v["change_key_reg"] = []string{".*"}
  59. }
  60. }
  61. }
  62. func main() {
  63. UdpClient.Listen(processUdpMsg)
  64. util.Debug("Udp服务监听======= port:", localPort)
  65. ch := make(chan bool, 1)
  66. <-ch
  67. }
  68. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  69. switch act {
  70. case udp.OP_TYPE_DATA:
  71. var mapInfo map[string]interface{}
  72. err := json.Unmarshal(data, &mapInfo)
  73. util.Debug("processUdpMsg mapinfo :=>", mapInfo)
  74. //拿到同步信号,开始同步数据
  75. if _, ok := mapInfo["start"]; ok {
  76. go IncData() //增量数据
  77. }
  78. if err != nil {
  79. util.Debug("Unmarshal err :=>", err)
  80. }
  81. default:
  82. fmt.Println("qyxy_listen_data_new :=====")
  83. }
  84. }