udptask.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package udptask
  2. import (
  3. "encoding/json"
  4. "go.mongodb.org/mongo-driver/bson"
  5. "log"
  6. mu "mfw/util"
  7. "mongodb"
  8. "net"
  9. qu "qfw/util"
  10. "util"
  11. )
  12. var (
  13. Udpclient mu.UdpClient //udp对象
  14. IsTaskOK bool
  15. Timeout = 3
  16. )
  17. func InitUdp() {
  18. updport := util.Sysconfig["udpport"].(string)
  19. Udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  20. log.Println("Udp服务监听", updport)
  21. Udpclient.Listen(processUdpMsg)
  22. }
  23. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  24. switch act {
  25. case mu.OP_TYPE_DATA: //测试接收
  26. var rep map[string]interface{}
  27. err := json.Unmarshal(data, &rep)
  28. log.Println("测试接收:", rep)
  29. if err != nil {
  30. //go Udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点
  31. } else {
  32. //by, _ := json.Marshal(map[string]interface{}{
  33. // "taskid": rep["taskid"],
  34. // "stype": rep["stype"],
  35. //})
  36. //go Udpclient.WriteUdp(by, mu.OP_NOOP, ra) //回应上一个节点
  37. }
  38. case mu.OP_NOOP: //下个节点回应
  39. qu.Debug("接收回应:", string(data))
  40. var rep map[string]interface{}
  41. err := json.Unmarshal(data, &rep)
  42. if err != nil { //空数据
  43. //
  44. } else { //正确
  45. }
  46. }
  47. }
  48. //bidding索引udp
  49. func BiddingIndexUdp(id, coll string) {
  50. indexNode := util.Sysconfig["indexNode"].(map[string]interface{})
  51. addr := &net.UDPAddr{
  52. IP: net.ParseIP(indexNode["addr"].(string)),
  53. Port: qu.IntAll(indexNode["port"]),
  54. }
  55. by, _ := json.Marshal(map[string]interface{}{
  56. "query": map[string]interface{}{
  57. "_id": bson.M{
  58. "$gte": mongodb.StringTOBsonId(id),
  59. "$lte": mongodb.StringTOBsonId(id),
  60. }},
  61. "stype": qu.ObjToString(indexNode["stype"]),
  62. "coll": coll,
  63. })
  64. Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  65. }
  66. //项目合并udp
  67. func ProjectSetUdp(id, coll string) {
  68. nextNode := util.Sysconfig["jy_pro_node"].(map[string]interface{})
  69. addr := &net.UDPAddr{
  70. IP: net.ParseIP(nextNode["addr"].(string)),
  71. Port: qu.IntAll(nextNode["port"]),
  72. }
  73. by, _ := json.Marshal(map[string]interface{}{
  74. "infoid": id,
  75. "stype": "updateInfo",
  76. "coll": coll,
  77. })
  78. Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  79. }