udp_ids.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  8. "log"
  9. "net"
  10. "time"
  11. )
  12. // udpDatas 通过udp;补充数据 生 索引
  13. func udpDatas() {
  14. UdpClient = udp.UdpClient{Local: ":18888", BufSize: 1024}
  15. UdpClient.Listen(processUdpMsg)
  16. biddingDataAddr = &net.UDPAddr{
  17. Port: util.IntAll(1783),
  18. IP: net.ParseIP("127.0.0.1"),
  19. }
  20. MgoB = &mongodb.MongodbSim{
  21. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  22. //MongodbAddr: "127.0.0.1:27083",
  23. Size: 10,
  24. DbName: "qfw",
  25. UserName: "SJZY_RWbid_ES",
  26. Password: "SJZY@B4i4D5e6S",
  27. //Direct: true,
  28. }
  29. MgoB.InitPool()
  30. sess := MgoB.GetMgoConn()
  31. defer MgoB.DestoryMongoConn(sess)
  32. where := map[string]interface{}{
  33. "comeintime": map[string]interface{}{
  34. "$gt": 1753113600,
  35. "$lt": time.Now().Unix(),
  36. },
  37. }
  38. query := sess.DB("qfw").C("bidding").Find(where).Select(nil).Iter()
  39. count := 0
  40. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  41. if count%10000 == 0 {
  42. log.Println("current:", count)
  43. }
  44. //id := mongodb.BsonIdToSId(tmp["_id"])
  45. //data := map[string]interface{}{
  46. // "stype": "index-by-id",
  47. // "_id": id,
  48. //}
  49. }
  50. log.Println("count:", count)
  51. }
  52. // processUdpMsg 处理udp
  53. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  54. switch act {
  55. case udp.OP_TYPE_DATA:
  56. var mapInfo map[string]interface{}
  57. err := json.Unmarshal(data, &mapInfo)
  58. //print().Println("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  59. util.Debug("processUdpMsg :=>", mapInfo)
  60. if err != nil {
  61. fmt.Println(err)
  62. }
  63. if mapInfo != nil {
  64. key, _ := mapInfo["key"].(string)
  65. if key == "" {
  66. key = "udpok"
  67. }
  68. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  69. }
  70. default:
  71. fmt.Println("qyxy_listen : processUdpMsg =====")
  72. }
  73. }
  74. // SendUdpMsg 通知处理企业新增数据
  75. func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
  76. bytes, _ := json.Marshal(data)
  77. UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
  78. util.Debug("SendUdpMsg:=>", data)
  79. util.Debug("target :=>", target.IP, target.Port)
  80. }