udp.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. "net/http"
  10. qu "qfw/util"
  11. "time"
  12. . "util"
  13. )
  // Original note: store sent UDP messages so that several UDP jobs do not
  // execute at the same time.
  // var DataChannel = make(chan map[string]string, 50)

  // UdpNode is one outstanding UDP message that is waiting for an
  // acknowledgement from the peer.
  //
  // SendUdp creates a node for every message it sends and checkMapJob walks the
  // stored nodes to resend or expire them. The field order must not change:
  // SendUdp builds the value with a positional composite literal.
  type UdpNode struct {
  	// Data is the raw payload handed to WriteUdp, kept so the same bytes can
  	// be sent again on a retry.
  	Data []byte
  	// Addr is the destination the payload is (re)sent to.
  	Addr *net.UDPAddr
  	// Timestamp is the creation time in Unix seconds; checkMapJob compares it
  	// with the current time to decide whether the message is overdue.
  	Timestamp int64
  	// Retry counts how many times checkMapJob has found the message overdue.
  	Retry int
  }
  22. //初始化udp
  23. func InitUdp() {
  24. defer qu.Catch()
  25. Udpclient = mu.UdpClient{Local: UdpPort, BufSize: 1024}
  26. Udpclient.Listen(processUdpMsg)
  27. qu.Debug("Udp服务监听", UdpPort)
  28. }
  29. //udp调用信号
  30. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  31. defer qu.Catch()
  32. switch act {
  33. case mu.OP_TYPE_DATA: //上个节点的数据
  34. var mapInfo map[string]interface{}
  35. err := json.Unmarshal(data, &mapInfo)
  36. qu.Debug("err:", err, "mapInfo:", mapInfo)
  37. stype := qu.ObjToString(mapInfo["stype"])
  38. if err != nil || stype == "" {
  39. Udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) //回执
  40. } else if mapInfo != nil { //接收到udp,回执
  41. gtid := qu.ObjToString(mapInfo["gtid"])
  42. lteid := qu.ObjToString(mapInfo["lteid"])
  43. Udpclient.WriteUdp([]byte(gtid+"-"+lteid+"-"+stype), mu.OP_NOOP, ra)
  44. if gtid == "" || lteid == "" {
  45. qu.Debug("id段错误")
  46. } else if stype == "update" { //更新bidding
  47. UpdateBiddingData(gtid, lteid)
  48. }
  49. }
  50. case mu.OP_NOOP: //下个节点回应
  51. ok := string(data)
  52. if ok != "" {
  53. qu.Debug("ok:", ok)
  54. UdptaskMap.Delete(ok)
  55. }
  56. }
  57. }
  58. //发送udp的时候一个key参数
  59. func SendUdp(gtid, lteid, stype, udpaddr string, udpport int) {
  60. defer qu.Catch()
  61. key := gtid + "-" + lteid + "-" + stype
  62. by, _ := json.Marshal(map[string]interface{}{
  63. "gtid": gtid,
  64. "lteid": lteid,
  65. "stype": stype,
  66. "key": key,
  67. })
  68. addr := &net.UDPAddr{
  69. IP: net.ParseIP(udpaddr),
  70. Port: udpport,
  71. }
  72. node := &UdpNode{by, addr, time.Now().Unix(), 0}
  73. UdptaskMap.Store(key, node)
  74. qu.Debug("send:", stype, key)
  75. Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  76. }
  77. func checkMapJob() {
  78. //阿里云内网无法发送邮件
  79. log.Println("start checkMapJob")
  80. for {
  81. UdptaskMap.Range(func(k, v interface{}) bool {
  82. now := time.Now().Unix()
  83. node, _ := v.(*UdpNode)
  84. if now-node.Timestamp > 120 {
  85. node.Retry++
  86. if node.Retry > 10 {
  87. log.Println("udp重试失败", k)
  88. UdptaskMap.Delete(k)
  89. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", Api, Tomail, "downloadfile-send-fail", k.(string)))
  90. if err == nil {
  91. defer res.Body.Close()
  92. read, err := ioutil.ReadAll(res.Body)
  93. log.Println("邮件发发送:", string(read), err)
  94. }
  95. } else {
  96. log.Println("udp重发", k)
  97. Udpclient.WriteUdp(node.Data, mu.OP_TYPE_DATA, node.Addr)
  98. }
  99. } else if now-node.Timestamp > 10 {
  100. log.Println("udp任务超时中..", k)
  101. }
  102. return true
  103. })
  104. time.Sleep(60 * time.Second)
  105. }
  106. }
  107. //func sendMail(key string) {
  108. // for i := 1; i <= 3; i++ {
  109. // res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "redownload-receive-oss-fail", "发送附件识别失败:"+key))
  110. // if err == nil {
  111. // res.Body.Close()
  112. // read, err := ioutil.ReadAll(res.Body)
  113. // log.Println("邮件发送:", string(read), err)
  114. // break
  115. // }
  116. // }
  117. //}