package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	mu "mfw/util"
	"net"
	"net/http"
	qu "qfw/util"
	"time"
	. "util"
)

// Stores UDP messages, to prevent several UDP tasks from running at the same time.
//var DataChannel = make(chan map[string]string, 50)

// UdpNode is a sent UDP message that is still waiting for an acknowledgement.
type UdpNode struct {
	Data      []byte       // payload exactly as it was sent; reused verbatim on resend
	Addr      *net.UDPAddr // destination the payload was sent to
	Timestamp int64        // Unix time (seconds) at which the message was first sent
	Retry     int          // number of resend attempts made by checkMapJob
}

// InitUdp creates the package-wide UDP client on UdpPort with a 1024-byte
// buffer and registers processUdpMsg as its message handler.
func InitUdp() {
	defer qu.Catch()
	Udpclient = mu.UdpClient{Local: UdpPort, BufSize: 1024}
	Udpclient.Listen(processUdpMsg)
	qu.Debug("Udp服务监听", UdpPort)
}

// processUdpMsg handles one incoming UDP message.
//
// act selects the message kind, data is the raw payload and ra is the address
// of the sender, which is also where any reply is written.
//
//   - mu.OP_TYPE_DATA: data is a JSON object carrying "gtid", "lteid" and
//     "stype". The message is acknowledged with "gtid-lteid-stype" and, when
//     stype is "update" and both ids are present, UpdateBiddingData is called.
//     A payload that cannot be parsed, or that has no stype, is answered with
//     an "err:..." reply instead.
//   - mu.OP_NOOP: data is the acknowledgement key sent back by the peer; the
//     matching pending task is removed from UdptaskMap.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	defer qu.Catch()
	switch act {
	case mu.OP_TYPE_DATA: // data from the previous node
		var mapInfo map[string]interface{}
		err := json.Unmarshal(data, &mapInfo)
		qu.Debug("err:", err, "mapInfo:", mapInfo)
		if err != nil {
			Udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) // error reply
			return
		}
		// Reading from a nil map is safe, so a JSON "null" payload simply
		// ends up in the empty-stype branch below.
		stype := qu.ObjToString(mapInfo["stype"])
		if stype == "" {
			// err is nil here, so it must not be dereferenced to build the
			// reply; report the actual problem instead.
			Udpclient.WriteUdp([]byte("err:stype is empty"), mu.OP_NOOP, ra) // error reply
			return
		}

		// Message accepted: acknowledge it before doing any work.
		gtid := qu.ObjToString(mapInfo["gtid"])
		lteid := qu.ObjToString(mapInfo["lteid"])
		Udpclient.WriteUdp([]byte(gtid+"-"+lteid+"-"+stype), mu.OP_NOOP, ra)
		switch {
		case gtid == "" || lteid == "":
			qu.Debug("id段错误")
		case stype == "update": // update bidding data
			UpdateBiddingData(gtid, lteid)
		}
	case mu.OP_NOOP: // acknowledgement from the next node
		ok := string(data)
		if ok != "" {
			qu.Debug("ok:", ok)
			UdptaskMap.Delete(ok)
		}
	}
}

// SendUdp sends a task message to udpaddr:udpport and records it in
// UdptaskMap so that checkMapJob can resend it until it is acknowledged.
//
// The task key is "gtid-lteid-stype"; it is included in the payload and is
// also the key under which the pending task is stored.
func SendUdp(gtid, lteid, stype, udpaddr string, udpport int) {
	defer qu.Catch()
	key := gtid + "-" + lteid + "-" + stype
	by, err := json.Marshal(map[string]interface{}{
		"gtid":  gtid,
		"lteid": lteid,
		"stype": stype,
		"key":   key,
	})
	if err != nil {
		// Nothing valid to send, and nothing worth retrying.
		log.Println("udp marshal error:", key, err)
		return
	}
	addr := &net.UDPAddr{
		IP:   net.ParseIP(udpaddr),
		Port: udpport,
	}
	node := &UdpNode{
		Data:      by,
		Addr:      addr,
		Timestamp: time.Now().Unix(),
		Retry:     0,
	}
	UdptaskMap.Store(key, node)
	qu.Debug("send:", stype, key)
	Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
}

// checkMapJob scans UdptaskMap once a minute, forever, and follows up on
// tasks that have not been acknowledged:
//
//   - older than 120s: the retry counter is incremented and the message is
//     resent; once the counter exceeds 10 the task is dropped and a failure
//     mail is requested through the mail API;
//   - older than 10s: the task is only logged as timing out.
//
// The function never returns, so it is meant to run in its own goroutine.
func checkMapJob() {
	// Original author's note: mail cannot be sent from the Alibaba Cloud intranet.
	log.Println("start checkMapJob")
	for {
		UdptaskMap.Range(func(k, v interface{}) bool {
			node, ok := v.(*UdpNode)
			if !ok || node == nil {
				// There is no recover in this goroutine, so an unexpected
				// value must be skipped rather than dereferenced.
				log.Println("udp task has unexpected value, skipped:", k)
				return true
			}
			age := time.Now().Unix() - node.Timestamp
			switch {
			case age > 120:
				node.Retry++
				if node.Retry > 10 {
					log.Println("udp重试失败", k)
					UdptaskMap.Delete(k)
					notifyUdpSendFailure(fmt.Sprint(k))
				} else {
					log.Println("udp重发", k)
					Udpclient.WriteUdp(node.Data, mu.OP_TYPE_DATA, node.Addr)
				}
			case age > 10:
				log.Println("udp任务超时中..", k)
			}
			return true
		})
		time.Sleep(60 * time.Second)
	}
}

// notifyUdpSendFailure asks the mail API to send a failure notification for
// the task identified by key and logs the API response.
//
// It is a separate function so that the deferred Body.Close runs after every
// call; a defer placed directly in checkMapJob's endless loop would never run.
func notifyUdpSendFailure(key string) {
	// Bound the request so a stalled mail API cannot block the retry loop.
	client := http.Client{Timeout: 30 * time.Second}
	res, err := client.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", Api, Tomail, "downloadfile-send-fail", key))
	if err != nil {
		log.Println("udp failure mail request error:", key, err)
		return
	}
	defer res.Body.Close()
	read, err := ioutil.ReadAll(res.Body)
	log.Println("邮件发发送:", string(read), err)
}

// Disabled legacy helper, kept for reference only.
//func sendMail(key string) {
//	for i := 1; i <= 3; i++ {
//		res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "redownload-receive-oss-fail", "发送附件识别失败:"+key))
//		if err == nil {
//			res.Body.Close()
//			read, err := ioutil.ReadAll(res.Body)
//			log.Println("邮件发送:", string(read), err)
//			break
//		}
//	}
//}